为什么在运行kafka producer时会出现错误?

ehxuflar  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(201)

你好,我正试图运行这个代码提取与Kafka实时推特,但是显示错误,在Kafka生产者收到当我运行它。我想不出问题出在哪里。

import json
from kafka import KafkaProducer
import tweepy
import configparser

class TweeterStreamListener(tweepy.StreamListener):

    def __init__(self, api):
        self.api = api
        super(tweepy.StreamListener, self).__init__()
        self.producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

    def on_status(self, status):
        # This method is called whenever new data arrives from live stream.
        # We asynchronously push this data to kafka queue
        msg =  status.text.encode('utf-8')
        try:
            self.producer.send_messages(b'twitterstream', msg)
        except Exception as e:
            print(e)
            return False
        return True

    def on_error(self, status_code):
        print("Error received in kafka producer")
        return True # Don't kill the stream

    def on_timeout(self):
        return True # Don't kill the stream

谢谢您。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题