最近,我开始在storm上工作,为了更适应python,我决定使用streamparse来处理storm。我计划在spout中接受一个twitter流,并在bolt中执行一些计算。但我想不出我该如何在喷口中编码。我已经阅读了各种streamparse教程,但它们都显示了从静态列表发出元组的喷口,并且没有twitter流api提供的流。这是我的风暴代码:
class WordSpout(Spout):
def initialize(self, stormconf, context):
self.words = itertools.cycle(['dog', 'cat','zebra', 'elephant'])
def next_tuple(self):
word = next(self.words)
self.emit([word])
这是我给tweepy的密码:
class listener(StreamListener):
def on_status(self,status):
print(status.text)
print "--------------------------------"
return(True)
def on_error(self, status):
print "error"
def on_connect(self):
print "CONNECTED"
auth = OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)
twitterStream = Stream(auth, listener())
twitterStream.filter(track=["california"])
我应该如何集成这两个代码?
1条答案
按热度按时间bwleehnv1#
为此,我设置了一个kafka队列,tweepy侦听器使用pykafka将status.text写入队列。然后喷口不断地从队列中读取数据以执行分析。我的代码有点像这样:
侦听器.py:
喷口文件: