
"not found : value from_json" - spark 2.1.1

It looks like I am missing some very basic functionality here, and it's flabbergasting.

This is a very basic piece: I am reading a Kafka queue in Spark 2.1.1 (Kafka 0.10+) and the payload is a JSON string. I intend to parse the string against a schema and move on to the business logic.

Everyone seems to suggest that I should use from_json to parse the JSON; however, it doesn't compile in my situation. The error is

not found : value from_json
.select(from_json($"json", txnSchema).as("data"))


When I run the following lines in the spark shell, they work just fine:

val df = stream.selectExpr("cast (value as string) as json")
  .select(from_json($"json", txnSchema).as("data"))
  .select("data.*")


Any idea what I could be doing wrong, such that this piece works in the shell but not at compile time in the IDE?

Here's the code extract:

import org.apache.spark.sql._

object Kafka10Cons3 {
  val spark = SparkSession.builder()
    .appName(Util.getProperty("AppName"))
    .master(Util.getProperty("spark.master"))
    .getOrCreate()

  val txnSchema = Util.getTxnStructure

  def main(args: Array[String]): Unit = {

    import spark.implicits._

    val stream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
      .option("subscribe", src_topic)
      .load()

    import spark.implicits._

    val df = stream.selectExpr("cast (value as string) as json")
      .select(from_json($"json", txnSchema).as("data"))
      .select("data.*")
  }
}

Answer

You're probably just missing the relevant import: import org.apache.spark.sql.functions._.

You have imported spark.implicits._ and org.apache.spark.sql._, but neither of those brings the functions defined on the org.apache.spark.sql.functions object (such as from_json) into scope. The spark shell, by contrast, pre-imports org.apache.spark.sql.functions._ at startup, which is why the same snippet compiles there but not in your IDE.
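
For reference, here is the same extract with the missing import added (a minimal sketch; Util, src_topic, and the property names are taken from the question and assumed to exist in your project):

import org.apache.spark.sql._
import org.apache.spark.sql.functions._ // brings from_json into scope

object Kafka10Cons3 {
  val spark = SparkSession.builder()
    .appName(Util.getProperty("AppName"))
    .master(Util.getProperty("spark.master"))
    .getOrCreate()

  val txnSchema = Util.getTxnStructure

  def main(args: Array[String]): Unit = {
    import spark.implicits._ // needed for the $"json" column syntax

    val stream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
      .option("subscribe", src_topic)
      .load()

    // from_json now resolves, since it is in scope via functions._
    val df = stream.selectExpr("cast (value as string) as json")
      .select(from_json($"json", txnSchema).as("data"))
      .select("data.*")
  }
}

Alternatively, you can skip the extra import and qualify the call as functions.from_json(...), since import org.apache.spark.sql._ already brings the functions object itself into scope.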
