我想写一个简单的风暴+ Camel 项目。我的storm拓扑分析tweets,一个bolt应该将tweet文本发送到ApacheCamelRoute,ApacheCamelRoute反过来使用websocket通知一些webapp。
我无法使它工作,因为在尝试使用build once上下文时从Bolt接收到notserializableexceptions。
我已经试过了:
在bolt的构造函数中传递camelcontext-导致notserializableexception
在storm conf中传递camelcontext,并在bolt的prepare(…)方法中使用它来访问它。结果:
14484[main]错误org.apache.storm.zookeeper.server.nioservercnxnfactory-线程[main,5,main]java.lang.illegalargumentexception:topology conf在backtype.storm.testing$submit\u local\u topology.invoke(testing)处不可json序列化。clj:262)~[风暴核心-0.9.4。jar:0.9.4]在backtype.storm.localcluster$\u submittopology.invoke(localcluster。clj:43)~[风暴核心-0.9.4。jar:0.9.4]在backtype.storm.localcluster.submittopology(未知来源)~[storm-core-0.9.4。jar:0.9.4]
Camel 路线:
public class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:main")
.to("websocket:localhost:8085/main?sendToAll=true");
}
}
storm拓扑:TweetSpout使用twitter4j StremaingAPI传播tweets。
public class TwitterStreamTopology {
public static void main(String[] args) {
CamelContext producerTemplate = new RouteStarter().buildRoute();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("tweetSpout", new TweetSpout(keywords), 1);
builder.setBolt("websocket", new WebSocketBolt()).shuffleGrouping("tweetSpout");
Config conf = new Config();
conf.put("producerTemplate", producerTemplate.createProducerTemplate());
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", conf, builder.createTopology());
Utils.sleep(20000);
cluster.shutdown();
}
}
腹板螺栓:
public class WebSocketBolt extends BaseBasicBolt {
private ProducerTemplate producerTemplate;
@Override
public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
Status s = (Status) input.getValueByField("tweet");
producerTemplate.sendBody("direct:main", s.getText());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
this.producerTemplate = (ProducerTemplate) stormConf.get("producerTemplate");
}
}
有什么好办法吗?
或者我应该使camel路由被http访问,并在bolt prepare(…)方法中创建一些httpclient?这看起来还是有点过分,必须有办法让它更容易。
谢谢你的帮助!
1条答案
按热度按时间kupeojn61#
问题的根本原因是您正在将producertemplate添加到storm配置中,而它抛出了一个异常,因为它不可序列化。如果这是您自己的类,您可以更改代码使其工作,但由于这是一个驼峰类,我建议使用不同的方法。
websocketbolt:将producertemplate私有成员更改为临时成员:
private transient ProducerTemplate producerTemplate;
这样它就不会试图被序列化(和把它放到conf中有同样的问题)。websocketbolt:在prepare方法中而不是拓扑中初始化producertemplate。
像这样: