Java Question

How to set field types when reading from csv and writing into MS SQL Server?

I have a pretty large .csv file that I need to read, modify a bit, and then write into the database. Everything works as expected except one thing: I want to "help" Apache Spark and not spend time on inferring datatypes, so I decided to create a StructType. Please find the code below.

import org.apache.spark.sql.types.*;

// Explicit schema so Spark does not have to infer the column types
final StructType structType = new StructType(new StructField[]{
        new StructField("Field1", DataTypes.StringType, false, Metadata.empty()),
        new StructField("Field2", DataTypes.StringType, false, Metadata.empty()),
        new StructField("FieldDate1", DataTypes.StringType, false, Metadata.empty()),
        new StructField("Field3", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> dataset = new SQLContext(sparkContext)
        .read()
        .option("header", true)
        .schema(structType)              // apply the predefined schema instead of inferring it
        .csv("\\folder\\20170101.csv");


But here is the main point: if I do not supply the schema (i.e. simply comment out .schema(structType)) and let Spark infer the types itself, then all the columns end up with the correct types in the MS SQL database. For example, Field1 is VARCHAR(20) in the database, and after the insert it keeps that type.

But after specifying the schema, I get text everywhere. I am just curious: is there any way to specify data types like VARCHAR(10) instead of text?

Answer Source

After doing some investigation and finding the time to answer, here is what I found out. I decided to run all my experiments in Databricks.

Schema inference

Spark tries to infer the schema automatically (by default), and all the values that are passed simply end up as StringType. At first I decided: okay, it can take a bit more time to infer the schema, fine. But when I decided to try out Structured Streaming, there was no way around it: the schema has to be predefined. So I started doing more experiments.
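
To make the difference concrete, here is a minimal sketch (assuming a SparkSession named spark and the same sample path used further below): with inferSchema Spark makes an extra pass over the file to guess the types, and without it every CSV column simply arrives as StringType.

val inferred = spark.read
                    .option("header", "true")
                    .option("inferSchema", "true")   // extra pass over the file to guess types
                    .csv("/databricks/path/to/file.csv")

val allStrings = spark.read
                      .option("header", "true")      // default: no inference, everything is a string
                      .csv("/databricks/path/to/file.csv")

inferred.printSchema()
allStrings.printSchema()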

What is the result?

Since Spark's default is to treat every column as StringType, I simply did the same thing here explicitly. Scala code snippet below:

import org.apache.spark.sql.types._

// Build every column as StringType, mirroring Spark's default behaviour
val schemaString = "Field1 Field2 FieldDate1 Field3"
val fields = schemaString.split(" ")
                         .map(fieldName => StructField(fieldName, StringType, nullable = true))

And then, with the schema in hand, we can read the CSV:

val schema = StructType(fields)
spark.readStream
     .option("header", true)
     .schema(schema)           // Structured Streaming requires the schema up front
     .csv("/databricks/path/to/file.csv")

Side note

Keep in mind that if you need to work with datetime values, you can transform the columns after reading them from the CSV, as sketched below.
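
Here is a minimal sketch of such a transformation, assuming df is a DataFrame read as above and that FieldDate1 arrives as a string in "yyyy-MM-dd HH:mm:ss" form (the format is just a guess for illustration):

import org.apache.spark.sql.functions.{col, unix_timestamp}

// Parse the string column into a proper timestamp after the read
val withDates = df.withColumn(
  "FieldDate1",
  unix_timestamp(col("FieldDate1"), "yyyy-MM-dd HH:mm:ss").cast("timestamp")
)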

UPDATE #0

Got the answer on Twitter from Jacek Laskowski. Thanks for that :)

Nope in this area. Not only is this about Spark SQL’s JDBCReader but also could be database-specific as there are variants.

UPDATE #1

I decided to go the other way and this is where I am now.

If you have a case where you need to clear a table and insert a fresh portion of data each day, don't forget these things:

  1. Update Spark to version 2.1. This really matters, since there was a pull request that added this feature; before it, the standard flow was to DROP and then re-create the table.
  2. When writing into the database, don't forget to add the truncate option and set it to true:

    // ...
    .write()
    .mode("overwrite")          // replace the existing rows
    .option("truncate", true)   // keep the table and TRUNCATE it instead of DROP + CREATE
    // ...

This way Spark will not DROP and CREATE the table; it will just TRUNCATE it and INSERT the new data. A fuller sketch of the write is below.
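
For completeness, here is a fuller sketch of such a write in Scala; the JDBC URL, table name and credentials are placeholders, not values from my setup:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "<user>")
props.setProperty("password", "<password>")
props.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

df.write
  .mode("overwrite")
  .option("truncate", "true")   // TRUNCATE the existing table instead of DROP + CREATE
  .jdbc("jdbc:sqlserver://<host>:1433;databaseName=<db>", "dbo.TargetTable", props)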
