我想实现以下功能:在一个单独的线程中启动流监听器,该线程会创建一个队列,队列中的数据稍后再被处理……然而 Storm
在启动线程之后就什么都不做了,一直卡在那里。
我的代码如下:
import os, sys, traceback, random, StringIO, time
import random
from uuid import uuid4
from select import select
from subprocess import Popen,PIPE
import pyinotify
import simplejson, pycurl
import sys, signal
import twitter
import tweepy
import Queue
import threading
try:
import simplejson as json
except ImportError:
import json
import storm
# Module-level queue: handed to the stream listener and exposed on TwitterSpout.
queue = Queue.Queue()
class MyModelParser(tweepy.parsers.ModelParser):
    """Model parser that keeps the decoded JSON payload on the parsed result."""

    def parse(self, method, payload):
        """Parse ``payload`` with the base parser and attach its decoded JSON.

        The object returned by the parent ``parse`` gets an extra
        ``_payload`` attribute holding ``json.loads(payload)`` and is then
        returned to the caller.
        """
        parsed = super(MyModelParser, self).parse(method, payload)
        # Decode only after the base parser has accepted the payload.
        setattr(parsed, '_payload', json.loads(payload))
        return parsed
class CustomStreamListener(tweepy.StreamListener):
    """Handles data received from the stream.

    Every status received is reduced to its author's screen name and put on
    the queue that was handed to the constructor.
    """

    def __init__(self, api, q):
        """Store the API handle and the output queue.

        Args:
            api: API object kept on ``self.api``.
            q: queue-like object; only ``put`` is called on it here.
        """
        # Run the base listener's own setup first, then keep the exact
        # ``api`` object the caller passed in.
        super(CustomStreamListener, self).__init__(api)
        self.api = api
        self.queue = q
        # Marker item pushed as soon as the listener is constructed.
        self.queue.put('lalala')

    def on_status(self, status):
        """Queue the screen name of the author of ``status``."""
        # Only produce here. ``task_done()`` is the consumer's acknowledgement
        # after ``get()``; calling it from the producer corrupts ``join()``
        # accounting and can raise ValueError once a consumer acknowledges too.
        self.queue.put('%s' % status.author.screen_name)

    def on_error(self, status_code):
        """Return True so the stream keeps listening after an error."""
        return True  # To continue listening

    def on_timeout(self):
        """Return True so the stream keeps listening after a timeout."""
        return True  # To continue listening
class Starter(object):
    """Builds the authenticated Twitter stream and starts filtering.

    Constructing an instance opens the stream straight away, so callers
    that need to keep working should construct it on a separate thread.
    """

    def __init__(self, q):
        """Open a filtered stream whose listener writes to ``q``.

        Args:
            q: queue-like object handed to ``CustomStreamListener``.
        """
        self.queue = q
        track_terms = ['justinbieber', 'snooki', 'daddy_yankee', 'MikeTyson',
                       'iamdiddy', 'lala']
        auth = self.t_auth()
        api = tweepy.API(auth, parser=MyModelParser())
        # Use the queue this instance was given, not the module-level global,
        # so the ``q`` argument is actually honoured.
        listener = CustomStreamListener(api, self.queue)
        stream = tweepy.streaming.Stream(auth, listener)
        stream.filter(follow=None, track=track_terms)

    def t_auth(self):
        """Return an OAuth handler built from the credentials below.

        The four credential strings are blank placeholders in this file.
        """
        consumer_key = ""
        consumer_secret = ""
        access_key = ""
        access_secret = ""
        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_key, access_secret)
        return auth
class TwitterSpout(storm.Spout):
    """Spout that launches the Twitter stream listener on a daemon thread."""

    SPOUT_NAME = "TwitterSpout"
    # Class-level reference to the module-level queue.
    queue = queue

    def initialize(self, conf, context):
        """Record the process id and start the stream in the background.

        Args:
            conf: configuration supplied by the caller; not used here.
            context: context supplied by the caller; not used here.

        Raises:
            KeyboardInterrupt: re-raised after logging if it arrives while
                the thread is being started.
        """
        self.pid = os.getpid()
        try:
            # Pass the callable and its argument separately. Writing
            # ``target=Starter(self.queue)`` runs the Starter constructor,
            # which opens the stream, on the calling thread before any
            # Thread exists, so this method would never return.
            worker = threading.Thread(target=Starter, args=(self.queue,))
            worker.daemon = True
            worker.start()
        except KeyboardInterrupt:
            # NOTE(review): assumes the base class provides ``log`` -- confirm.
            self.log('\n\nStopping')
            raise
        # NOTE(review): nothing in this class reads from ``self.queue``;
        # presumably a tuple-emitting method should drain it -- confirm.
1条答案
按热度按时间8iwquhpp1#
使用 pyleus(https://github.com/yelp/pyleus),并且你的 spout 实现应该有 next_tuple(self) 方法:它应该发出(emit)输出字段,如下例所示;
然后编写你的 bolt;
你也可以看看我是如何使用它的;https://github.com/yelp/pyleus/issues/140