ytasfeb15 - 4 months ago
Python Question

Exporting text files to PostgreSQL using Spark - Automation

I'm trying to export text files to a Postgres database using Spark. I am using the code below to export individual text files. I have close to 200 text files in the same folder, and every file has the same structure. Unfortunately, the year value is not part of my input files, so I'm hard-coding it.

I would like to upload all of these files at once but don't know how to do it. Does anyone have any suggestions?

from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

lines = sc.textFile("/aaaa/bbbb/DataFile/t-1870.txt")
splits = lines.map(lambda l: l.split(","))
raw_data = splits.map(lambda b: Row(name=b[0], gender=b[1],count=int(b[2]),year=int(1870)))

schemaBabies = sqlContext.createDataFrame(raw_data)
schemaBabies.registerTempTable("raw_data")

df = sqlContext.sql("select * from raw_data")

pgurl="jdbc:postgresql://localhost:5432/sparkling?user=XXXX&password=XXXX"
# Connection properties only; the save mode is an argument of jdbc(), not a JDBC property
properties={"user":"XXXX","password":"XXXX","driver":"org.postgresql.Driver"}

df.write.jdbc(url=pgurl, table="EDW.raw_data", mode="append", properties=properties)

Answer

Let's assume your data looks like this:

import csv
import tempfile 
import os

out = tempfile.mkdtemp()
data = [
    ("1870", [("Jane Doe", "F", 3)]),
    ("1890", [("John Doe", "M", 1)]),
]

for year, rows in data: 
    with open(os.path.join(out, "t-{0}.txt".format(year)), "w") as fw:
        csv.writer(fw).writerows(rows)

Start a PySpark session or submit the script, passing the correct spark-csv package to the --packages argument, and load the data with the specified schema:

from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("count", LongType(), True)
])

df = (sqlContext.read.format("com.databricks.spark.csv")
   .schema(schema)
   .load(out))

Extract the year from the file name and write:

from pyspark.sql.functions import input_file_name, regexp_extract

df_with_year = (df.withColumn(
    "year",
    regexp_extract(input_file_name(), "[1-2][0-9]{3}", 0).cast("int")))

df_with_year.show()
## +--------+------+-----+----+
## |    name|gender|count|year|
## +--------+------+-----+----+
## |John Doe|     M|    1|1890|
## |Jane Doe|     F|    3|1870|
## +--------+------+-----+----+

df_with_year.write.jdbc(...)
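
The arguments to the write call are left elided above. As a minimal sketch only, assuming the connection values from the question (database sparkling, table EDW.raw_data, placeholder credentials), they could be assembled as below. The helper name jdbc_write_args is hypothetical and not part of Spark. The point it illustrates is that the save mode is an argument of DataFrameWriter.jdbc, while the properties dict holds only JDBC connection settings.

```python
def jdbc_write_args(url, table, user, password, mode="append"):
    """Build keyword arguments for DataFrameWriter.jdbc (hypothetical helper)."""
    return {
        "url": url,
        "table": table,
        # The save mode belongs here, not inside the connection properties.
        "mode": mode,
        "properties": {
            "user": user,
            "password": password,
            "driver": "org.postgresql.Driver",
        },
    }


# Values taken from the question; XXXX are placeholders, not real credentials.
args = jdbc_write_args(
    "jdbc:postgresql://localhost:5432/sparkling",
    "EDW.raw_data",
    "XXXX",
    "XXXX",
)

# With a live Spark session and the PostgreSQL JDBC driver on the classpath:
# df_with_year.write.jdbc(**args)
```

Keeping the arguments in one dict makes it easy to check them before the job is submitted.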

Important: In Spark < 2.0 this approach depends on not passing data around between Python and JVM. It won't work with Python UDFs or DataFrame.rdd.map.
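
One thing to watch with the year extraction: input_file_name() returns the full path, and the pattern [1-2][0-9]{3} matches the first four-digit run anywhere in it, including directory names. The sketch below uses Python's re module, not Spark, to show the difference between that pattern and one anchored to the t-<year>.txt file name. The path and the function names are made up for illustration.

```python
import re

# Pattern from the answer: first four-digit run starting with 1 or 2, anywhere in the path.
LOOSE = re.compile(r"[1-2][0-9]{3}")
# Anchored variant: only the year inside a trailing "t-<year>.txt" file name.
ANCHORED = re.compile(r"t-([1-2][0-9]{3})\.txt$")


def year_loose(path):
    match = LOOSE.search(path)
    return int(match.group(0)) if match else None


def year_anchored(path):
    match = ANCHORED.search(path)
    return int(match.group(1)) if match else None


# Hypothetical path whose directory name happens to contain digits.
path = "file:/tmp/tmp1999abc/t-1870.txt"

print(year_loose(path))     # 1999 (picked up from the directory name)
print(year_anchored(path))  # 1870
```

In Spark the anchored form would be regexp_extract(input_file_name(), "t-([1-2][0-9]{3})\\.txt$", 1), assuming all files follow the t-<year>.txt naming used in the question.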