public void Execute(Tuple input){
...
Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String (config);
String msg = ...
KeyedMessage<String, String> data = new KeyedMessage<String, String>
("transfers", ip, msg);
producer.send(data);
producer.close();
在拓扑b中,初始化拓扑时创建Kafka喷口:
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName,
UUID.randomUUID().toString());
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
// Now it's just like any other spout
topologyBuilder.setSpout(kafkaSpout);
3条答案
按热度按时间ds97pgxw1#
虽然不能直接将元组从一个拓扑传递到另一个拓扑,但可以使用ApacheKafka之类的排队系统来完成所描述的任务。风暴Kafka喷口 Package 在他们的最新版本。
gcmastyq2#
设置需要两个storm拓扑(a和b)和一个kafka主题。我们称之为“转移”
在要将数据发送到b拓扑的a拓扑中,使用kafka生产者:
[Kafka初始化代码直接取自文档:https://cwiki.apache.org/confluence/display/kafka/0.8.0+producer+example 显然需要为您的Kafka装置定制。]
在拓扑b中,初始化拓扑时创建Kafka喷口:
当然,这需要运行Kafkahttps://kafka.apache.org/08/quickstart.html).
[编辑:再次阅读您的问题:听起来您有一个可重用组件(save tuple),您想从两个不同的拓扑调用它,而您正试图从另一个拓扑调用它。另一种方法是将此任务转移到第三个拓扑中,专门用于处理保存元组,并且只创建需要在拓扑中持久化的项的kafka消息。这样,保存元组的所有事件都将以相同的方式处理。]
raogr8fs3#
这目前不受支持,您不能将元组从一个拓扑传递到另一个拓扑。基于您的用例,为什么不使用订阅到db bolt的另一个bolt(在同一拓扑中),而不是运行单独的拓扑呢