在apache storm bolt中使用apache camel producertemplate

raogr8fs  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(311)

我想写一个简单的风暴+ Camel 项目。我的storm拓扑分析tweets,一个bolt应该将tweet文本发送到ApacheCamelRoute,ApacheCamelRoute反过来使用websocket通知一些webapp。
我无法使它工作,因为在尝试使用build once上下文时从Bolt接收到notserializableexceptions。
我已经试过了:
在bolt的构造函数中传递camelcontext-导致notserializableexception
在storm conf中传递camelcontext,并在bolt的prepare(…)方法中使用它来访问它。结果:
14484[main]错误org.apache.storm.zookeeper.server.nioservercnxnfactory-线程[main,5,main]java.lang.illegalargumentexception:topology conf在backtype.storm.testing$submit\u local\u topology.invoke(testing)处不可json序列化。clj:262)~[风暴核心-0.9.4。jar:0.9.4]在backtype.storm.localcluster$\u submittopology.invoke(localcluster。clj:43)~[风暴核心-0.9.4。jar:0.9.4]在backtype.storm.localcluster.submittopology(未知来源)~[storm-core-0.9.4。jar:0.9.4]
Camel 路线:

public class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:main")
                .to("websocket:localhost:8085/main?sendToAll=true");
    }
}

storm拓扑:TweetSpout使用twitter4j StremaingAPI传播tweets。

public class TwitterStreamTopology {

    public static void main(String[] args) {
        CamelContext producerTemplate = new RouteStarter().buildRoute();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("tweetSpout", new TweetSpout(keywords), 1);
        builder.setBolt("websocket", new WebSocketBolt()).shuffleGrouping("tweetSpout");
        Config conf = new Config();
        conf.put("producerTemplate", producerTemplate.createProducerTemplate());
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mytopology", conf, builder.createTopology());

        Utils.sleep(20000);
        cluster.shutdown();
    }
}

腹板螺栓:

public class WebSocketBolt extends BaseBasicBolt {
    private ProducerTemplate producerTemplate;

    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        Status s = (Status) input.getValueByField("tweet");
        producerTemplate.sendBody("direct:main", s.getText());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        this.producerTemplate = (ProducerTemplate) stormConf.get("producerTemplate");
    }
}

有什么好办法吗?
或者我应该使camel路由被http访问,并在bolt prepare(…)方法中创建一些httpclient?这看起来还是有点过分,必须有办法让它更容易。
谢谢你的帮助!

kupeojn6

kupeojn61#

问题的根本原因是您正在将producertemplate添加到storm配置中,而它抛出了一个异常,因为它不可序列化。如果这是您自己的类,您可以更改代码使其工作,但由于这是一个驼峰类,我建议使用不同的方法。
websocketbolt:将producertemplate私有成员更改为临时成员: private transient ProducerTemplate producerTemplate; 这样它就不会试图被序列化(和把它放到conf中有同样的问题)。
websocketbolt:在prepare方法中而不是拓扑中初始化producertemplate。
像这样:

public class WebSocketBolt extends BaseBasicBolt {
    private transient ProducerTemplate producerTemplate;

    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        Status s = (Status) input.getValueByField("tweet");
        producerTemplate.sendBody("direct:main", s.getText());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        CamelContext producerTemplate = new RouteStarter().buildRoute();
        this.producerTemplate = producerTemplate.createProducerTemplate();
    }
}

相关问题