jag jag - 1 month ago 28
Scala Question

Flink RMQSource

I want to test RMQSource class for receiving data from RabbitMQ, but i don´t know how to config the Rabbit virtual host for my exchange, and i think is the problem i have. My code:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object rabbitjob {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print


def main (args:Array[String]){
env.execute("Test Rabbit")
}
}


Error in IntelliJ IDE:
Error:(10, 29) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print


^


Error:(10, 29) not enough arguments for method addSource: (implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[String])org.apache.flink.streaming.api.scala.DataStream[String].
Unspecified value parameter evidence$7.
val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print


^


Any idea how to solve it or alternatives??
Thank you in advance.

Answer

The error you're seeing is a Scala compile time error caused by some needed imports not being there. Whenever you are using the Flink Scala API you should include the following:

import org.apache.flink.api.scala._

This will solve the compile time problem you're having.

Comments