我对python和Kafka完全是个新手。我有一个脚本,应该启动三个kafka消费者,等待这些消费者的消息,然后做一些其他的事情。在这一点上,我甚至不知道我是否朝着正确的方向前进,所以任何帮助都将受到感谢。
class MainClass():
def do_something_before(self):
# something is done here
def start_consumer(self):
consumer1_thread = threading.Thread(target=self.cons1, args=())
consumer2_thread = threading.Thread(target=self.cons2, args=())
consumer1_thread.daemon = True
consumer2_thread.daemon = True
consumer1_thread.start()
consumer2_thread.start()
def cons1(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest')
consumer.subscribe(['my-topic'])
for message in consumer:
print(message.value)
def cons2(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest')
consumer.subscribe(['my2-topic'])
for message in consumer:
print(message.value)
def keep_working(self):
# something is done here
if __name__ == 'main':
g = MainClass()
g.do_something_before()
g.keep_working()
2条答案
按热度按时间5us2dqdw1#
我已经添加了python-kafka示例和2个使用者(基本上是两个python进程),您可以在github链接中找到它https://github.com/shubhamgorde/kafka-python-app.
不能发布整个python文件,它有点大。
lrl1mhuk2#
这是我的实现。希望你觉得有用。