How to start a Flume agent from Java code

xv8emn3q, posted on 2021-06-04 in Hadoop

I am using the stable Hadoop 1.2.1 release on CentOS 6.5 and running a Flume agent with Apache Flume 1.x to collect tweets into HDFS. My flume.conf is:

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey =******
TwitterAgent.sources.Twitter.consumerSecret =*****
TwitterAgent.sources.Twitter.accessToken =*****
TwitterAgent.sources.Twitter.accessTokenSecret =***
TwitterAgent.sources.Twitter.keywords = CrudeOilPrice,Crude Oil,platts oil, Oil & Gas Journal 
TwitterAgent.sources.Twitter.keywords = big data,hadoop
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://master:9000/user/flume/tweets/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

To run it, I use the following command:

>bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent

Now I am trying to run this from Java. Can anyone give me some ideas?

public class fl {

    public static void main(String[] args) throws IOException, InterruptedException
    {
        Process p;

        p = Runtime.getRuntime().exec("/home/dsri/flume/bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent");

        p.waitFor();
         //p.exitValue();

        BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));

        String line = "";

        while ((line = reader.readLine())!= null) 
        {
        System.out.println(line);
        }
    }
}

But that does not work for me. I am now trying this Java code instead:

package dsri;
//package org.jai.flume.agent;

import java.util.HashMap;
import java.util.Map;
import org.apache.flume.agent.embedded.EmbeddedAgent;

public class FlumeAgentServiceImpl {

    private static EmbeddedAgent agent;
    private void createAgent() {
        final Map<String, String> properties = new HashMap<String, String>();

        properties.put("channel.type", "memory");
        properties.put("channel.capacity", "200");
        properties.put("sinks", "sink1 sink2");
        properties.put("sink1.type", "avro");
        properties.put("sink2.type", "avro");
        properties.put("sink1.hostname", "collector1.apache.org");
        properties.put("sink1.port", "5564");
        properties.put("sink2.hostname", "collector2.apache.org");
        properties.put("sink2.port",  "5565");
        properties.put("processor.type", "load_balance");

        agent = new EmbeddedAgent("myagent");
        agent.configure(properties);
        agent.start();

    }

    public EmbeddedAgent getFlumeAgent() {
        if (agent == null) {
            createAgent();
        }
        return agent;
    }

    public static void main(String[] args) {
        FlumeAgentServiceImpl f= new FlumeAgentServiceImpl();
        System.out.println(f.getFlumeAgent());
    }

}

But I get an exception:

org.apache.flume.FlumeException: NettyAvroRpcClient { host: collector1.apache.org, port: 5564 }: RPC connection error
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:161)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:115)
at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:590)
at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:209)
at org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:289)
at org.apache.flume.sink.AbstractSinkProcessor.start(AbstractSinkProcessor.java:41)
at org.apache.flume.sink.LoadBalancingSinkProcessor.start(LoadBalancingSinkProcessor.java:134)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.agent.embedded.EmbeddedAgent.doStart(EmbeddedAgent.java:216)
at org.apache.flume.agent.embedded.EmbeddedAgent.start(EmbeddedAgent.java:114)
at dsri.FlumeAgentServiceImpl.createAgent(FlumeAgentServiceImpl.java:48)
at dsri.FlumeAgentServiceImpl.getFlumeAgent(FlumeAgentServiceImpl.java:53)
at dsri.FlumeAgentServiceImpl.main(FlumeAgentServiceImpl.java:61)
Caused by: java.io.IOException: Error connecting to collector1.apache.org/218.93.250.18:5564
at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:147)
... 14 more

doinxwow1#

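You can also load a Flume configuration file instead of writing the configuration in Java code. The same configuration file can be used when starting a standalone Flume agent.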

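import org.apache.flume.node.Application;

public static void main(String[] args)
{
    // Use a separate variable name: redeclaring "args" here would not compile.
    String[] flumeArgs = new String[] { "agent", "-nAgent",
            "-fflume.conf" };

    // Hands the same options the flume-ng script would pass to Flume's entry point.
    Application.main(flumeArgs);

}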

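Here "Agent" (the value passed with -n) is the name of the Flume agent; it has to match the agent name used in the configuration file.
"flume.conf" is the configuration file, which should be placed in the resources folder of the Java project.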


kkbh8khc2#

I am stuck on the same problem. EmbeddedAgent cannot be used here, because it only supports the Avro sink.
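
If the embedded agent is ruled out by the HDFS sink, one option is to keep using the flume-ng script and start it from Java with ProcessBuilder. The sketch below is not from the original thread. It assumes the install path from the question (/home/dsri/flume), and the class name FlumeLauncher and its two helper methods are made up for illustration. It differs from the Runtime.exec attempt in the question in three ways: the working directory is set so that the relative ./conf/ and conf/flume.conf paths resolve, stderr is merged into stdout in case the console logger writes there, and the output is read before waitFor() is called, since a running agent does not exit on its own.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

class FlumeLauncher {

    // Builds the same command line as the shell invocation in the question.
    static List<String> buildCommand(String flumeHome, String agentName, String confFile) {
        return Arrays.asList(
                Paths.get(flumeHome, "bin", "flume-ng").toString(),
                "agent",
                "--conf", "./conf/",
                "-f", confFile,
                "-Dflume.root.logger=DEBUG,console",
                "-n", agentName);
    }

    // Starts the process in flumeHome and echoes its output until it exits.
    static int runAndStream(String flumeHome, List<String> command)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.directory(new File(flumeHome)); // relative conf paths resolve against this directory
        pb.redirectErrorStream(true);      // merge stderr into stdout so nothing is lost
        Process p = pb.start();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            // Read while the process runs; calling waitFor() first would block
            // for as long as the agent stays up.
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        return p.waitFor();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        String flumeHome = "/home/dsri/flume"; // assumption: install path from the question
        Path script = Paths.get(flumeHome, "bin", "flume-ng");
        if (!Files.isExecutable(script)) {
            System.err.println("flume-ng not found or not executable: " + script);
            return;
        }
        int exit = runAndStream(flumeHome,
                buildCommand(flumeHome, "TwitterAgent", "conf/flume.conf"));
        System.out.println("flume-ng exited with code " + exit);
    }
}
```

Note that runAndStream blocks the calling thread for as long as the agent runs, so in a larger application it would probably belong on its own thread.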


zqdjd7g93#

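Instead of doing that, try the embedded agent, which is a more elegant and cleaner solution. You create a Map<String, String> holding the configuration of the Flume agent you want to run, then create an agent and configure it with that map.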

Map<String, String> properties = new HashMap<String, String>();
properties.put("channel.type", "memory");
properties.put("channel.capacity", "200");
properties.put("sinks", "sink1 sink2");
properties.put("sink1.type", "avro");
properties.put("sink2.type", "avro");
properties.put("sink1.hostname", "collector1.apache.org");
properties.put("sink1.port", "5564");
properties.put("sink2.hostname", "collector2.apache.org");
properties.put("sink2.port",  "5565");
properties.put("processor.type", "load_balance");

EmbeddedAgent agent = new EmbeddedAgent("myagent");

agent.configure(properties);
agent.start();

List<Event> events = Lists.newArrayList();

events.add(event);
events.add(event);
events.add(event);
events.add(event);

agent.putAll(events);

...

agent.stop();

You can find more information here.
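
The host names collector1.apache.org and collector2.apache.org in the snippet above look like placeholder values. The Avro sinks need a listening Avro source at sink1.hostname:sink1.port (and likewise for sink2), and an "RPC connection error" like the one in the question is what you would expect when nothing is listening there. As a rough sketch, not part of the original answer, a plain TCP check along these lines can confirm that a collector is reachable before agent.start() is called. The class name CollectorCheck, the method isReachable and the 2000 ms timeout are illustrative choices, not Flume API.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class CollectorCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are placeholders; pass the real collector host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 5564;
        System.out.println(host + ":" + port + " reachable: "
                + isReachable(host, port, 2000));
    }
}
```

A successful connection only shows that something accepts TCP connections on that port; it does not prove that the listener is a Flume Avro source.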
