I am using the Hadoop 1.2.1 stable release on CentOS 6.5, and I run a Flume agent (Apache Flume 1.x) to collect tweets into HDFS. My flume.conf is:
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey =******
TwitterAgent.sources.Twitter.consumerSecret =*****
TwitterAgent.sources.Twitter.accessToken =*****
TwitterAgent.sources.Twitter.accessTokenSecret =***
TwitterAgent.sources.Twitter.keywords = CrudeOilPrice,Crude Oil,platts oil, Oil & Gas Journal
TwitterAgent.sources.Twitter.keywords = big data,hadoop
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://master:9000/user/flume/tweets/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100
To run the agent, I used the following command:
>bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent
Now I am trying to start the agent from a Java program. Can anyone give me some ideas?
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class fl {
    public static void main(String[] args) throws IOException, InterruptedException {
        Process p;
        p = Runtime.getRuntime().exec("/home/dsri/flume/bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent");
        p.waitFor();
        //p.exitValue();
        BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
        String line = "";
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
    }
}
But that did not work for me. Now I am writing this code in Java instead:
package dsri;
//package org.jai.flume.agent;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.agent.embedded.EmbeddedAgent;
public class FlumeAgentServiceImpl {
    private static EmbeddedAgent agent;
    private void createAgent() {
        final Map<String, String> properties = new HashMap<String, String>();
        properties.put("channel.type", "memory");
        properties.put("channel.capacity", "200");
        properties.put("sinks", "sink1 sink2");
        properties.put("sink1.type", "avro");
        properties.put("sink2.type", "avro");
        properties.put("sink1.hostname", "collector1.apache.org");
        properties.put("sink1.port", "5564");
        properties.put("sink2.hostname", "collector2.apache.org");
        properties.put("sink2.port", "5565");
        properties.put("processor.type", "load_balance");
        agent = new EmbeddedAgent("myagent");
        agent.configure(properties);
        agent.start();
    }
    public EmbeddedAgent getFlumeAgent() {
        if (agent == null) {
            createAgent();
        }
        return agent;
    }
    public static void main(String[] args) {
        FlumeAgentServiceImpl f = new FlumeAgentServiceImpl();
        System.out.println(f.getFlumeAgent());
    }
}
But I get an exception:
org.apache.flume.FlumeException: NettyAvroRpcClient { host: collector1.apache.org, port: 5564 }: RPC connection error
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:161)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:115)
at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:590)
at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:209)
at org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:289)
at org.apache.flume.sink.AbstractSinkProcessor.start(AbstractSinkProcessor.java:41)
at org.apache.flume.sink.LoadBalancingSinkProcessor.start(LoadBalancingSinkProcessor.java:134)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.agent.embedded.EmbeddedAgent.doStart(EmbeddedAgent.java:216)
at org.apache.flume.agent.embedded.EmbeddedAgent.start(EmbeddedAgent.java:114)
at dsri.FlumeAgentServiceImpl.createAgent(FlumeAgentServiceImpl.java:48)
at dsri.FlumeAgentServiceImpl.getFlumeAgent(FlumeAgentServiceImpl.java:53)
at dsri.FlumeAgentServiceImpl.main(FlumeAgentServiceImpl.java:61)
Caused by: java.io.IOException: Error connecting to collector1.apache.org/218.93.250.18:5564
at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:147)
... 14 more
3 answers
doinxwow #1
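You can also load a Flume configuration file instead of writing the configuration in Java code. The same configuration file can then be used when starting a standalone Flume agent.
Here "agent" is the name of the Flume agent.
"flume.conf" is the configuration file, which should be placed in the resources folder of the Java project.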
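The code for this answer is not shown above, so the following is only a minimal sketch of the idea, under stated assumptions. It assumes that Flume's launcher class is `org.apache.flume.node.Application` and that it accepts the same `--name` and `--conf-file` options that the `flume-ng agent` script passes along. The class name `FlumeFromResources` and both helper methods are hypothetical. The resource is copied to a temporary file because the launcher expects a file path, and a resource packed inside a jar is not one.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper class; not part of Flume.
public class FlumeFromResources {

    /** Builds the options that normally follow "flume-ng agent" on the command line. */
    public static String[] buildArgs(String agentName, Path confFile) {
        return new String[] {"--name", agentName, "--conf-file", confFile.toString()};
    }

    /** Copies a classpath resource (e.g. src/main/resources/flume.conf) to a temp file. */
    public static Path extractResource(String resourceName) throws IOException {
        try (InputStream in = FlumeFromResources.class.getClassLoader()
                .getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new IOException("Resource not found on classpath: " + resourceName);
            }
            Path tmp = Files.createTempFile("flume-", ".conf");
            Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
            return tmp;
        }
    }

    public static void main(String[] args) throws Exception {
        // The agent name must match the prefix used in flume.conf (TwitterAgent in the question).
        String agentName = args.length > 0 ? args[0] : "TwitterAgent";
        Path conf;
        try {
            conf = extractResource("flume.conf");
        } catch (IOException e) {
            System.err.println(e.getMessage());
            return;
        }
        String[] flumeArgs = buildArgs(agentName, conf);
        try {
            // Reflection is used only so this sketch compiles without the Flume jars.
            // With Flume on the classpath, the direct call would be:
            //   org.apache.flume.node.Application.main(flumeArgs);
            Class.forName("org.apache.flume.node.Application")
                    .getMethod("main", String[].class)
                    .invoke(null, (Object) flumeArgs);
        } catch (ClassNotFoundException e) {
            System.err.println("Flume is not on the classpath: " + e.getMessage());
        }
    }
}
```

For the configuration in the question, the classpath would presumably also need the jar that provides `com.cloudera.flume.source.TwitterSource` and the Hadoop client jars used by the HDFS sink.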
kkbh8khc #2
I am stuck on the same problem. EmbeddedAgent cannot be used here, because it only supports the Avro sink.
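zqdjd7g9 #3
Instead of doing that, try using the embedded agent (a more elegant and cleaner solution). You create a Map<String, String> holding the configuration of the Flume agent you want to run, then create an agent and configure it. You can find more information here.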
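As a rough illustration of that map, here is a sketch that builds an embedded-agent configuration from a list of collector addresses. The property keys are the ones used in the question's code. The class `EmbeddedAgentConfig`, its `build` method, and the address `localhost:41414` are assumptions for illustration. Since the embedded agent can only write to Avro sinks, the sketch assumes a separate standalone Flume agent with an Avro source and an HDFS sink is already listening at that address. The hosts in the question (`collector1.apache.org`, `collector2.apache.org`) appear to be documentation placeholders, which would explain the RPC connection error.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper class; not part of Flume.
public class EmbeddedAgentConfig {

    /** Builds an embedded-agent configuration with one Avro sink per "host:port" collector. */
    public static Map<String, String> build(String... collectors) {
        if (collectors.length == 0) {
            throw new IllegalArgumentException("At least one collector is required");
        }
        Map<String, String> props = new LinkedHashMap<>();
        props.put("channel.type", "memory");
        props.put("channel.capacity", "200");

        StringBuilder sinkNames = new StringBuilder();
        for (int i = 0; i < collectors.length; i++) {
            String[] hostPort = collectors[i].split(":", 2);
            if (hostPort.length != 2) {
                throw new IllegalArgumentException("Expected host:port but got " + collectors[i]);
            }
            String name = "sink" + (i + 1);
            if (i > 0) {
                sinkNames.append(' ');
            }
            sinkNames.append(name);
            // Avro is the only sink type the embedded agent accepts.
            props.put(name + ".type", "avro");
            props.put(name + ".hostname", hostPort[0]);
            props.put(name + ".port", hostPort[1]);
        }
        props.put("sinks", sinkNames.toString());
        // Balance across sinks only when there is more than one.
        props.put("processor.type", collectors.length > 1 ? "load_balance" : "default");
        return props;
    }

    public static void main(String[] args) {
        // Assumed address of a reachable Avro source; replace with a real collector.
        Map<String, String> props = build("localhost:41414");
        props.forEach((k, v) -> System.out.println(k + " = " + v));

        // With the flume-ng-embedded-agent jar on the classpath, the map would be used like this:
        //   EmbeddedAgent agent = new EmbeddedAgent("myagent");
        //   agent.configure(props);
        //   agent.start();
        //   agent.put(EventBuilder.withBody("hello", StandardCharsets.UTF_8));
        //   agent.stop();
    }
}
```

Note that in this setup the Java program produces the events itself through `put`. Running the Twitter source from the question would still need a standalone agent.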