Gaurav Dhama - 4 months ago
Python Question

How to load data in chunks from a pandas dataframe to a spark dataframe

I have read data in chunks over a pyodbc connection using something like this:

import pandas as pd
import pyodbc
conn = pyodbc.connect("Some connection Details")
sql = "SELECT * from TABLES;"
df1 = pd.read_sql(sql, conn, chunksize=10)
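
Note that with chunksize set, pd.read_sql returns an iterator of pandas DataFrames (one per chunk of up to 10 rows) rather than a single DataFrame, and it can only be consumed once. A quick sanity check:

print(type(df1))  # an iterator of pandas DataFrames, not a DataFrame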


Now I want to read all these chunks into a single Spark DataFrame using something like:

i = 0
for chunk in df1:
    if i==0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2.unionAll(sqlContext.createDataFrame(chunk))
    i = i+1


The problem is that when I do

df2.count()

I get 10 as the result, which means only the i=0 case is working. Is this a bug with unionAll? Am I doing something wrong here?

Answer

The documentation for .unionAll() states that it returns a new DataFrame, so you have to assign the result back to df2:

i = 0
for chunk in df1:
    if i == 0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2 = df2.unionAll(sqlContext.createDataFrame(chunk))
    i = i + 1

Furthermore, you can use enumerate() to avoid having to manage the i variable yourself:

for i, chunk in enumerate(df1):
    if i == 0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2 = df2.unionAll(sqlContext.createDataFrame(chunk))

Furthermore, the documentation for .unionAll() states that it is deprecated (as of Spark 2.0), and you should now use .union(), which acts like UNION ALL in SQL:

for i, chunk in enumerate(df1):
    if i == 0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2 = df2.union(sqlContext.createDataFrame(chunk))
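
As an aside, here is a quick sketch (with two throwaway DataFrames, a and b) showing the semantics: .union() keeps duplicate rows just like SQL's UNION ALL, so chain .distinct() if you want plain UNION behaviour:

    # .union() keeps duplicates, like SQL's UNION ALL
    a = sqlContext.createDataFrame([(1,), (2,)], ["x"])
    b = sqlContext.createDataFrame([(2,), (3,)], ["x"])
    a.union(b).count()             # 4 -- the duplicate row survives
    a.union(b).distinct().count()  # 3 -- like SQL's plain UNION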

Edit:
Furthermore, I'll stop saying furthermore, but not before I say furthermore: as @zero323 says, let's not use .union() in a loop. Let's instead do something like:

from functools import reduce
from pyspark.sql import DataFrame

# Convert each pandas chunk to a Spark DataFrame first
df_list = []
for chunk in df1:
    df_list.append(sqlContext.createDataFrame(chunk))

# DataFrame.union takes exactly two DataFrames,
# so fold the list together with reduce
df_all = reduce(DataFrame.union, df_list)
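
Alternatively, since every chunk is already a pandas DataFrame on the driver, you could skip the per-chunk conversion entirely. This is just a sketch, and it assumes the full result set fits in driver memory (which it must anyway for the chunk-by-chunk approach above), with a fresh df1 iterator since it can only be consumed once:

    import pandas as pd

    # Concatenate the chunks driver-side, then make one createDataFrame call
    df_all = sqlContext.createDataFrame(pd.concat(df1, ignore_index=True))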