拓扑到拓扑

nue99wik  于 2021-06-21  发布在  Storm
关注(0)|答案(3)|浏览(286)

从一个拓扑向另一个拓扑发射元组是可能的还是可以的?
假设在一个拓扑中,一个特定的螺栓将元组存储到数据库中。在另一个拓扑中,我不想复制或创建用于存储元组的相同螺栓。那么从第二个拓扑我能发射到第一个拓扑吗?
-哈里普拉萨德

ds97pgxw

ds97pgxw1#

虽然不能直接将元组从一个拓扑传递到另一个拓扑,但可以使用ApacheKafka之类的排队系统来完成所描述的任务。风暴Kafka喷口 Package 在他们的最新版本。

gcmastyq

gcmastyq2#

设置需要两个storm拓扑(a和b)和一个kafka主题。我们称之为“转移”
在要将数据发送到b拓扑的a拓扑中,使用kafka生产者:
[Kafka初始化代码直接取自文档:https://cwiki.apache.org/confluence/display/kafka/0.8.0+producer+example 显然需要为您的Kafka装置定制。]

public void Execute(Tuple input){
...
  Properties props = new Properties();
  props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
  props.put("serializer.class", "kafka.serializer.StringEncoder");
  props.put("partitioner.class", "example.producer.SimplePartitioner");
  props.put("request.required.acks", "1"); 

  ProducerConfig config = new ProducerConfig(props);

  Producer<String, String> producer = new Producer<String, String (config);
  String msg = ... 
  KeyedMessage<String, String> data = new KeyedMessage<String, String>
      ("transfers", ip, msg);
  producer.send(data);
  producer.close();

在拓扑b中,初始化拓扑时创建Kafka喷口:

BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, 
    UUID.randomUUID().toString());
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

// Now it's just like any other spout
topologyBuilder.setSpout(kafkaSpout);

当然,这需要运行Kafkahttps://kafka.apache.org/08/quickstart.html).
[编辑:再次阅读您的问题:听起来您有一个可重用组件(save tuple),您想从两个不同的拓扑调用它,而您正试图从另一个拓扑调用它。另一种方法是将此任务转移到第三个拓扑中,专门用于处理保存元组,并且只创建需要在拓扑中持久化的项的kafka消息。这样,保存元组的所有事件都将以相同的方式处理。]

raogr8fs

raogr8fs3#

这目前不受支持,您不能将元组从一个拓扑传递到另一个拓扑。基于您的用例,为什么不使用订阅到db bolt的另一个bolt(在同一拓扑中),而不是运行单独的拓扑呢

相关问题