Aakash282 - 3 months ago
Scala Question

[Scala][Play] How can I validate an event with three possible configurations?

I am using Play Framework (Scala) for a micro-service and am using Kafka as the event bus. I have an event consumer that maps to an event class that looks like:

case class MovieEvent[T](
  mediaId: String,
  config: T
)

object MovieEvent {
  implicit def movieEventFormat[T: Format]: Format[MovieEvent[T]] =
    ((__ \ "mediaId").format[String] ~
      (__ \ "config").format[T]
    )(MovieEvent.apply _, unlift(MovieEvent.unapply))
}

object MovieProvider extends SerializableEnumeration {
  implicit val providerReads: Reads[MovieProvider.Value] = SerializableEnumeration.jsonReader(MovieProvider)
  implicit val providerWrites: Writes[MovieProvider.Value] = SerializableEnumeration.jsonWrites
  val Dreamworks, Disney, Paramount = Value
}

The consumer looks like:

class MovieEventConsumer @Inject()(movieService: MovieService
) extends ConsumerRecordProcessor with LazyLogging {
  override def process(record: IncomingRecord): Unit = {

    val movieEventJson = Json.parse(record.valueString).validate[MovieEvent[DreamworksConfiguration]]
    movieEventJson match {
      case event: JsSuccess[MovieEvent[DreamworksJobOptions]] => processMovieEvent(event.get)
      case er: JsError =>
        logger.error("Unrecognized MovieEvent, attempting to parse as MovieUploadEvent: " + JsError.toJson(er).toString())
        try {
          val data = (Json.parse(record.valueString) \ "upload").as[MovieUploadEvent]
        } catch {
          case er: Exception => logger.error("Unrecognized kafka event", er)
        }
    }
  }

  def processMovieEvent[T](event: MovieEvent[T]): Unit = {
    logger.debug(s"Received movie event: ${event}")
  }

  def processUploadEvent(event: MovieUploadEvent): Unit = {
    logger.debug(s"Received upload event: ${event}")
  }
}


Right now, I can only validate one of the three MovieEvent configurations (Dreamworks, Disney, and Paramount). I can swap which one I validate in the code, but that's not the point: I would like to validate any of the three without having to write additional consumers. I've tried a few different ideas, but none of them compile. I'm pretty new to Play and Kafka and am wondering if there is a good way to do this.

Thanks in advance!


I am going to assume that the number of possible configurations is finite and that all of them are known at compile time (three, in your example).

One possibility is to make MovieEvent a sealed trait with a generic type T. Here's a minimal example:

case class DreamWorksJobOptions(anOption: String, anotherOption: String)
case class DisneyJobOptions(anOption: String)

sealed trait MovieEvent[T] {
  def mediaId: String
  def config: T
}
case class DreamWorksEvent(mediaId: String, config: DreamWorksJobOptions) extends MovieEvent[DreamWorksJobOptions]
case class DisneyEvent(mediaId: String, config: DisneyJobOptions) extends MovieEvent[DisneyJobOptions]

def tryParse(jsonString: String): MovieEvent[_] = {
  // ... parsing logic goes here
  DreamWorksEvent("dw", DreamWorksJobOptions("some option", "another option"))
}

val parseResult = tryParse("asdfasdf")

parseResult match {
  case DreamWorksEvent(mediaId, config) => println(mediaId + " : " + config.anOption + " : " + config.anotherOption)
  case DisneyEvent(mediaId, config) => println(mediaId + config)
}

which prints out

dw : some option : another option

I omitted the parsing part because I don't have access to Play JSON at the moment. But since you have a sealed hierarchy, you can try each of your options one by one. (And you pretty much have to, since we cannot guarantee statically that DreamWorksEvent doesn't have the same JSON structure as DisneyEvent: you need to decide which gets tried first and fall back to parsing the JSON as another type when the first fails to parse.)
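For what it's worth, the "try each option one by one" idea could be sketched with Play JSON roughly as follows. This is only a sketch under assumptions: it assumes Play JSON is on the classpath, that the payload has the shape {"mediaId": ..., "config": {...}} as in the question's Format, and that the option field names are the ones from my minimal example above. The names MovieEventJson, dreamWorksReads, disneyReads and movieEventReads are made up for illustration.

```scala
import play.api.libs.json._
import scala.util.{Failure, Success, Try}

case class DreamWorksJobOptions(anOption: String, anotherOption: String)
case class DisneyJobOptions(anOption: String)

sealed trait MovieEvent[T] {
  def mediaId: String
  def config: T
}
case class DreamWorksEvent(mediaId: String, config: DreamWorksJobOptions) extends MovieEvent[DreamWorksJobOptions]
case class DisneyEvent(mediaId: String, config: DisneyJobOptions) extends MovieEvent[DisneyJobOptions]

// Hypothetical helper object; not part of the original code.
object MovieEventJson {
  implicit val dreamWorksOptionsReads: Reads[DreamWorksJobOptions] = Json.reads[DreamWorksJobOptions]
  implicit val disneyOptionsReads: Reads[DisneyJobOptions] = Json.reads[DisneyJobOptions]

  val dreamWorksReads: Reads[DreamWorksEvent] = Json.reads[DreamWorksEvent]
  val disneyReads: Reads[DisneyEvent] = Json.reads[DisneyEvent]

  // Order matters: a DreamWorks payload also satisfies the Disney reader
  // (the extra field is simply ignored), so the stricter reader goes first.
  val movieEventReads: Reads[MovieEvent[_]] =
    dreamWorksReads.map(e => e: MovieEvent[_])
      .orElse(disneyReads.map(e => e: MovieEvent[_]))

  // Json.parse throws on malformed input, so wrap it and report a JsError instead.
  def tryParse(jsonString: String): JsResult[MovieEvent[_]] =
    Try(Json.parse(jsonString)) match {
      case Success(js) => js.validate(movieEventReads)
      case Failure(e)  => JsError(s"Invalid JSON: ${e.getMessage}")
    }
}
```

In the consumer you would then match on the JsResult once (JsSuccess or JsError) and pattern match on the event inside the success case, instead of validating against a single configuration type.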

Now your other code is very generic. To add a new event type, you just need to add another subclass of MovieEvent and make sure your parsing logic handles the new case. The magic here is that you don't have to specify T when referring to MovieEvent: since you have a sealed hierarchy, you can recover T through pattern matching.
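To illustrate adding a third case, here is a small sketch that extends the hierarchy with a Paramount event. ParamountJobOptions, its region field, and the MovieEventDescriber object are hypothetical, since the question doesn't show what the Paramount configuration looks like. Because the trait is sealed, the compiler can usually warn you when a match over MovieEvent[_] forgets one of the subclasses.

```scala
case class DreamWorksJobOptions(anOption: String, anotherOption: String)
case class DisneyJobOptions(anOption: String)
case class ParamountJobOptions(region: String) // hypothetical fields, for illustration only

sealed trait MovieEvent[T] {
  def mediaId: String
  def config: T
}
case class DreamWorksEvent(mediaId: String, config: DreamWorksJobOptions) extends MovieEvent[DreamWorksJobOptions]
case class DisneyEvent(mediaId: String, config: DisneyJobOptions) extends MovieEvent[DisneyJobOptions]
case class ParamountEvent(mediaId: String, config: ParamountJobOptions) extends MovieEvent[ParamountJobOptions]

// Hypothetical helper object; not part of the original code.
object MovieEventDescriber {
  // The caller never names T; each case recovers the concrete config type.
  def describe(event: MovieEvent[_]): String = event match {
    case DreamWorksEvent(mediaId, config) => s"$mediaId : ${config.anOption} : ${config.anotherOption}"
    case DisneyEvent(mediaId, config)     => s"$mediaId : ${config.anOption}"
    case ParamountEvent(mediaId, config)  => s"$mediaId : ${config.region}"
  }
}
```

The parsing side would need a matching reader for the new subclass as well, otherwise a Paramount payload would never be produced in the first place.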