Max Kuzmentsov - 11 days ago
Scala Question

Apache Spark: customize input_file_name() to return only the file's name

Apache Spark has an input_file_name function, which I use to add a new column to a Dataset containing the name of the file currently being processed.

The problem is that I'd like to customize this function so that it returns only the file name, omitting the full S3 path to it.

For now, I strip the path in a second step, using a map function:

import org.apache.spark.sql.functions.input_file_name

val initialDs = spark.sqlContext.read
  .option("dateFormat", conf.dateFormat)
  .schema(conf.schema)
  .csv(conf.path)
  .withColumn("input_file_name", input_file_name())
...
...
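import org.apache.commons.io.FilenameUtils

def fromFile(fileName: String): String = {
  // strip the directory and the extension, e.g. ".../name.csv" -> "name"
  val baseName: String = FilenameUtils.getBaseName(fileName)
  // "magic" conversion: drop the last 8 characters of the base name ;)
  val tmpFileName: String = baseName.substring(0, baseName.length - 8)
  // valueOf is defined on the enclosing type (not shown)
  this.valueOf(tmpFileName)
}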


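But I'd like to use something like this instead, where customized_input_file_name_function stands for the customized function I'm after: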

val initialDs = spark.sqlContext.read
  .option("dateFormat", conf.dateFormat)
  .schema(conf.schema)
  .csv(conf.path)
  .withColumn("input_file_name", customized_input_file_name_function)

Answer
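import org.apache.spark.sql.functions.input_file_name

// register the UDF (also callable from SQL as get_only_file_name) and keep
// the returned UserDefinedFunction so it can be used in the DataFrame API
val get_only_file_name = spark.udf
  .register("get_only_file_name", (fullPath: String) => fullPath.split("/").last)

// use the UDF to keep only the last token (the file name) of the full path
val initialDs = spark.read
  .option("dateFormat", conf.dateFormat)
  .schema(conf.schema)
  .csv(conf.path)
  .withColumn("input_file_name", get_only_file_name(input_file_name()))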
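A possible alternative that avoids a UDF entirely: Spark's built-in substring_index function, called with a count of -1, returns everything after the last occurrence of the delimiter, so it can wrap input_file_name() directly. The sketch below is script-style (e.g. for pasting into spark-shell, which already provides a `spark` session) and assumes a local session; the helper names fileNameOnly and inputFileNameOnly and the sample paths are made up for illustration.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, input_file_name, substring_index}

// Local session for the demo; spark-shell already provides one.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("file-name-only-demo")
  .getOrCreate()
import spark.implicits._

// Keep only what follows the last "/" of a path column
// (the whole string is returned if there is no "/").
def fileNameOnly(path: Column): Column = substring_index(path, "/", -1)

// Hypothetical drop-in for the "customized input_file_name" the question asks for.
def inputFileNameOnly(): Column = fileNameOnly(input_file_name())

// Made-up sample paths standing in for values returned by input_file_name().
val samplePaths = Seq(
  "s3a://my-bucket/some/dir/part-00000.csv",
  "file:///tmp/data.csv"
).toDF("full_path")

// Apply the expression and pull the results back to the driver.
val fileNames: Seq[String] = samplePaths
  .select(fileNameOnly(col("full_path")))
  .collect()
  .map(_.getString(0))
  .toSeq

fileNames.foreach(println)
```

In the question's read this would become `.withColumn("input_file_name", inputFileNameOnly())`. A built-in expression is generally preferable to a Scala UDF here, because Spark's optimizer can see inside it and the values don't have to be converted to and from JVM strings.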