Swetha Swetha - 2 months ago 24
Scala Question

org.apache.thrift.transport.TTransportException while updating creating new RDD

I have a data frame with two columns :

id
and
value
. I want to update the value based on another map.

df.collect.foreach({
df[value] = if (df[id] != 'unknown') mapper.value(df[id]) else df[value]
})


Is this correct way of using ?

I tried :

import com.mapping.data.model.MappingUtils
import com.mapping.data.model.CountryInfo


val mappingPath = "s3://.../"
val input = sc.textFile(mappingPath)


The input is list of jsons where each line is json which I am mapping to the POJO class CountryInfo using MappingUtils which takes care of JSON parsing and conversion:

val MappingsList = input.map(x=> {
val countryInfo = MappingUtils.getCountryInfoString(x);
(countryInfo.getItemId(), countryInfo)
}).collectAsMap

MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo]


def showCountryInfo(x: Option[CountryInfo]) = x match {
case Some(s) => s
}


val events = sqlContext.sql( "select itemId EventList")

val itemList = events.map(row => {
val itemId = row.getAs[String](1);
val çountryInfo = showTitleInfo(MappingsList.get(itemId));
val country = if (countryInfo.getCountry() == 'unknown)' "US" else countryInfo.getCountry()
val language = countryInfo.getLanguage()

Row(itemId, country, language)
})


But I keep getting this error :

org.apache.thrift.transport.TTransportException at
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84) at

org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:362) at
org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:284) at

org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:191) at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69) at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_interpret(RemoteInterpreterService.java:220) at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.interpret(RemoteInterpreterService.java:205) at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.interpret(RemoteInterpreter.java:211) at

org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93) at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:207) at org.apache.zeppelin.scheduler.Job.run(Job.java:170) at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:304) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)


I am using Spark 1.6

Answer

Your question is bit ambiguous.

Don’t collect large RDDs unnecessarily.

When a collect operation is issued on a RDD, the dataset is copied to the driver, i.e. the master node. A memory exception will be thrown if the dataset is too large to fit in memory; take or takeSample can be used to retrieve only a capped number of elements instead.

The way you are doing by collect method is not correct(if it is large DataFrame it may lead to OOM)..

1) To update any column or add new column you can use withColumn

DataFrame   withColumn(java.lang.String colName, Column col)
Returns a new DataFrame by adding a column or replacing the existing column that has the same name.

2) To check the condition based on another datastructure..

you can use when otherwise syntax like below

Apache Spark, add an "CASE WHEN ... ELSE ..." calculated column to an existing DataFrame example :

import org.apache.spark.sql.functions._
val sqlcont = new org.apache.spark.sql.SQLContext(sc)
val df1 = sqlcont.jsonRDD(sc.parallelize(Array(
      """{"year":2012, "make": "Tesla", "model": "S", "comment": "No Comment", "blank": ""}""",
      """{"year":1997, "make": "Ford", "model": "E350", "comment": "Get one", "blank": ""}""",
      """{"year":2015, "make": "Chevy", "model": "Volt", "comment": "", "blank": ""}"""
    )))

val makeSIfTesla = udf {(make: String) => 
  if(make == "Tesla") "S" else make
}
df1.withColumn("make", makeSIfTesla(df1("make"))).show

The above can be also achieved like this..

 val rdd = sc.parallelize(
      List( (2012,"Tesla","S"), (1997,"Ford","E350"), (2015,"Chevy","Volt"))
  )
  val sqlContext = new SQLContext(sc)

  // this is used to implicitly convert an RDD to a DataFrame.
  import sqlContext.implicits._

  val dataframe = rdd.toDF()

  dataframe.foreach(println)

 dataframe.map(row => {
    val row1 = row.getAs[String](1)
    val make = if (row1.toLowerCase == "tesla") "S" else row1
    Row(row(0),make,row(2))
  }).collect().foreach(println)

//[2012,S,S]
//[1997,Ford,E350]
//[2015,Chevy,Volt]
Comments