houcros - 1 year ago
Scala Question

Flink error: Specifying keys via field positions is only valid for tuple data types

I am using the Scala API of Flink. I have some transformations over reports: DataStream[Tuple15] (the Tuple15 is a Scala tuple and all its fields are Int). The issue is located here:

reports
.filter(_._1 == 0) // some filter
.map( x => (x._3, x._4, x._5, x._7, x._8))
(TypeInformation.of(classOf[(Int,Int,Int,Int,Int)])) // keep only 5 fields as a Tuple5
.keyBy(2,3,4) // the error is in apply, but I think related to this somehow
.timeWindow(Time.minutes(5), Time.minutes(1))
// the line under is line 107, where the error is
.apply( (tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
...
})


The error states:

InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType<scala.Tuple5>


Whole error trace (I marked the line pointing to the error, line 107, which corresponds to the apply method in the code above):

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType<scala.Tuple5>
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:217)
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:208)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:256)
at org.apache.flink.streaming.api.scala.DataStream.keyBy(DataStream.scala:289)
here -> at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.latestAverageVelocity(LinearRoad.scala:107)
at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.main(LinearRoad.scala:46)
at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad.main(LinearRoad.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)


But this doesn't make sense to me. I am using a tuple type, am I not? Or what is the deal with the GenericType<...>?

And how should I fix the map to make the keyBy work?

Answer

The reason is that TypeInformation.of relies on the Java API's type extraction and therefore does not know about Scala tuples. For a Scala tuple class it falls back to a GenericType, which cannot be used as the input of a keyBy operation with field positions.
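To see the difference directly, the sketch below builds the type information for the same Scala tuple both ways and checks whether Flink treats it as a tuple type. It assumes a Flink 1.x setup with the Scala API (flink-scala) on the classpath; the object name TypeInfoComparison is made up for illustration.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._ // provides the implicit createTypeInformation macro

object TypeInfoComparison {
  type T5 = (Int, Int, Int, Int, Int)

  // Java-side extraction: scala.Tuple5 is neither a Flink Java tuple nor a POJO,
  // so this ends up as a GenericTypeInfo (the GenericType<scala.Tuple5> in the error)
  val javaInfo: TypeInformation[T5] = TypeInformation.of(classOf[T5])

  // Scala-side derivation: the macro produces a tuple-aware type information
  val scalaInfo: TypeInformation[T5] = createTypeInformation[T5]

  def main(args: Array[String]): Unit = {
    println(javaInfo.isTupleType)  // false: keyBy(2, 3, 4) is rejected
    println(scalaInfo.isTupleType) // true: field positions are accepted
  }
}
```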

If you want to create the type information for a Scala tuple manually, you have to use the createTypeInformation method, which is contained in the org.apache.flink.api.scala and org.apache.flink.streaming.api.scala package objects.
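A minimal sketch of the manual route, assuming Flink 1.x with flink-streaming-scala on the classpath: the type information is created with createTypeInformation and passed explicitly in the second (implicit) parameter list of map, the same place where the question passes TypeInformation.of. The object name ExplicitTypeInfoJob and the single sample record are hypothetical stand-ins for the real reports stream.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

object ExplicitTypeInfoJob {
  // Built by the Scala API macro, so Flink sees a tuple type instead of a GenericType
  val tuple5Info: TypeInformation[(Int, Int, Int, Int, Int)] =
    createTypeInformation[(Int, Int, Int, Int, Int)]

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical input: one Tuple15 of Ints standing in for `reports`
    val reports = env.fromElements((0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))

    reports
      .filter(_._1 == 0)
      // explicit type information goes into map's implicit parameter list
      .map(x => (x._3, x._4, x._5, x._7, x._8))(tuple5Info)
      .keyBy(2, 3, 4) // accepted, because the input type is now a tuple type
      .print()

    env.execute("explicit Scala tuple TypeInformation")
  }
}
```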

If you import the package object, however, there is no need to specify the type information manually at all: the TypeInformation is a context bound of the map operation, and createTypeInformation is an implicit function, so the compiler supplies the type information for you.
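The mechanism can be sketched with a hypothetical helper, describe, that declares the same kind of TypeInformation context bound that DataStream.map uses, written out in its desugared form as an implicit parameter. Assuming the org.apache.flink.api.scala._ import is in scope (Flink 1.x Scala API), the compiler fills that parameter through createTypeInformation without any explicit argument, and the result is a proper tuple type rather than a GenericType.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._ // implicit createTypeInformation in scope

object ContextBoundDemo {
  // `def describe[R: TypeInformation]` is shorthand for this implicit parameter list
  def describe[R](implicit info: TypeInformation[R]): TypeInformation[R] = info

  // No type information is passed by hand: the imported implicit macro supplies it
  val tuple5Info: TypeInformation[(Int, Int, Int, Int, Int)] =
    describe[(Int, Int, Int, Int, Int)]
}
```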

The following code snippet shows the idiomatic way to deal with TypeInformation.

import org.apache.flink.streaming.api.scala._ // brings the implicit createTypeInformation into scope
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

reports
  .filter(_._1 == 0) // some filter
  .map(x => (x._3, x._4, x._5, x._7, x._8)) // type information is derived implicitly as a Scala tuple type
  .keyBy(2, 3, 4) // field positions are valid now, because the type is no longer a GenericType
  .timeWindow(Time.minutes(5), Time.minutes(1))
  .apply((tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
       ...
  })