如何在streamparse spout中使用tweepy接受twitter流并将tweets传递给bolt?

z31licg0  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(389)

最近,我开始在storm上工作,为了更适应python,我决定使用streamparse来处理storm。我计划在spout中接受一个twitter流,并在bolt中执行一些计算。但我想不出我该如何在喷口中编码。我已经阅读了各种streamparse教程,但它们都显示了从静态列表发出元组的喷口,并且没有twitter流api提供的流。这是我的风暴代码:

class WordSpout(Spout):

def initialize(self, stormconf, context):
    self.words = itertools.cycle(['dog', 'cat','zebra', 'elephant'])
def next_tuple(self):
    word = next(self.words)
    self.emit([word])

这是我给tweepy的密码:

class listener(StreamListener):

def on_status(self,status):
    print(status.text)
    print "--------------------------------"
    return(True)

def on_error(self, status):
    print "error"
def on_connect(self):
    print "CONNECTED"

auth = OAuthHandler(ckey, csecret)

auth.set_access_token(atoken, asecret)

twitterStream = Stream(auth, listener())
twitterStream.filter(track=["california"])

我应该如何集成这两个代码?

bwleehnv

bwleehnv1#

为此,我设置了一个kafka队列,tweepy侦听器使用pykafka将status.text写入队列。然后喷口不断地从队列中读取数据以执行分析。我的代码有点像这样:
侦听器.py:

class MyStreamListener(tweepy.StreamListener):

  def on_status(self, status):
    # print(status.text)
    client = KafkaClient(hosts='127.0.0.1:9092')

    topic = client.topics[str('tweets')]
    with topic.get_producer(delivery_reports=False) as producer:

        # print status.text
        sentence = status.text
        for word in sentence.split(" "):
            if word is None:
                continue
            try:
                word = str(word)
                producer.produce(word)
            except:
                continue

  def on_error(self, status_code):
    if status_code == 420:  # exceed rate limit
        return False
    else:
        print("Failing with status code " + str(status_code))
        return False

auth = tweepy.OAuthHandler(API_KEY, API_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener)

myStream.filter(track=['is'])

喷口文件:

from streamparse.spout import Spout
from pykafka import KafkaClient

class TweetSpout(Spout):
    words = []

    def initialize(self, stormconf, context):
        client = KafkaClient(hosts='127.0.0.1:9092')

        self.topic = client.topics[str('tweets')]

    def next_tuple(self):
        consumer = self.topic.get_simple_consumer()
        for message in consumer:
            if message is not None:
                self.emit([message.value])
        else:
            self.emit()

相关问题