Brady Auen Brady Auen - 2 months ago 53
Scala Question

Spark and Amazon EMR: S3 Connections not being closed

My application loops through the lines of a text file containing S3 directories, reads them in, performs ETL processes, and then writes back out to S3. It has failed in the same place several times (after about 80 loops), so I suspect that Spark is not closing my S3 connections and my allocated connection pool is being exhausted. The error is:

16/09/14 19:30:49 INFO AmazonHttpClient: Unable to execute HTTP request: Timeout waiting for connection from pool
com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy37.getConnection(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:991)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:212)
at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy36.retrieveMetadata(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:780)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1428)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:313)
at org.apache.spark.sql.execution.datasources.DataSource.hasMetadata(DataSource.scala:289)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:324)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:452)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:458)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:451)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
at com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:26)
at com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:26)
at com.webpt.SparkMaxwell$$anonfun$main$1.apply(SparkMaxwell.scala:116)
at com.webpt.SparkMaxwell$$anonfun$main$1.apply(SparkMaxwell.scala:105)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at com.webpt.SparkMaxwell$.main(SparkMaxwell.scala:105)
at com.webpt.SparkMaxwell.main(SparkMaxwell.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Relevant code:

// Driver-side merge loop. For every table listed in the key file this combines yesterday's
// master snapshot under $lake (when it exists) with the decrypted input rows found in
// testFreeDF (when the table is in distinctTables) and appends the result to today's path.
val primaryKeyFile = sparkSession.sparkContext.textFile(keys)

for (line <- primaryKeyFile.collect()) {

  // keys text file is of syntax tableName (tab) key,key,key
  // Split once and fail with a readable message instead of an ArrayIndexOutOfBoundsException.
  val fields = line.split("\t")
  require(fields.length >= 2, s"Malformed key file line, expected tableName<TAB>key,key,...: '$line'")
  val tableName = fields(0).toLowerCase()
  val tableKeys = fields(1).toLowerCase.split(",").toSeq

  val yesterdayPath = s"$lake/$tableName/$yesterday"
  val todayPath = s"$lake/$tableName/$today"

  // Renames every column of df to its lower-case form.
  def lowerCaseColumns(df: DataFrame): DataFrame =
    df.select(df.columns.map(name => col(name).as(name.toLowerCase())): _*)

  // Reads yesterday's master snapshot for this table with lower-cased column names.
  def readMaster(): DataFrame =
    lowerCaseColumns(sparkSession.sqlContext.read.avro(s"$yesterdayPath/*").repartition(1500))

  // Filters the input down to this table, replaces the encrypted data column with a decrypted
  // one, then turns each row into a string so sqlContext.read.json can read it into a DataFrame
  // whose column names are lower-cased.
  def readInput(): DataFrame = {
    val soloDF = testFreeDF.filter(lower(col("table")).equalTo(tableName))
    val decryptedJson = soloDF
      .withColumn("data", decryptUDF(soloDF("data")))
      .select("data")
      .rdd
      .map(row => row.toString())
    lowerCaseColumns(sparkSession.sqlContext.read.json(decryptedJson))
  }

  val existsInMaster = FileSystem
    .get(new URI(yesterdayPath), sparkSession.sparkContext.hadoopConfiguration)
    .exists(new Path(yesterdayPath))
  val existsInInput = distinctTables.contains(tableName)

  if (existsInMaster && !existsInInput) {
    // Exists only in master: copy yesterday's data to today with lower-cased columns.
    readMaster().write.mode("append").avro(todayPath)
  } else if (existsInMaster) {
    // Exists both in master and input: merge, letting input rows override master rows.
    val inputDF = readInput()
    val masterDF = readMaster()

    // Keep every column from either side; input columns first, then master-only columns.
    val allColumns = inputDF.columns.union(masterDF.columns).distinct

    // Projects df onto allColumns in one fixed order (union matches columns by position),
    // adding a null column for each name df lacks, and tags the rows with their origin flag.
    def alignTo(df: DataFrame, flag: String): DataFrame = {
      val present = df.columns.toSet
      val exprs = allColumns.map(field =>
        if (present.contains(field)) col(field) else lit(null).alias(field))
      df.select(exprs: _*).withColumn("old/new", lit(flag))
    }

    // Partition by the primary keys and order by old/new descending: the row with the highest
    // flag is kept per key, so input ("2") must outrank master ("1") whether or not the two
    // schemas match. The previous schema-mismatch path had these flags swapped.
    val mergeWindowFunction = Window.partitionBy(tableKeys.head, tableKeys.tail: _*).orderBy(desc("old/new"))

    alignTo(inputDF, "2")
      .union(alignTo(masterDF, "1"))
      .repartition(1500)
      .withColumn("rownum", row_number.over(mergeWindowFunction))
      .where("rownum = 1")
      .drop("rownum")
      .drop("old/new")
      .write.mode("append").avro(todayPath)
  } else if (existsInInput) {
    // Exists only in input: write the decrypted, lower-cased input as today's master.
    readInput().write.mode("append").avro(todayPath)
  }
}


The line that it's failing on is:

masterDF.select(masterDF.columns.map(name => col(name).as(name.toLowerCase())): _*).write.mode("append").avro(s"$lake/$tableName/$today")


So what can I do? Is there any way to ensure that Spark is closing connections after reads/writes to S3?

Answer

This is caused by a bug in spark-avro, which should be fixed in the newly-published 3.0.1 release.