
Spark: fetch data from complex dataframe schema with map

I've got the following structure:

json.select($"comments").printSchema

root
|-- comments: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- comment: struct (nullable = true)
| | | |-- date: string (nullable = true)
| | | |-- score: string (nullable = true)
| | | |-- shouts: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- tags: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- text: string (nullable = true)
| | | |-- username: string (nullable = true)
| | |-- subcomments: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- date: string (nullable = true)
| | | | |-- score: string (nullable = true)
| | | | |-- shouts: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- tags: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- text: string (nullable = true)
| | | | |-- username: string (nullable = true)
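
A minimal sketch of how a DataFrame named json with this schema could be created; the path and SparkSession setup here are assumptions for illustration, not part of the question:

import org.apache.spark.sql.SparkSession

// Hypothetical setup; "comments.json" is a placeholder path.
val spark = SparkSession.builder().appName("comments").getOrCreate()
import spark.implicits._  // enables the $"..." column syntax

val json = spark.read.json("comments.json")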


I would like to get an array/list of [username, score, text] for each comment. Normally, in PySpark I would do something like this:

comments = (json
    .select("comments")
    .flatMap(lambda element:
        map(lambda comment:
            Row(username=comment.username,
                score=comment.score,
                text=comment.text),
            element[0]))
    .toDF())


But when I try the same approach in Scala

json.select($"comments").rdd.map { row: Row => row(0) }.take(3)


I get some weird output:

Array[Any] = Array(
  WrappedArray([[string,string,WrappedArray(),WrappedArray(),,string], ...], ...)


Is there any way to perform that task in Scala as easily as it's done in Python?

Also, how do I iterate over a WrappedArray like an Array/List? I'm getting an error like this:

error: scala.collection.mutable.WrappedArray.type does not take parameters

Answer

How about using a statically typed Dataset instead?

case class Comment(
    date: String, score: String,
    shouts: Seq[String], tags: Seq[String],
    text: String, username: String
)

df
  .select(explode($"comments.comment").alias("comment"))
  .select("comment.*")
  .as[Comment]
  .map(c => (c.username, c.score, c.text))
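
For this to compile outside the spark-shell (where these are already in scope), you would roughly need the following imports; spark here is assumed to be your SparkSession value:

import org.apache.spark.sql.functions.explode
import spark.implicits._  // `spark` is your SparkSession value (assumption)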

This can be further simplified if you don't depend on the REPL:

df
  .select("comments.comment")
  .as[Seq[Comment]]
  .flatMap(_.map(c => (c.username, c.score, c.text)))
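
Both variants yield a Dataset of (username, score, text) tuples; if you want named columns back, calling .toDF("username", "score", "text") on the result would do it.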

If you really want to deal with Rows, use typed getters:

df.rdd.flatMap(
  _.getAs[Seq[Row]]("comments")
    .map(_.getAs[Row]("comment"))
    .map {
      // You could also use _.getAs[String]("score") or getString(1)
      case Row(_, score: String, _, _, text: String, username: String) =>
        (username, score, text)
    }
)
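
As for the WrappedArray question: what getAs returns for an array column is a WrappedArray, which already implements Seq, so ordinary collection methods work on it directly. The "does not take parameters" error appears when the WrappedArray companion object itself is applied like a function. A minimal sketch, assuming row is a single Row taken from json.select($"comments"):

val comments = row.getAs[Seq[Row]]("comments")  // a WrappedArray at runtime
comments.headOption                             // safe access to the first element
comments.map(_.getAs[Row]("comment"))           // iterate it like a List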