
Spark: Creating a DataFrame from a Text File

I'm trying to create a DataFrame from a text file in Spark, but it throws an error. Here is my code:

case class BusinessSchema(business_id: String, name: String, address: String, city: String,
  postal_code: String, latitude: String, longitude: String, phone_number: String, tax_code: String,
  business_certificate: String, application_date: String, owner_name: String, owner_address: String,
  owner_city: String, owner_state: String, owner_zip: String)

val businessDataFrame = sc.textFile(s"$baseDir/businesses_plus.txt").map(x => x.split("\t")).map {
  case Array(business_id, name, address, city, postal_code, latitude, longitude, phone_number, tax_code,
             business_certificate, application_date, owner_name, owner_address, owner_city, owner_state, owner_zip) =>
    BusinessSchema(business_id, name, address, city, postal_code, latitude, longitude, phone_number, tax_code,
      business_certificate, application_date, owner_name, owner_address, owner_city, owner_state, owner_zip)
}

val businessRecords = businessDataFrame.toDF()


The error occurs when I run this code:

businessRecords.take(20)


The error thrown:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 25, localhost): scala.MatchError: [Ljava.lang.String;@6da1c3f1 (of class [Ljava.lang.String;)

Answer

A MatchError means a pattern match failed: none of the cases matched some input. In this case, you have a single case, which matches the result of split("\t") against an Array with exactly 16 elements.

Your data probably contains records that don't satisfy this assumption (they have fewer or more than 16 tab-separated fields), which causes this exception.
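To see why, here is a minimal sketch (the values are made up, not taken from your file): a row with a different number of tab-separated fields produces an Array that the single case cannot match, and the match throws exactly the MatchError from the stack trace above:

// Illustrative only - two fake "rows", one well-formed, one too short
val rows = Seq(
  (1 to 16).mkString("\t"), // 16 tab-separated fields - matches the pattern
  "1\t2\t3"                 // only 3 fields - matches no case
)

rows.map(_.split("\t")).map {
  case Array(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p) => "ok, 16 fields"
}
// => scala.MatchError: [Ljava.lang.String;@... (of class [Ljava.lang.String;)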

To overcome this, you can either replace map with collect(f: PartialFunction[T, U]), which takes a PartialFunction and silently ignores inputs that don't match any of the cases, effectively filtering out all malformed records:

sc.textFile(s"$baseDir/businesses_plus.txt").map(x => x.split("\t")).collect {
  case Array(business_id, name, address, city, postal_code, latitude, longitude, phone_number, tax_code,
             business_certificate, application_date, owner_name, owner_address, owner_city, owner_state, owner_zip) =>
    BusinessSchema(business_id, name, address, city, postal_code, latitude, longitude, phone_number, tax_code,
      business_certificate, application_date, owner_name, owner_address, owner_city, owner_state, owner_zip)
}
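A quick usage note (assuming the result of the collect above is bound to a val, here called cleanBusinesses, and that toDF() is available as in your original code): the rest of the pipeline stays the same, the malformed rows are simply dropped:

// cleanBusinesses is the RDD[BusinessSchema] produced by the collect above (assumed binding)
val businessRecords = cleanBusinesses.toDF()
businessRecords.take(20) // no longer throws MatchError; bad rows were filtered out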

Or, add a case to catch erroneous records and do something with them. For example, you can replace the RDD[BusinessSchema] result with an RDD[Either[BusinessSchema, Array[String]]] to reflect the fact that some records failed to parse, while still keeping the erroneous data available for logging or other handling:

import org.apache.spark.rdd.RDD

val withErrors: RDD[Either[BusinessSchema, Array[String]]] = sc.textFile(s"$baseDir/businesses_plus.txt")
  .map(x => x.split("\t"))
  .map {
    case Array(business_id, name, address, city, postal_code, latitude, longitude, phone_number, tax_code,
               business_certificate, application_date, owner_name, owner_address, owner_city, owner_state, owner_zip) =>
      Left(BusinessSchema(business_id, name, address, city, postal_code, latitude, longitude, phone_number, tax_code,
        business_certificate, application_date, owner_name, owner_address, owner_city, owner_state, owner_zip))
    case badArray => Right(badArray)
  }

// filter bad records, you can log / count / ignore them
val badRecords: RDD[Array[String]] = withErrors.collect { case Right(a) => a } 

// filter good records - you can go on as planned from here...
val goodRecords: RDD[BusinessSchema] = withErrors.collect { case Left(r) => r }
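For example (a hypothetical follow-up, assuming toDF() is available as in your original code), you can report how many rows were malformed and build the DataFrame from the well-formed records only:

// hypothetical follow-up: count the malformed rows and continue with the good ones
println(s"Skipped ${badRecords.count()} malformed rows")

val businessRecords = goodRecords.toDF()
businessRecords.take(20) // no MatchError this time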