Swetha Swetha - 1 month ago 8x
Scala Question

org.apache.thrift.transport.TTransportException while creating a new RDD

I have a data frame with two columns, id and value. I want to update the value based on another map.

df[value] = if (df[id] != 'unknown') mapper.value(df[id]) else df[value]

Is this the correct way to do it?

I tried:

import com.mapping.data.model.MappingUtils
import com.mapping.data.model.CountryInfo

val mappingPath = "s3://.../"
val input = sc.textFile(mappingPath)

The input is a list of JSON records, one per line. I map each line to the POJO class CountryInfo using MappingUtils, which takes care of JSON parsing and conversion:

val MappingsList = input.map(x => {
  val countryInfo = MappingUtils.getCountryInfoString(x)
  (countryInfo.getItemId(), countryInfo)
}).collectAsMap() // copies the whole mapping to the driver

MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo]

def showCountryInfo(x: Option[CountryInfo]) = x match {
  // There is no None case, so an itemId missing from the map throws scala.MatchError.
  case Some(s) => s
}

import org.apache.spark.sql.Row

val events = sqlContext.sql("select itemId from EventList")

val itemList = events.map(row => {
  val itemId = row.getAs[String]("itemId")
  val countryInfo = showCountryInfo(MappingsList.get(itemId))
  val country = if (countryInfo.getCountry() == "unknown") "US" else countryInfo.getCountry()
  val language = countryInfo.getLanguage()

  Row(itemId, country, language)
})

But I keep getting this error:

org.apache.thrift.transport.TTransportException
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:362)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:284)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:191)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_interpret(RemoteInterpreterService.java:220)
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.interpret(RemoteInterpreterService.java:205)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.interpret(RemoteInterpreter.java:211)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
    at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:207)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
    at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:304)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I am using Spark 1.6


Your question is a bit ambiguous.

Don’t collect large RDDs unnecessarily.

When a collect operation is issued on an RDD, the entire dataset is copied to the driver, i.e. the master node. A memory exception will be thrown if the dataset is too large to fit in the driver's memory; take or takeSample can be used to retrieve only a capped number of elements instead.
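
As a rough illustration of the difference, here is a minimal sketch that pulls a capped number of elements to the driver instead of the whole dataset. The `numbers` RDD is hypothetical data, and the local master setting is only an assumption so the snippet can run outside a notebook; `SparkContext.getOrCreate` reuses an existing context if one is already running.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// getOrCreate reuses a running context (e.g. in a notebook) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("take-sketch").setMaster("local[2]"))

// Hypothetical dataset standing in for a large RDD.
val numbers = sc.parallelize(1 to 100000, 4)

// take(n) ships at most n elements to the driver instead of the whole dataset.
val firstFive = numbers.take(5)

// takeSample returns a random sample of the requested size.
val sample = numbers.takeSample(withReplacement = false, num = 5, seed = 42L)

println(firstFive.mkString(", "))
println(sample.mkString(", "))
```

Either call keeps the amount of data on the driver bounded regardless of how large the RDD is.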

Collecting the data the way you are doing is not the right approach: if the DataFrame is large, it may lead to an out-of-memory (OOM) error.

1) To update a column or add a new column, you can use withColumn:

DataFrame withColumn(java.lang.String colName, Column col)
Returns a new DataFrame by adding a column or replacing the existing column that has the same name.

2) To check a condition based on another data structure, you can use the when/otherwise syntax, as in the example below.

Apache Spark, add a "CASE WHEN ... ELSE ..." calculated column to an existing DataFrame example:

import org.apache.spark.sql.functions._
val sqlcont = new org.apache.spark.sql.SQLContext(sc)
// jsonRDD still works in Spark 1.6 but is deprecated in favor of sqlcont.read.json(rdd).
val df1 = sqlcont.jsonRDD(sc.parallelize(Array(
      """{"year":2012, "make": "Tesla", "model": "S", "comment": "No Comment", "blank": ""}""",
      """{"year":1997, "make": "Ford", "model": "E350", "comment": "Get one", "blank": ""}""",
      """{"year":2015, "make": "Chevy", "model": "Volt", "comment": "", "blank": ""}"""
)))

// UDF that replaces the make "Tesla" with "S" and leaves every other make unchanged.
val makeSIfTesla = udf {(make: String) =>
  if(make == "Tesla") "S" else make
}
df1.withColumn("make", makeSIfTesla(df1("make"))).show
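
For comparison, here is a minimal sketch of the same replacement written with when/otherwise, followed by the rule from the question applied to an id/value frame. The `cars` and `df` data, the `mapper` map and the `lookup` UDF are hypothetical stand-ins, not the asker's actual CountryInfo mapping, and the sketch assumes the map is small enough to be shipped to the executors inside the UDF closure.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{col, udf, when}

// getOrCreate reuses a running context (e.g. in a notebook) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("when-otherwise-sketch").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Same Tesla replacement as the UDF version, written with when/otherwise.
val cars = sc.parallelize(Seq((2012, "Tesla"), (1997, "Ford"))).toDF("year", "make")
val carsUpdated = cars.withColumn(
  "make", when(col("make") === "Tesla", "S").otherwise(col("make")))

// The question's rule on hypothetical id/value data with a small in-memory map.
val mapper = Map("a1" -> "US", "b2" -> "FR")
val df = sc.parallelize(Seq(("a1", "old"), ("unknown", "keep"), ("zz", "stay")))
  .toDF("id", "value")

// Look the id up in the map; keep the current value when the id has no entry.
val lookup = udf((id: String, current: String) => mapper.getOrElse(id, current))

// Rows whose id is "unknown" keep their value; all others go through the lookup.
val updated = df.withColumn(
  "value",
  when(col("id") === "unknown", col("value")).otherwise(lookup(col("id"), col("value"))))

carsUpdated.show()
updated.show()
```

Because the replacement happens inside withColumn, the rows stay distributed and only the small map travels to the executors.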

The above can also be achieved like this:

  val rdd = sc.parallelize(
      List( (2012,"Tesla","S"), (1997,"Ford","E350"), (2015,"Chevy","Volt")))
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  // this is used to implicitly convert an RDD to a DataFrame.
  import sqlContext.implicits._

  val dataframe = rdd.toDF()


 dataframe.map(row => {
    val row1 = row.getAs[String](1)
    val make = if (row1.toLowerCase == "tesla") "S" else row1