java—如何在storm中停止元组处理并执行其他代码

z9zf31ra  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(409)

我是风暴的新手。我用它做一个大学项目。
我创建了我的拓扑,用一个连接到mysql数据库的喷口和两个螺栓。第一个螺栓,连接到喷口,准备和删除元组不需要的信息;第二,对元组进行过滤。
我在本地模式下工作。
我的问题是:为什么在运行topology之后,在我的控制台中会看到如下所示的输出?

38211 [Thread-14-movie-SPOUT] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
67846 [Thread-10-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67846 [Thread-8-cleaning-genre-bolt] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67852 [Thread-10-__acker] INFO  backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@3c270095> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=0, write_pos=1, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {}]>]]
67853 [Thread-8-cleaning-genre-bolt] INFO  backtype.storm.daemon.task - Emitting: cleaning-genre-bolt __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@38c3d111> [#<DataPoint [__emit-count = {default=1680}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1621, write_pos=1622, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {default=1680}]> #<DataPoint [__execute-latency = {movie-SPOUT:default=0.15476190476190477}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=1680, write_pos=1680, capacity=1024, population=0}]> #<DataPoint [__execute-count = {movie-SPOUT:default=1680}]>]]
67854 [Thread-13-filtering-genre-BOLT] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67855 [Thread-13-filtering-genre-BOLT] INFO  backtype.storm.daemon.task - Emitting: filtering-genre-BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@6d5c75a9> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1681, write_pos=1682, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {cleaning-genre-bolt:default=0.08333333333333333}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {cleaning-genre-bolt:default=1680}]>]]

我读到最后一个元组处理后的这些行被认为是正常的。不是吗?
提交拓扑后,如何运行其他代码?例如,我想打印在第二个bolt中完成的过滤结果,保存在hashmap中。如果我将代码放在包含submittopology()方法的行之后,代码将在元组完成之前运行。
第二个也是最后一个问题是:为什么在每一个风暴的例子中,我都看到了喷口
“线程。睡眠(1000)”?
可能和我的第一个问题有关。
我希望我的问题很清楚。提前谢谢!

xkrw2x1b

xkrw2x1b1#

我读到最后一个元组处理后的这些行被认为是正常的。不是吗?
那些只是 INFO 信息。所以不用担心他们。
如果我将代码放在包含submittopology()方法的行之后,代码将在元组完成之前运行。
如果提交拓扑,拓扑将在后台执行(即多线程)。这是必需的,因为您的拓扑运行“永远”(直到您显式地停止它——或者您的java应用程序终止,因为您运行的是本地模式)。
“拓扑完成后”运行代码与storm的概念不一致,因为strom是一个流系统,并且“处理中没有终点”(输入流是无限的,因此处理将永远运行)。如果您想处理有限的数据集,您可能需要考虑像flink或spark这样的批处理框架。
因此,如果您想在storm中执行此操作,则需要能够确定何时处理了所有数据。因此,在拓扑提交之后,您将在处理完所有数据之后显式地阻塞并等待。
然而,对于您的用例,为什么不在最后一个bolt中打印您的结果呢?
关于 Thread.sleep() 我不知道你指的是什么例子。不知道为什么有人要把它投入生产。可能是为了演示的目的,人为地减慢处理速度。

相关问题