Eumcoz Eumcoz - 2 years ago 185
Scala Question

Cannot extend Flink ProcessFunction

I recently upgraded from Flink 1.2 to Flink 1.3, and I am trying to update my ProcessFunction to work with 1.3. I have a function that extends the ProcessFunction class, but it fails to compile with errors saying that I am not overriding the processElement and onTimer methods. Here is the code I have:

class TimeoutStateFunction extends ProcessFunction[ObjectNode, (String, Long)] {
  lazy val state: ListState[CountWithTimestamp] = getRuntimeContext
    .getListState(new ListStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

  override def processElement(value: ObjectNode, ctx: Context, out: Collector[(String, Long)]): Unit = {
    // Stuff here
  }

  override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
    // More stuff here
  }
}


Here are the compile errors I am getting:

Error:(8, 7) class TimeoutStateFunction needs to be abstract, since method processElement in class ProcessFunction of type (x$1: com.fasterxml.jackson.databind.node.ObjectNode, x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#Context, x$3: org.apache.flink.util.Collector[(String, Long)])Unit is not defined
class TimeoutStateFunction extends ProcessFunction[ObjectNode, (String, Long)] {

Error:(17, 18) method processElement overrides nothing.
Note: the super classes of class TimeoutStateFunction contain the following, non final members named processElement:
def processElement(x$1: com.fasterxml.jackson.databind.node.ObjectNode,x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#Context,x$3: org.apache.flink.util.Collector[(String, Long)]): Unit
override def processElement(value: ObjectNode, ctx: Context, out: Collector[(String, Long)]): Unit = {

Error:(36, 16) method onTimer overrides nothing.
Note: the super classes of class TimeoutStateFunction contain the following, non final members named onTimer:
def onTimer(x$1: Long,x$2: org.apache.flink.streaming.api.functions.ProcessFunction[com.fasterxml.jackson.databind.node.ObjectNode,(String, Long)]#OnTimerContext,x$3: org.apache.flink.util.Collector[(String, Long)]): Unit
override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {


I am currently using Scala 2.11 and Flink 1.3.2.

Answer Source

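Context and OnTimerContext are nested inside ProcessFunction, so their types depend on its input and output type parameters. As the compiler notes in the question show, the expected parameter types are ProcessFunction[ObjectNode, (String, Long)]#Context and ProcessFunction[ObjectNode, (String, Long)]#OnTimerContext, not a bare Context or OnTimerContext.

So this should work: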

override def processElement(
    value: ObjectNode,
    ctx: ProcessFunction[ObjectNode, (String, Long)]#Context,
    out: Collector[(String, Long)]): Unit = {
  // Stuff here
}

override def onTimer(
    timestamp: Long,
    ctx: ProcessFunction[ObjectNode, (String, Long)]#OnTimerContext,
    out: Collector[(String, Long)]): Unit = {
  // More stuff here
}
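
For reference, here is a minimal sketch of how the whole class might look with those signatures in place, assuming Flink 1.3.x and Jackson on the classpath. The CountWithTimestamp case class is a hypothetical stand-in (the question does not show its definition), and the Ctx and TimerCtx type aliases and the method bodies are illustrative additions used only to shorten the signatures and show the contexts in use.

```scala
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Hypothetical stand-in for the asker's state type, which the question does not show.
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

class TimeoutStateFunction extends ProcessFunction[ObjectNode, (String, Long)] {

  // Illustrative aliases (not part of Flink): they expand to the type
  // projections that the compiler messages ask for.
  type Ctx = ProcessFunction[ObjectNode, (String, Long)]#Context
  type TimerCtx = ProcessFunction[ObjectNode, (String, Long)]#OnTimerContext

  lazy val state: ListState[CountWithTimestamp] = getRuntimeContext
    .getListState(new ListStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

  override def processElement(value: ObjectNode, ctx: Ctx, out: Collector[(String, Long)]): Unit = {
    // Illustrative body: emit the element together with the current processing time.
    out.collect((value.toString, ctx.timerService().currentProcessingTime()))
  }

  override def onTimer(timestamp: Long, ctx: TimerCtx, out: Collector[(String, Long)]): Unit = {
    // Illustrative body: emit a marker with the timestamp of the timer that fired.
    out.collect(("timeout", timestamp))
  }
}
```

Because a type alias expands to the same type, the overrides still match the signatures declared in ProcessFunction. Note that accessing keyed state such as getListState, or registering timers, only works when the function is applied to a keyed stream.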