我正在实现一个情绪分析器,使用tweets使用kafka作为初始步骤我正在使用tweepyapi读取tweets,然后通过kafka producer api流式传输它。
在下面找到我的streamlistener类代码
class MyListener(StreamListener):
def __init__(self,api):
self.api = api
self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
def on_data(self, data):
try:
if "\"location\":null" not in data:
self.producer.send('twitter',data)
return True
except BaseException as e:
print(str(e))
return False
def on_error(self, status):
print(status)
return True
我被困的地方是我需要发送整个数据。我无法做到这一点,我不知道我是否必须转换成字节流,以便发送它作为Kafka只是传输字节的数据。
请让我知道一种方法来传输整个数据字符串作为收到。
暂无答案!
目前还没有任何答案,快来回答吧!