EY.Mohamed - 3 months ago
Scala Question

Custom TypeConverters using spark cassandra connector

I wrote an app using the Spark Cassandra Connector. Now, when I spark-submit the job, I get the error java.lang.IllegalArgumentException: requirement failed: No mappable properties found in class: MailBox, even though I defined type converters as described in https://github.com/datastax/spark-cassandra-connector/blob/master/doc/6_advanced_mapper.md. My guess is that I need a companion object for MailBox in which I define a mapper, but I can't find an example of that in the docs. Does anyone know how to solve this? Thanks

The code:

import com.datastax.spark.connector._
import com.datastax.spark.connector.types._
import org.apache.spark.SparkContext
import scala.reflect.runtime.universe._
import java.util.UUID

object Test {
    case class Size(size: Long) {
        if (size < 0) throw new IllegalArgumentException
        def +(s: Size): Size = Size(size + s.size)
    }

    object LongToSizeConverter extends TypeConverter[Size] {
        def targetTypeTag = typeTag[Size]
        def convertPF = { case long: Long => Size(long) }
    }
    object SizeToLongConverter extends TypeConverter[Long] {
        def targetTypeTag = typeTag[Long]
        def convertPF = { case Size(long) => long.toLong }
    }

    // The id field holds an Id (the job below constructs MailBox(Id("1"), ...))
    case class MailBox(id: Id, totalsize: Size)
    case class Id(mailboxid: String)

    object StringToIdConverter extends TypeConverter[Id] {
        def targetTypeTag = typeTag[Id]
        def convertPF = {
            case str: String => Id(str)
            case str: UUID => Id(str.toString)
        }
    }
    object IdToStringConverter extends TypeConverter[String] {
        def targetTypeTag = typeTag[String]
        def convertPF = { case Id(str) => str.toString }
    }

    def main(args: Array[String]) {
        val sc = new SparkContext()
        TypeConverter.registerConverter(StringToIdConverter)
        TypeConverter.registerConverter(IdToStringConverter)
        TypeConverter.registerConverter(LongToSizeConverter)
        TypeConverter.registerConverter(SizeToLongConverter)
        val test = sc.parallelize(Array(MailBox(Id("1"), Size(10))))
        test.saveAsCassandraTable("test", "Mailbox")
    }
}

Answer

First let me post a quick working example, then I'll walk through what is going wrong:

package com.datastax.spark.example

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector.types._
import scala.reflect.runtime.universe._
import java.util.UUID

import org.apache.spark.sql.catalyst.ReflectionLock.SparkReflectionLock

case class Size(size: Long) {
    if (size < 0) throw new IllegalArgumentException
    def +(s: Size): Size = Size(size + s.size)
}
case class MailBox(id: Id,totalsize: Size)
case class Id(mailboxid:String)



object Test {

    val LongTypeTag = SparkReflectionLock.synchronized {
            implicitly[TypeTag[java.lang.Long]]
    }
    val SizeTypeTag = SparkReflectionLock.synchronized {
            typeTag[Size]
    }
    val IdTypeTag = SparkReflectionLock.synchronized {
            typeTag[Id]
    }
    val StringTypeTag = SparkReflectionLock.synchronized {
            implicitly[TypeTag[String]]
    }

    object LongToSizeConverter extends TypeConverter[Size] {
        def targetTypeTag = SizeTypeTag
        def convertPF = { case long: Long => Size(long)  }
    }

    object SizeToLongConverter extends TypeConverter[java.lang.Long] {
        def targetTypeTag = LongTypeTag
        def convertPF = { case Size(long) => long.toLong }
    }

    object StringToIdConverter extends TypeConverter[Id] {
        def targetTypeTag = IdTypeTag
        def convertPF = { 
            case str: String => Id(str)
            case str: UUID => Id(str.toString)
        }
    }

    object IdToStringConverter extends TypeConverter[String] {
        def targetTypeTag = StringTypeTag
        def convertPF = { case Id(str) => str.toString }
    }

    TypeConverter.registerConverter(StringToIdConverter)
    TypeConverter.registerConverter(IdToStringConverter)
    TypeConverter.registerConverter(LongToSizeConverter)
    TypeConverter.registerConverter(SizeToLongConverter)


    def main(args: Array[String]) {
        val sc = new SparkContext()
        val test = sc.parallelize(Array(MailBox(Id("1"),Size(10))))
        test.saveToCassandra("ks","mailbox")
    }
}

saveAsCassandraTable doesn't work with Custom Types

saveAsCassandraTable uses the fromType method, which only works with types it already knows about (not custom ones). This is because saveAsCassandraTable has to create a Cassandra column for each field based on that field's type. A custom type converter doesn't explicitly declare a (1 to 1) mapping between your type and a Cassandra column type, so there is nothing to look up. Since saveAsCassandraTable creates the Cassandra table before inserting into it, it fails: it doesn't know how to build the table.
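To make that failure mode concrete, here is a self-contained sketch of a type-driven schema lookup. The `cqlTypeFor` and `createTable` functions and their mapping table are hypothetical stand-ins written for illustration, not the connector's actual fromType logic; they only show why a lookup keyed on known types has nothing to return for `Id` or `Size`, even when converters for those types are registered elsewhere.

```scala
// Hypothetical model of "field type -> CQL column type" table creation.
// This is NOT the connector's code; it only illustrates the idea above.
object SchemaLookupSketch {
  case class Size(size: Long)
  case class Id(mailboxid: String)

  // The only types this lookup "knows" how to turn into columns.
  private val known: Map[Class[_], String] = Map(
    classOf[String]         -> "text",
    classOf[java.lang.Long] -> "bigint",
    classOf[java.util.UUID] -> "uuid"
  )

  // CQL type name for a field's class, if the lookup knows it.
  def cqlTypeFor(cls: Class[_]): Option[String] = known.get(cls)

  // Builds a CREATE TABLE statement (first field as the key),
  // or reports the first field whose type cannot be mapped.
  def createTable(table: String, fields: Seq[(String, Class[_])]): Either[String, String] = {
    val mapped = fields.map { case (name, cls) => name -> cqlTypeFor(cls) }
    mapped.collectFirst { case (name, None) => name } match {
      case Some(bad) => Left(s"no column type known for field '$bad'")
      case None =>
        val cols = mapped.map { case (name, t) => s"$name ${t.get}" }
        Right(s"CREATE TABLE $table (${cols.mkString(", ")}, PRIMARY KEY (${fields.head._1}))")
    }
  }
}
```

With the custom field types the sketch gives up on the first field, while the equivalent Java types map cleanly, which is why pre-creating the table and writing with saveToCassandra sidesteps the problem.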

To fix this we change the line

test.saveAsCassandraTable("test","Mailbox")

to

test.saveToCassandra("test","Mailbox")

This assumes the table has already been created in cqlsh, but you could also create it from your application using the Java Driver.

We need to convert to Java Types

TypeConverter chaining doesn't work with custom type converters, so we need to provide converters that go directly from our custom types to Java types. For this I changed SizeToLongConverter to target java.lang.Long instead of Scala's Long:

object SizeToLongConverter extends TypeConverter[java.lang.Long] {
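A converter's convertPF is just a `PartialFunction[Any, T]`, so the effect of that change can be checked without Spark. The sketch below reproduces the two partial functions from the answer's Size converters outside the connector (the names `sizeToJavaLong` and `longToSize` are hypothetical) and shows that the write direction yields a boxed `java.lang.Long`, while anything that isn't a `Size` is simply not handled.

```scala
object ConverterPFSketch {
  case class Size(size: Long) {
    if (size < 0) throw new IllegalArgumentException
  }

  // Same shape as SizeToLongConverter.convertPF in the answer:
  // match a Size and return a boxed java.lang.Long.
  val sizeToJavaLong: PartialFunction[Any, java.lang.Long] = {
    case Size(long) => java.lang.Long.valueOf(long)
  }

  // Same shape as LongToSizeConverter.convertPF (read direction).
  // The `long: Long` type pattern also matches a boxed java.lang.Long.
  val longToSize: PartialFunction[Any, Size] = {
    case long: Long => Size(long)
  }
}
```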

We should defend against Scala reflection's lack of thread safety

I've wrapped the TypeTag lookups in synchronized blocks (using SparkReflectionLock) to make sure we don't run into any issues there.

See

SparkReflectionLock.synchronized
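The idea behind that guard can be sketched without Spark: every access to non-thread-safe state goes through the same shared monitor, so only one thread at a time is inside a guarded block. In the sketch below, `ReflectionGuard` is a hypothetical stand-in for SparkReflectionLock, and a plain counter plays the role of the shared reflection state; it is an illustration of the locking pattern, not of Scala reflection itself.

```scala
object ReflectionGuardSketch {
  // Hypothetical lock object playing the role of SparkReflectionLock.
  object ReflectionGuard

  // Stand-in for shared state that is not safe to touch concurrently.
  private var counter = 0

  // Every mutation goes through the same monitor.
  def guardedIncrement(): Unit = ReflectionGuard.synchronized {
    counter += 1
  }

  def current: Int = ReflectionGuard.synchronized(counter)

  // Starts `threads` threads, each calling guardedIncrement `perThread` times,
  // waits for all of them, and returns the final count.
  def incrementConcurrently(threads: Int, perThread: Int): Int = {
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = (1 to perThread).foreach(_ => guardedIncrement())
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    current
  }
}
```

Because all increments share one monitor, no updates are lost regardless of how the threads interleave.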

We need to do the registration at the object level

To make sure our registrations also happen on the executor JVMs, I moved them out of the main method and into the body of the Test object. I'm not sure how important this is, but it's best for the registration to happen wherever the code is shipped to, not just while main is running.
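The behavior this relies on is that a Scala object's body runs once per JVM, the first time the object is referenced, rather than only when main runs. The self-contained sketch below shows that with a hypothetical `Registry` in place of TypeConverter's registration; `ensureRegistered` is likewise a hypothetical no-op helper. Whether an executor actually initializes Test still depends on the code it runs referencing that object, so calling such a no-op from inside a task is one (assumed, not connector-documented) way to force it.

```scala
import scala.collection.mutable

object ObjectInitSketch {
  // Hypothetical registry standing in for TypeConverter's converter list.
  object Registry {
    val registered: mutable.Buffer[String] = mutable.Buffer.empty[String]
    def register(name: String): Unit = registered += name
  }

  // Like the answer's Test object: registrations live in the object body,
  // which runs once per JVM, on the first reference to the object.
  object Converters {
    Registry.register("StringToIdConverter")
    Registry.register("IdToStringConverter")
    Registry.register("LongToSizeConverter")
    Registry.register("SizeToLongConverter")

    // Hypothetical no-op: referencing it forces initialization in this JVM.
    def ensureRegistered(): Unit = ()
  }
}
```

Referencing the object a second time does not re-run its body, so the converters are registered exactly once per JVM.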