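I am using the Python Storm library streamparse (which uses pystorm underneath). I am having trouble getting the spout's fail() method called in the boilerplate wordcount project. According to the pystorm quickstart docs and much of what I have read, calling fail(tuple) in a bolt should cause a failure in the originating spout. However, even with a few modifications, I always get an ack() in the spout as soon as the tuple leaves the spout. Is this the correct behavior, or do I need to change a setting or instance variable?
I am on streamparse 3.4.0 and Storm 1.0.2.
My logs show the spout logging its ack before the bolt logs that it received the tuple: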
7609 [Thread-14-word_spout-executor[2 2]] INFO o.a.s.s.ShellSpout - ShellLog pid:35761, name:word_spout 2017-02-20 15:30:33,070 - pystorm.component.word_spout - acking when I shouldnt tup_id: 3219
...
7611 [Thread-21] INFO o.a.s.t.ShellBolt - ShellLog pid:35760, name:count_bolt 2017-02-20 15:30:33,072 - pystorm.component.count_bolt - BOLT: receiving tup_id/count: 3219
words.py
from itertools import cycle
from streamparse import Spout
from streamparse import ReliableSpout

class WordSpout(ReliableSpout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        self.words = cycle(['dog', 'cat', 'zebra', 'elephant'])
        self.count = 0

    def next_tuple(self):
        self.count += 1
        word = next(self.words)
        self.emit([str(self.count)], tup_id=self.count)

    def ack(self, tup_id):
        self.logger.info("acking when I shouldnt tup_id: {0}".format(tup_id))

    def fail(self, tup_id):
        self.logger.info("failing when I should tup_id: {0}".format(tup_id))
wordcount.py
import os
from collections import Counter
from streamparse import Bolt

class WordCountBolt(Bolt):
    auto_ack = False
    auto_fail = False
    outputs = ['word', 'count']

    def initialize(self, conf, ctx):
        self.counter = Counter()
        self.pid = os.getpid()
        self.total = 0

    def _increment(self, word, inc_by):
        self.counter[word] += inc_by
        self.total += inc_by

    def process(self, tup):
        word = tup.values[0]
        self._increment(word, 10 if word == "dog" else 1)
        # if self.total % 1000 == 0:
        #     self.logger.info("counted [{:,}] words [pid={}]".format(self.total,
        #                                                             self.pid))
        self.logger.info("BOLT: receiving tup_id/count: {0}".format(word))
        # self.emit([word, self.counter[word]])
        self.fail(tup)
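To reiterate: as I understand it, a fail() in the bolt should lead to fail() being called in the spout, and I would not expect an ack this early. Is my thinking wrong?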
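To make the expectation concrete, here is a toy model in plain Python. It does not use Storm, streamparse, or pystorm, and every name in it (`ToySpout`, `ToyTopology`, `tracking`, `always_fail`) is made up for illustration. With `tracking=True` it models the flow expected above: the bolt's verdict decides which spout callback fires. With `tracking=False` it models the behavior seen in the logs, where the ack fires as the tuple leaves the spout. Storm's guaranteed-message-processing documentation describes that second behavior for topologies whose acker executor count (`topology.acker.executors`) is 0; whether that is what happens in this setup is an assumption, not something the logs confirm.

```python
class ToySpout:
    """Stand-in for a reliable spout; it only records which callbacks fire."""

    def __init__(self):
        self.events = []

    def ack(self, tup_id):
        self.events.append(("ack", tup_id))

    def fail(self, tup_id):
        self.events.append(("fail", tup_id))


class ToyTopology:
    """Hypothetical one-spout, one-bolt pipeline (not Storm's real machinery)."""

    def __init__(self, spout, bolt_outcome, tracking=True):
        self.spout = spout
        self.bolt_outcome = bolt_outcome  # callable: value -> "ack" or "fail"
        self.tracking = tracking

    def emit(self, value, tup_id):
        if not self.tracking:
            # No tuple tracking: the spout is acked as the tuple leaves it,
            # and the bolt's later verdict never reaches the spout.
            self.spout.ack(tup_id)
            self.bolt_outcome(value)
            return
        # Tuple tracking on: the bolt's verdict is routed back to the spout.
        if self.bolt_outcome(value) == "fail":
            self.spout.fail(tup_id)
        else:
            self.spout.ack(tup_id)


def always_fail(value):
    """Mimics WordCountBolt.process above, which fails every tuple."""
    return "fail"


tracked = ToySpout()
ToyTopology(tracked, always_fail, tracking=True).emit("1", tup_id=1)

untracked = ToySpout()
ToyTopology(untracked, always_fail, tracking=False).emit("1", tup_id=1)

print(tracked.events)
print(untracked.events)
```

The first list is what I expect to see in the spout; the second matches what my logs actually show.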