我正在读这本书。我在书中发现了以下代码片段
LocalCluster lc = new LocalCluster()
lc.submitTopology("GitHub-commit-count-topology"), config, topology);
Utils.sleep(TEN_MINUTES)
lc.killTopology("GitHub-commit-count-topology")
lc.shutdown()
因此,此代码将提交拓扑以供执行等待固定的10分钟,然后终止拓扑。但这很奇怪。我怎么说呢。submittopology等待它完成和完成。关闭。
就像在 akka 河一样 Future[Done]
我们只是等待未来的完成(而不是固定的10分钟)。
1条答案
按热度按时间bttbmeg01#
你可以这样做https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/testing.java#l376.
在某些情况下不使用它的原因是它需要拓扑中的每个喷口来实现completablespout接口https://github.com/apache/storm/blob/4137328b75c06771f84414c3c2113e2d1c757c08/storm-client/src/jvm/org/apache/storm/testing/completablespout.java.
大多数风暴喷口永远不会达到“完成”的程度(因为它是一个流处理框架,而不是批处理框架),因此无法判断拓扑何时完成。例如,如果您正在使用来自Kafka主题的消息,生产者可能会在任何时候向该主题添加更多消息,那么消费者将如何确定它已完成使用?
完全喷口的存在主要是为了便于测试,因为喷口可以说是否完成了。然后,我链接的completetopology方法可以使用这个额外的特性来判断拓扑中的所有喷口是否都“完成”,并在那之后停止拓扑。
如果您在测试中使用的spout没有实现completablespout(大多数spout没有实现),那么通常无法判断拓扑何时完成。在许多情况下,您仍然可以比链接的示例做得更好,例如,如果我的拓扑应该在测试中向队列写入10条消息,那么我可以在向队列写入10条消息后使测试结束。
关于akka流,我并不十分熟悉,但看看介绍性文档,您可以认为completablespouts类似于有界源(例如
Source(1 to 100)
)而“正常”喷口是无限源(例如Source.repeat(1)
).