storm>如何将java回调集成到喷口中

ghg1uchk  于 2021-06-25  发布在  Storm
关注(0)|答案(2)|浏览(357)

我正在尝试将storm(见这里)集成到我的项目中。我摸索拓扑、喷口和螺栓的概念。但现在,我正试图弄清楚一些事情的实际实现。
a) 我有一个用java和clojure的多语言环境。我的java代码是一个回调类,其中包含触发流数据的方法。推送到这些方法中的事件数据,就是我想用作喷口的内容。
因此,第一个问题是如何将进入这些方法的数据连接到一个喷口?我试图i)传递backtype.storm.topology.irichspout,然后ii)传递backtype.storm.spout.spoutoutOutCollector(请参见此处)到该spout的open函数(请参见此处)。但我看不到一种方法能真正通过任何一种Map或列表。
b) 我的项目剩下的都是clojure。通过这些方法将会有大量的数据。每个事件的id将介于1和100之间。在clojure中,我希望将来自喷口的数据分割成不同的执行线程。我认为,这些将是螺栓。
如何设置clojure螺栓从喷口获取事件数据,然后根据传入事件的id断开线程?
提前谢谢蒂姆
[编辑1]
实际上我已经克服了这个问题。我最终实现了我自己的虹彩嘴。然后我2)将喷口的内部元组连接到java回调类中的传入流数据。我不确定这是否是惯用的。但是它编译和运行没有错误。但是,3)我看不到通过printstuffbolt传入的流数据(肯定在那里)。
为了确保事件数据得到传播,在spout或bolt实现或拓扑定义中是否需要做一些特定的事情?谢谢。

;; tie Java callbacks to a Spout that I created
      (.setSpout java-callback ibspout)

      (storm/defbolt printstuff ["word"] [tuple collector]
        (println (str "printstuff --> tuple["tuple"] > collector["collector"]"))
      )
      (storm/topology
       { "1" (storm/spout-spec ibspout)
       }
       { "3" (storm/bolt-spec  { "1" :shuffle }
                               printstuff
             )
       })

[编辑2]
在so成员ankur的建议下,我正在重新调整我的拓扑结构。创建java回调之后,我将其元组传递给下面的ibspout,使用 (.setTuple ibspout (.getTuple java-callback)) . 我没有传递整个java回调对象,因为我得到了一个notserializable错误。一切编译和运行都没有错误。不过,我的打印资料里没有数据。六羟甲基三聚氰胺六甲醚。

public class IBSpout implements IRichSpout {

      /**
       * Storm spout stuff
       */
      private SpoutOutputCollector _collector;

      private List _tuple = new ArrayList();
      public void setTuple(List tuple) { _tuple = tuple; }
      public List getTuple() { return _tuple; }

      /**
       * Storm ISpout interface functions
       */
      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
      }
      public void close() {}
      public void activate() {}
      public void deactivate() {}
      public void nextTuple() {
        _collector.emit(_tuple);
      }
      public void ack(Object msgId) {}
      public void fail(Object msgId) {}

      public void declareOutputFields(OutputFieldsDeclarer declarer) {}
      public java.util.Map  getComponentConfiguration() { return new HashMap(); }

    }
70gysomp

70gysomp1#

对b部分的答复:
对我来说,这个简单的答案听起来像是在寻找一个字段分组,这样就可以控制在执行期间按id分组的工作方式。
也就是说,我不相信这是一个完整的答案,因为我不知道你为什么要这样做。如果您只是想要一个平衡的工作负载,那么无序分组是一个更好的选择。

smtd7mpg

smtd7mpg2#

似乎您正在将这个喷口传递给回调类,这似乎有点奇怪。当一个拓扑被执行时,storm会周期性地调用喷口 nextTuple 方法,因此您需要做的是将java回调传递给您的定制spout实现,以便在storm调用您的spout时,spout调用java回调来获取下一组元组,以便将它们输入到拓扑中。
要理解的关键概念是,当storm请求时,spouts会拉取数据,而不是将数据推送到spouts。您的回调不能调用spout将数据推送到它,相反,当您的spout nextTuple 方法被调用。

相关问题