Uttam Raj Uttam Raj - 1 month ago 16
Java Question

How to start Flume agent from java code

I am using the Hadoop 1.2.1 stable release on CentOS 6.5 with Apache Flume 1.x. I run a Flume agent that collects tweets into HDFS; my flume.conf is:

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = ******
TwitterAgent.sources.Twitter.consumerSecret =*****
TwitterAgent.sources.Twitter.accessToken = *****
TwitterAgent.sources.Twitter.accessTokenSecret = ***
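# NOTE: keywords is assigned twice in this file. Flume reads the file as Java properties, where the later assignment wins, so this list is overridden by the "big data,hadoop" line below.
TwitterAgent.sources.Twitter.keywords = CrudeOilPrice,Crude Oil,platts oil, Oil & Gas Journal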
TwitterAgent.sources.Twitter.keywords = big data,hadoop
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://master:9000/user/flume/tweets/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100


To run it, I used this command:

>bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent


Now I am trying to start this agent from a Java program. Can anyone give me some ideas? I tried this code:

public class fl {

    /** Flume installation directory; the agent's relative conf paths are resolved against it. */
    private static final String FLUME_HOME = "/home/dsri/flume";

    /** Agent name; must match the property prefix used in conf/flume.conf. */
    private static final String AGENT_NAME = "TwitterAgent";

    /**
     * Starts the Flume agent as a child process and echoes its console output.
     *
     * <p>The agent is a long-running process, so its output is consumed while it runs. Calling
     * {@code waitFor()} first (as the original did) blocks forever without printing anything,
     * and an unread output pipe can eventually stall the agent itself.
     *
     * @param args ignored
     * @throws IOException if the process cannot be started or its output cannot be read
     * @throws InterruptedException if interrupted while waiting for the agent to exit
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder(buildCommand(FLUME_HOME, AGENT_NAME));
        // "./conf/" and "conf/flume.conf" are relative paths: resolve them against the Flume
        // home rather than whatever directory this JVM happened to be launched from.
        builder.directory(new java.io.File(FLUME_HOME));
        // Merge stderr into stdout so an unread stderr pipe cannot block the child process.
        builder.redirectErrorStream(true);
        Process process = builder.start();

        BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        } finally {
            reader.close();
        }
        process.waitFor();
    }

    /**
     * Builds the {@code flume-ng agent} command line as separate arguments, so no
     * whitespace-splitting of a single command string is involved.
     *
     * @param flumeHome Flume installation directory containing {@code bin/flume-ng}
     * @param agentName agent name passed via {@code -n}
     * @return the program path followed by its arguments
     */
    static java.util.List<String> buildCommand(String flumeHome, String agentName) {
        return java.util.Arrays.asList(
                flumeHome + "/bin/flume-ng", "agent",
                "--conf", "./conf/",
                "-f", "conf/flume.conf",
                "-Dflume.root.logger=DEBUG,console",
                "-n", agentName);
    }
}


But it is not working for me.
Now I am trying this code in Java:

package dsri;
//package org.jai.flume.agent;

import java.util.HashMap;
import java.util.Map;
import org.apache.flume.agent.embedded.EmbeddedAgent;


public class FlumeAgentServiceImpl {

    /** Lazily created agent, shared by every instance of this class (static field). */
    private static EmbeddedAgent agent;

    /**
     * Creates, configures and starts an embedded agent named "myagent", then stores it in the
     * shared {@code agent} field.
     */
    private void createAgent() {
        EmbeddedAgent candidate = new EmbeddedAgent("myagent");
        candidate.configure(buildProperties());
        candidate.start();
        // Publish only after configure/start have returned: if either throws, the field stays
        // null and the next getFlumeAgent() call retries instead of handing out a broken agent.
        agent = candidate;
    }

    /**
     * Builds the embedded-agent configuration: a memory channel (capacity 200) feeding two Avro
     * sinks behind a load_balance sink processor.
     *
     * <p>NOTE: collector1.apache.org:5564 and collector2.apache.org:5565 are example hosts. The
     * reported "RPC connection error ... Error connecting to collector1.apache.org" means nothing
     * accepted the Avro connection there. Replace them with the host/port of a Flume agent that
     * runs an avro source.
     *
     * @return a fresh, mutable property map for {@code EmbeddedAgent.configure}
     */
    private static Map<String, String> buildProperties() {
        final Map<String, String> properties = new HashMap<String, String>();
        properties.put("channel.type", "memory");
        properties.put("channel.capacity", "200");
        properties.put("sinks", "sink1 sink2");
        properties.put("sink1.type", "avro");
        properties.put("sink2.type", "avro");
        properties.put("sink1.hostname", "collector1.apache.org");
        properties.put("sink1.port", "5564");
        properties.put("sink2.hostname", "collector2.apache.org");
        properties.put("sink2.port", "5565");
        properties.put("processor.type", "load_balance");
        return properties;
    }

    /**
     * Returns the shared embedded agent, creating and starting it on first use.
     *
     * <p>NOTE(review): per the Flume developer guide, an embedded agent has no configurable
     * source (events are pushed with put/putAll), so it cannot run a TwitterSource like the
     * flume.conf in the question.
     *
     * @return the started agent
     */
    public EmbeddedAgent getFlumeAgent() {
        if (agent == null) {
            createAgent();
        }
        return agent;
    }

    /** Creates the agent and prints it. */
    public static void main(String[] args) {
        FlumeAgentServiceImpl f = new FlumeAgentServiceImpl();
        System.out.println(f.getFlumeAgent());
    }

}


But I am getting this exception:

org.apache.flume.FlumeException: NettyAvroRpcClient { host: collector1.apache.org, port: 5564 }: RPC connection error
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:161)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:115)
at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:590)
at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:209)
at org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:289)
at org.apache.flume.sink.AbstractSinkProcessor.start(AbstractSinkProcessor.java:41)
at org.apache.flume.sink.LoadBalancingSinkProcessor.start(LoadBalancingSinkProcessor.java:134)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.agent.embedded.EmbeddedAgent.doStart(EmbeddedAgent.java:216)
at org.apache.flume.agent.embedded.EmbeddedAgent.start(EmbeddedAgent.java:114)
at dsri.FlumeAgentServiceImpl.createAgent(FlumeAgentServiceImpl.java:48)
at dsri.FlumeAgentServiceImpl.getFlumeAgent(FlumeAgentServiceImpl.java:53)
at dsri.FlumeAgentServiceImpl.main(FlumeAgentServiceImpl.java:61)
Caused by: java.io.IOException: Error connecting to collector1.apache.org/218.93.250.18:5564
at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:147)
... 14 more

Answer

You could also load the Flume configuration file rather than writing the configuration in the Java code. The same configuration file can then also be used to start your standalone Flume agent.

/**
 * Runs a Flume agent inside this JVM by delegating to Flume's own command-line entry point.
 *
 * <p>"-nAgent" selects the agent name (the property prefix in the config file) and
 * "-fflume.conf" the configuration file.
 *
 * @param args ignored; the Flume arguments are fixed below
 */
public static void main(String[] args)
{
    // The local must not be named "args": that name is already the method parameter, and
    // redeclaring it in the same scope is a compile error.
    String[] flumeArgs = new String[] { "agent", "-nAgent",
            "-fflume.conf" };

    // Fully qualified because no import for Flume's Application class is shown.
    org.apache.flume.node.Application.main(flumeArgs);

}

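Where "Agent" is the name of your Flume agent (for the configuration in the question, that would be "TwitterAgent"), and
"flume.conf" is the path to the configuration file. Flume's Application opens this path as a regular file relative to the working directory, not as a classpath resource. So if you keep flume.conf in your project's resources folder, pass that path, for example -fsrc/main/resources/flume.conf.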