如何对来自未绑定队列的消息进行多处理

wpcxdonn  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(593)

我正在写一个python应用程序。它用一个消费者读出Kafka的主题。对于每条消息,它会做一些事情,这些事情可能需要一段时间才能完成,然后再对下一条消息做一些事情。
大多数使用多处理库的应用程序都需要传递一些有限的iterable来Map\u async或应用\u async。我用这两个函数来解决这个问题的尝试似乎不起作用,我想是因为在这个例子中我们的iterable是kafka主题,它是一个未绑定的队列。在这种情况下,有没有办法以非阻塞的方式“做一些事情”?

pkln4tw6

pkln4tw61#

您可以创建一个子进程并将消息传递给它以处理某些内容:

from confluent_kafka import Consumer, KafkaError
from multiprocessing import Process

def do_stuff(msg):
    my_stuff = 'is doing here as a non-blocking way'

c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    process = Process(target=do_stuff, args=(msg.value().decode('utf-8'), ))
    process.start()

c.close()

相关问题