Python Question

How to output Spark data to a CSV file with separate columns?

My code first extracts data using a regex and writes it to a text file (as strings).
I then tried creating a DataFrame from the contents of the text file so that I could have separate columns, but this led to an error. (Writing the file directly to CSV puts everything into a single column.)

with open("C:\\Sample logs\\dataframe.txt",'a') as f:
f.write(str(time))
f.write(" ")
f.write(qtype)
f.write(" ")
f.write(rtype)
f.write(" ")
f.write(domain)
f.write("\n")
new = sc.textFile("C:\\Sample logs\\dataframe.txt").cache() # cause df requires an rdd
lines1 = new.map(lambda x: (x, ))
df = sqlContext.createDataFrame(lines1)


But I get the following error:


TypeError: Can not infer schema for type: <type 'unicode'>


I tried some other approaches, but none of them helped. All I want is to create a DataFrame with separate columns after the write operation, so that I can use groupBy().

The input in the text file:

1472128348.0 HTTP - tr.vwt.gsf.asfh
1472237494.63 HTTP - tr.sdf.sff.sdfg
1473297794.26 HTTP - tr.asfr.gdfg.sdf
1474589345.0 HTTP - tr.sdgf.gdfg.gdfg
1472038475.0 HTTP - tr.sdf.csgn.sdf


Expected output in CSV format:

The same data as above, but separated into columns so I can perform groupBy operations.

Answer

In order to turn each line of space-separated words into a list of words, you'll need to replace:

lines1 = new.map(lambda x: (x, ))

with

lines1 = new.map(lambda line: line.split(' '))
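Note that split(' ') only splits on single spaces. If your log fields can be separated by runs of whitespace, calling split() with no argument splits on any whitespace run and drops the empty strings:

lines1 = new.map(lambda line: line.split())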

I tried it on my machine, and after executing the following:

df = sqlContext.createDataFrame(lines1)

a new DataFrame was created:

df.printSchema()
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)
 |-- _4: string (nullable = true)

df.show()
+-------------+----+---+-----------------+
|           _1|  _2| _3|               _4|
+-------------+----+---+-----------------+
| 1472128348.0|HTTP|  -|  tr.vwt.gsf.asfh|
|1472237494.63|HTTP|  -|  tr.sdf.sff.sdfg|
|1473297794.26|HTTP|  -| tr.asfr.gdfg.sdf|
| 1474589345.0|HTTP|  -|tr.sdgf.gdfg.gdfg|
| 1472038475.0|HTTP|  -|  tr.sdf.csgn.sdf|
+-------------+----+---+-----------------+
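As an aside: if all you need is friendlier column names (not types), you could also rename the auto-generated columns with toDF; the schema approach below additionally lets you control the column types:

df = sqlContext.createDataFrame(lines1).toDF("F1", "F2", "F3", "F4")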

You can execute groupBy:

>>> df2 = df.groupBy("_1")
>>> type(df2)
<class 'pyspark.sql.group.GroupedData'>
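Note that groupBy on its own only returns a GroupedData object; to get a DataFrame back you need to apply an aggregation, for example counting rows per protocol (column _2):

>>> df.groupBy("_2").count().show()

For the sample data above this would show a single HTTP group with a count of 5.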

In order to use a schema, you'll first need to define it; see https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

A sample schema can be found below (you'll need to add fields and update the names and types to adapt it to your case):

from pyspark.sql.types import *
schema = StructType([
    StructField("F1", StringType(), True),
    StructField("F2", StringType(), True),
    StructField("F3", StringType(), True),
    StructField("F4", StringType(), True)])

Afterwards you'll be able to create the DataFrame with the schema:

df = sqlContext.createDataFrame(lines1, schema)

And now you'll have names for the fields:

df.show()
+-------------+----+---+-----------------+
|           F1|  F2| F3|               F4|
+-------------+----+---+-----------------+
| 1472128348.0|HTTP|  -|  tr.vwt.gsf.asfh|
|1472237494.63|HTTP|  -|  tr.sdf.sff.sdfg|
|1473297794.26|HTTP|  -| tr.asfr.gdfg.sdf|
| 1474589345.0|HTTP|  -|tr.sdgf.gdfg.gdfg|
| 1472038475.0|HTTP|  -|  tr.sdf.csgn.sdf|
+-------------+----+---+-----------------+

In order to save it to CSV, you'll need to use toPandas() and then to_csv() (part of the Python pandas library):

http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_csv.html

df.toPandas().to_csv('mycsv.csv')

The content of the CSV file:

cat mycsv.csv

,F1,F2,F3,F4
0,1472128348.0,HTTP,-,tr.vwt.gsf.asfh
1,1472237494.63,HTTP,-,tr.sdf.sff.sdfg
2,1473297794.26,HTTP,-,tr.asfr.gdfg.sdf
3,1474589345.0,HTTP,-,tr.sdgf.gdfg.gdfg
4,1472038475.0,HTTP,-,tr.sdf.csgn.sdf
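
The leading unnamed column is the pandas row index; if you don't want it in the CSV, pass index=False:

df.toPandas().to_csv('mycsv.csv', index=False)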

Note that you can cast a column using .cast(), e.g. casting F1 to type float (adding a new column of type float and dropping the old column):

df = df.withColumn("F1float", df["F1"].cast("float")).drop("F1")
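
You can verify the cast with printSchema(); the new F1float column should now be reported as float:

df.printSchema()
root
 |-- F2: string (nullable = true)
 |-- F3: string (nullable = true)
 |-- F4: string (nullable = true)
 |-- F1float: float (nullable = true)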