Lobsterrrr - 10 months ago 107
Scala Question

How to write streaming data to S3?

I want to write

to Amazon S3 in Spark Streaming using Scala. These are basically JSON strings. I am not sure how to do this efficiently.
I found this post, in which the library
is used. The idea is to create
and then
. After this the author of the post does something like this:

myDstream.foreachRDD { rdd =>

What other options are there besides
? Is it possible to append the file on S3 with the streaming data?

Answer

Take a look at the mode method of DataFrameWriter in the Spark documentation:

public DataFrameWriter mode(SaveMode saveMode)

Specifies the behavior when data or table already exists. Options include:
- SaveMode.Overwrite: overwrite the existing data.
- SaveMode.Append: append the data.
- SaveMode.Ignore: ignore the operation (i.e. no-op).
- SaveMode.ErrorIfExists: default option, throw an exception at runtime.

You can try something like this with the Append save mode.
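
A minimal sketch of how this could look for a DStream of JSON strings, assuming Spark 2.2 or later and one JSON document per string. The helper name appendJsonBatches and the outputPath parameter are made up for illustration; pass whatever S3 URI you actually use.

```scala
import org.apache.spark.sql.{Encoders, SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: appends every non-empty micro-batch of JSON strings
// to `outputPath` (for example an S3 URI that you supply).
def appendJsonBatches(spark: SparkSession, stream: DStream[String], outputPath: String): Unit =
  stream.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      // Parse the JSON strings into a DataFrame; the schema is inferred per batch.
      val df = spark.read.json(spark.createDataset(rdd)(Encoders.STRING))
      // Append writes new part files under outputPath and leaves existing ones alone.
      df.write.mode(SaveMode.Append).json(outputPath)
    }
  }
```

Note that Append here means new part files are added under the same prefix on each batch. Existing S3 objects are not extended in place.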


Spark Append:

Append mode means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.

Basically, you can choose the output format by passing its name to the format method:

public DataFrameWriter format(java.lang.String source)

Specifies the underlying output data source. Built-in options include "parquet", "json", etc.

e.g. as Parquet:
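
A short sketch of the Parquet case, assuming you already have a DataFrame. The helper name appendAsParquet and its path parameter are illustrative only.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper: appends `df` as Parquet part files under `path`.
def appendAsParquet(df: DataFrame, path: String): Unit =
  df.write.mode(SaveMode.Append).format("parquet").save(path)
```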


or as JSON:
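
The JSON case differs only in the format name. Again a sketch under the same assumptions, with appendAsJson being a made-up helper name.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper: appends `df` as JSON-lines part files under `path`.
def appendAsJson(df: DataFrame, path: String): Unit =
  df.write.mode(SaveMode.Append).format("json").save(path)
```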


Edit: Added details about s3 credentials:

There are two ways to set the credentials, as can be seen in SparkHadoopUtil.scala: through environment variables, read with System.getenv("AWS_ACCESS_KEY_ID"), or through a spark.hadoop.foo property:

if (key.startsWith("spark.hadoop.")) {
  hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
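
To make the prefix rule concrete, here is a small stand-alone sketch that mimics the snippet above with plain Scala maps instead of a real SparkConf and Hadoop Configuration. The function name hadoopProperties and the sample values are invented for illustration.

```scala
// Stand-in for the Spark logic: every "spark.hadoop."-prefixed entry is copied
// into the result (playing the role of the Hadoop configuration) with the
// prefix removed. All other Spark properties are ignored.
def hadoopProperties(sparkProps: Map[String, String]): Map[String, String] = {
  val prefix = "spark.hadoop."
  sparkProps.collect {
    case (key, value) if key.startsWith(prefix) => key.substring(prefix.length) -> value
  }
}

val props = Map(
  "spark.hadoop.fs.s3.awsAccessKeyId" -> "my-access-key", // placeholder value
  "spark.app.name" -> "streaming-to-s3"
)
val copied = hadoopProperties(props)
```

So a property set as spark.hadoop.fs.s3.awsAccessKeyId ends up in the Hadoop configuration as fs.s3.awsAccessKeyId.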

So you need to get the Hadoop configuration, through javaSparkContext.hadoopConfiguration() in Java or scalaSparkContext.hadoopConfiguration in Scala, and set:

hadoopConfiguration.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConfiguration.set("fs.s3.awsSecretAccessKey", mySecretKey)
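
Put together for the Scala case, this could look like the sketch below. The helper name setS3Credentials is hypothetical, and the two key arguments are expected to come from wherever you keep your secrets.

```scala
import org.apache.spark.SparkContext

// Hypothetical helper: stores the given AWS credentials in the Hadoop
// configuration that `sc` uses for file system access.
def setS3Credentials(sc: SparkContext, accessKey: String, secretKey: String): Unit = {
  val hadoopConfiguration = sc.hadoopConfiguration
  hadoopConfiguration.set("fs.s3.awsAccessKeyId", accessKey)
  hadoopConfiguration.set("fs.s3.awsSecretAccessKey", secretKey)
}
```

Call it once before the first write, e.g. setS3Credentials(sc, myAccessKey, mySecretKey). These property names apply to s3:// URIs; for s3a:// paths the corresponding keys are fs.s3a.access.key and fs.s3a.secret.key.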