让我们考虑这段代码
from twisted.web import server, resource
from twisted.internet.task import LoopingCall
from confluent_kafka import Consumer, KafkaException
import json
# Function to handle Kafka consumer
def kafka_consumer():
def fetch_data():
def poll_kafka():
msg = consumer.poll(0.1)
if msg is None:
return
if msg.error():
if msg.error().code() == KafkaException._PARTITION_EOF:
return
else:
return
else:
print("message", msg, msg.value())
consumer.commit() # Manually commit the offset
# Execute Kafka polling in a separate thread
d1 = threads.deferToThread(poll_kafka)
def start_loop():
lc = LoopingCall(fetch_data)
lc.start(0.5)
conf = {
'bootstrap.servers': 'kafka_internal-1:29093',
'group.id': 'your_consumer_group-2',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Disable autocommit
}
consumer = Consumer(conf)
consumer.subscribe(['jithin_test']) # <-- is it a blocking call??
start_loop()
# Web service handler
class WebService(resource.Resource):
isLeaf = True
def render_GET(self, request):
# You can customize the response according to your needs
response = {
'message': 'Welcome to the Kafka consumer service!'
}
return json.dumps(response).encode('utf-8')
if __name__ == '__main__':
reactor.callWhenRunning(kafka_consumer)
# Run the Twisted web service
root = WebService()
site = server.Site(root)
reactor.listenTCP(8180, site)
reactor.run()
字符串
在这里,我在React器线程中从confluent-kafka
示例化Consumer
对象,然后将随后的poll()
留给deferToThread()
,我对此没有什么问题,
Consumer.subscribe()
是阻塞调用吗?调用时应该是deferTothread
这个方法吗?
1.如果我使用deferToThread
触发另一个对poll_kafka
的调用,会不会在消费者重新平衡的情况下破坏消费者(根据我的理解,每次我们使用deferToThread
运行的线程都来自线程池,并且不能保证我们将使用相同的线程)?
1.如果是这样,有没有办法管理这个?也许在一个单独的python线程中运行整个东西,并将消耗的值传递回扭曲的应用程序?
1.或者有没有一种方法可以在不破坏消费者的情况下重用消费者对象?
注:代码是用python2
写的,它是一些遗留系统的集成,移植整个东西是不可能的,ATM和大多数其他库都只支持python 3+。
1条答案
按热度按时间9jyewag01#
如果你想把事情推迟到一个不是React器线程的线程上,我建议你使用https://docs.twistedmatrix.com/en/stable/api/twisted.internet.threads.html#deferToThreadPool和一个你自己管理的自定义https://docs.twistedmatrix.com/en/stable/api/twisted.python.threadpool.ThreadPool.html,
minthreads=1, maxthreads=1
。您确实需要对这个线程池进行一些烦人的生命周期管理,但在这个简单的示例中,这将只是
kafka_consumer
中的.start()
和.stop()
中添加到reactor.addSystemEventTrigger("before", "shutdown", ...)
的东西。