Neelam Bhargava - 8 days ago
Scala Question

Unable to save dataframe in redshift

I'm reading a large dataset from an HDFS location and saving my dataframe to Redshift.

df.write
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable", "my_table_copy")
.option("tempdir", "s3n://path/for/temp/data")
.mode("error")
.save()
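
For context, the read side of the job looks roughly like this; the HDFS path and input format below are placeholders rather than the real ones:

// Sketch of the read side (placeholder path and format; assumes Spark 2.x
// and that df comes from a SparkSession).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hdfs-to-redshift")
  .getOrCreate()

val df = spark.read.parquet("hdfs:///path/to/input")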


After some time I am getting the following error:

s3.amazonaws.com:443 failed to respond
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:143)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283)
at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251)
at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:223)
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:685)
at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:487)
at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:882)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:334)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:281)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestPut(RestStorageService.java:1043)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.copyObjectImpl(RestStorageService.java:2029)
at org.jets3t.service.StorageService.copyObject(StorageService.java:871)
at org.jets3t.service.StorageService.copyObject(StorageService.java:916)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:323)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.rename(NativeS3FileSystem.java:707)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:370)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:384)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:326)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:


I found the same issue on GitHub:

https://github.com/databricks/spark-redshift/issues/266#event-836105447


Am I doing something wrong? Any help would be appreciated.

Answer

I had the same issue; in my case I was using AWS EMR too.

Spark first writes the data to Amazon S3, and then these Avro files are loaded into Redshift.
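
As a side note, the staging format written to tempdir can be made explicit through the connector's tempformat option. This is only a sketch reusing the write from the question; the option and its values (AVRO, CSV, CSV GZIP) are my recollection of the connector's API, not something from the question:

// Same write as in the question, with the S3 staging format made explicit.
df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table_copy")
  .option("tempdir", "s3n://path/for/temp/data")
  .option("tempformat", "AVRO")
  .mode("error")
  .save()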

To fix the error itself, you have to configure your EMRFS settings and it will work.

The EMR File System (EMRFS) and the Hadoop Distributed File System (HDFS) are both installed on your EMR cluster. EMRFS is an implementation of HDFS which allows EMR clusters to store data on Amazon S3.

EMRFS will try to verify list consistency for objects tracked in its metadata for a specific number of retries (the EMRFS retry logic). The default is 5. If the number of retries is exceeded, the originating job fails. To overcome this issue, you can override the default EMRFS configuration with the following steps:

Step 1: Log in to your EMR master instance

Step 2: Add the following properties to /usr/share/aws/emr/emrfs/conf/emrfs-site.xml

sudo vi /usr/share/aws/emr/emrfs/conf/emrfs-site.xml

<property>
    <name>fs.s3.consistent.throwExceptionOnInconsistency</name>
    <value>false</value>
</property>

<property>
    <name>fs.s3.consistent.retryPolicyType</name>
    <value>fixed</value>
</property>
<property>
    <name>fs.s3.consistent.retryPeriodSeconds</name>
    <value>10</value>
</property>
<property>
    <name>fs.s3.consistent</name>
    <value>false</value>
</property>

Step 3: Restart your EMR cluster

Also configure your Hadoop configuration, in particular fs.s3a.attempts.maximum:

// Hadoop configuration from the Spark context
// (SparkDriver here is my own wrapper around the SparkContext)
val hadoopConf = SparkDriver.getContext.hadoopConfiguration
// Use the native S3 filesystem for s3:// paths
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
// Allow more retries for S3A requests
hadoopConf.set("fs.s3a.attempts.maximum", "30")
// S3N credentials (awsAccessKeyId / awsSecretAccessKey are defined elsewhere)
hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)
hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)