Pravinkumar Hadpad - 1 year ago
Scala Question

How to create DataFrame from fixed-length text file given field lengths?

I am reading a fixed-position file, and the final result is stored in a string. I would like to convert that string into a DataFrame for further processing. Kindly help me with this. Details are below.

Input data:
+---------+----------------------+
|PRGREFNBR|value                 |
+---------+----------------------+
|01       |11 apple     TRUE 0.56|
|02       |12 pear      FALSE1.34|
|03       |13 raspberry TRUE 2.43|
|04       |14 plum      TRUE  .31|
|05       |15 cherry    TRUE 1.4 |
+---------+----------------------+

data position:
"3,10,5,4"


expected result with default header in data frame:

+-----+-----+----------+-----+-----+
|SeqNo|col_0|     col_1|col_2|col_3|
+-----+-----+----------+-----+-----+
|  01 |  11 |apple     |TRUE | 0.56|
|  02 |  12 |pear      |FALSE| 1.34|
|  03 |  13 |raspberry |TRUE | 2.43|
|  04 |  14 |plum      |TRUE | 1.31|
|  05 |  15 |cherry    |TRUE | 1.4 |
+-----+-----+----------+-----+-----+

Answer

Given the fixed-position file (say input.txt):

11 apple     TRUE 0.56
12 pear      FALSE1.34
13 raspberry TRUE 2.43
14 plum      TRUE 1.31
15 cherry    TRUE 1.4

and the length of every field in the input file as (say lengths):

3,10,5,4

you could create a DataFrame as follows:

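// Read the text file as is
// and filter out empty lines
// (outside spark-shell, add `import spark.implicits._` for the encoders and the $"..." syntax used below)
val lines = spark.read.textFile("input.txt").filter(!_.isEmpty)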

// define a helper function to do the split per fixed lengths
// Home exercise: should be part of a case class that describes the schema
def parseLinePerFixedLengths(line: String, lengths: Seq[Int]): Seq[String] = {
  lengths.indices.foldLeft((line, Array.empty[String])) { case ((rem, fields), idx) =>
    val len = lengths(idx)
    val fld = rem.take(len)
    (rem.drop(len), fields :+ fld)
  }._2
}

// Split the lines using parseLinePerFixedLengths method
val lengths = Seq(3,10,5,4)
val fields = lines.
  map(parseLinePerFixedLengths(_, lengths)).
  withColumnRenamed("value", "fields") // <-- it'd be unnecessary if a case class were used
scala> fields.show(truncate = false)
+------------------------------+
|fields                        |
+------------------------------+
|[11 , apple     , TRUE , 0.56]|
|[12 , pear      , FALSE, 1.34]|
|[13 , raspberry , TRUE , 2.43]|
|[14 , plum      , TRUE , 1.31]|
|[15 , cherry    , TRUE , 1.4 ]|
+------------------------------+
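
To see what the split does to a single line without starting Spark, the same idea can be sketched in plain Scala. This is an illustrative variant rather than the helper above: the name `splitFixed`, the `widths` value and the use of `scanLeft` to precompute offsets are assumptions made for the example.

```scala
// Hypothetical helper: cut a line into fields of the given lengths.
def splitFixed(line: String, lengths: Seq[Int]): Seq[String] = {
  // Running totals give each field's start offset:
  // Seq(3, 10, 5, 4) yields 0, 3, 13, 18, 22 (zip drops the final total).
  val offsets = lengths.scanLeft(0)(_ + _)
  offsets.zip(lengths).map { case (start, len) =>
    // slice clamps to the string length, so a short line does not throw
    line.slice(start, start + len)
  }
}

val widths = Seq(3, 10, 5, 4)
val parts = splitFixed("12 pear      FALSE1.34", widths)
// Fields keep their padding; trim them if the blanks are not wanted.
val trimmed = parts.map(_.trim)
```

Note that `FALSE` and `1.34` are separated correctly even though no space sits between them in the input, which is the reason for splitting by length rather than by delimiter.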

The single fields array column is probably what you already had, so let's unroll (destructure) the nested sequence of fields into separate columns:

val answer = lengths.indices.foldLeft(fields) { case (result, idx) =>
  result.withColumn(s"col_$idx", $"fields".getItem(idx))
}
// drop the unnecessary/interim column
scala> answer.drop("fields").show
+-----+----------+-----+-----+
|col_0|     col_1|col_2|col_3|
+-----+----------+-----+-----+
|  11 |apple     |TRUE | 0.56|
|  12 |pear      |FALSE| 1.34|
|  13 |raspberry |TRUE | 2.43|
|  14 |plum      |TRUE | 1.31|
|  15 |cherry    |TRUE | 1.4 |
+-----+----------+-----+-----+

Done!
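
The "home exercise" comment in the helper suggests tying the split to a case class that describes the schema. A minimal sketch of that idea in plain Scala might look as follows. The `Fruit` class, its field names and the chosen types are hypothetical (the answer only names the columns `col_0` to `col_3`), and `parseFruit` and `fruitLengths` are illustrative names, not part of Spark.

```scala
import scala.util.Try

// Hypothetical schema for the sample rows; names and types are assumptions.
final case class Fruit(id: Int, name: String, flag: Boolean, price: Double)

// Field lengths taken from the answer: 3,10,5,4
val fruitLengths: Seq[Int] = Seq(3, 10, 5, 4)

// Returns None when a field is missing or fails to convert.
def parseFruit(line: String): Option[Fruit] = {
  val offsets = fruitLengths.scanLeft(0)(_ + _)
  val fields = offsets.zip(fruitLengths).map { case (start, len) =>
    line.slice(start, start + len).trim
  }
  fields match {
    case Seq(id, name, flag, price) =>
      // toBoolean accepts "TRUE"/"FALSE" regardless of case
      Try(Fruit(id.toInt, name, flag.toBoolean, price.toDouble)).toOption
    case _ => None
  }
}

val parsed = parseFruit("13 raspberry TRUE 2.43")
```

In spark-shell, a call along the lines of `lines.flatMap(l => parseFruit(l).toSeq)` should then give a typed Dataset with named columns, assuming `spark.implicits._` is in scope, which would make the interim `fields` column and the `withColumn` fold unnecessary.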
