streamparse连续调用下一个\u元组

z0qdvdin  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(395)

我正在尝试使用streamparse在python中编写一个简单的storm拓扑。一切都在为我工作,除了简单的Kafka喷口,我写-它似乎只是不断呼吁“下一个元组”。我的螺栓是相当慢,所以系统似乎很快爆炸记忆。
启动拓扑时,我尝试将topology.max.spout.pending设置为1,以防止它向拓扑中添加太多消息。

lein run -m streamparse.commands.run/-main topologies/.clj -t 100 --option 'topology.max.spout.pending=1' --option 'topology.workers=1' --option 'topology.acker.executors=1'

然而,结果仍然是这样,尽管螺栓的速度要慢得多:

24790 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24942 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24944 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24946 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25143 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25144 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25350 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
......

我简单的Kafka喷口:

class MetadataSpout(Spout):

    def initialize(self, stormconf, context):
        self.log('----CONFIG: %s----' % stormconf)
        k = KafkaClient(os.getenv('KAFKA'))
        self.consumer = SimpleConsumer(k, 'vacuum', 'metadata')

    def next_tuple(self):
        self.log('----NEXT TUPLE----')
        messages = self.consumer.get_messages(count=os.getenv('BATCH_COUNT', 20))
        self.emit([json.dumps([m.message.value for m in messages])])

我的bolt只有默认配置,但是需要花费大量的时间来完成process()方法。我不知道他们怎么会有问题,但如果他们是相关的,我可以发布。

1tuwyuhd

1tuwyuhd1#

解决了,感谢伟大的streamparse团队
“topology.max.spout.pending只有在您的spout可靠时才有效。您需要指定要发出的可选tup\u id参数,以便为每个tuple提供一个唯一的id。完成此操作后,一切都应该正常。”
在为发出的元组指定uuid之后,这个问题就解决了。

相关问题