twisted integrating blocking confluent-kafka pythong library issues

wmvff8tz  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(141)

让我们考虑这段代码

from twisted.web import server, resource
from twisted.internet.task import LoopingCall
from confluent_kafka import Consumer, KafkaException
import json

# Function to handle Kafka consumer
def kafka_consumer():
    def fetch_data():
        def poll_kafka():
            msg = consumer.poll(0.1)  
            if msg is None:
                return
            if msg.error():
                if msg.error().code() == KafkaException._PARTITION_EOF:
                    return
                else:
                    return
            else:
                print("message", msg, msg.value())
                consumer.commit()  # Manually commit the offset

        # Execute Kafka polling in a separate thread
        d1 = threads.deferToThread(poll_kafka)

    def start_loop():
        lc = LoopingCall(fetch_data)
        lc.start(0.5) 

    conf = {
        'bootstrap.servers': 'kafka_internal-1:29093',
        'group.id': 'your_consumer_group-2',  
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False  # Disable autocommit
    }

    consumer = Consumer(conf)
    consumer.subscribe(['jithin_test'])  # <-- is it a blocking call??

    start_loop()

# Web service handler
class WebService(resource.Resource):
    isLeaf = True

    def render_GET(self, request):
        # You can customize the response according to your needs
        response = {
            'message': 'Welcome to the Kafka consumer service!'
        }
        return json.dumps(response).encode('utf-8')

if __name__ == '__main__':
    reactor.callWhenRunning(kafka_consumer)

    # Run the Twisted web service
    root = WebService()
    site = server.Site(root)
    reactor.listenTCP(8180, site)
    reactor.run()

字符串
在这里,我在React器线程中从confluent-kafka示例化Consumer对象,然后将随后的poll()留给deferToThread(),我对此没有什么问题,

  1. Consumer.subscribe()是阻塞调用吗?调用时应该是deferTothread这个方法吗?
    1.如果我使用deferToThread触发另一个对poll_kafka的调用,会不会在消费者重新平衡的情况下破坏消费者(根据我的理解,每次我们使用deferToThread运行的线程都来自线程池,并且不能保证我们将使用相同的线程)?
    1.如果是这样,有没有办法管理这个?也许在一个单独的python线程中运行整个东西,并将消耗的值传递回扭曲的应用程序?
    1.或者有没有一种方法可以在不破坏消费者的情况下重用消费者对象?
    注:代码是用python2写的,它是一些遗留系统的集成,移植整个东西是不可能的,ATM和大多数其他库都只支持python 3+。
9jyewag0

9jyewag01#

如果你想把事情推迟到一个不是React器线程的线程上,我建议你使用https://docs.twistedmatrix.com/en/stable/api/twisted.internet.threads.html#deferToThreadPool和一个你自己管理的自定义https://docs.twistedmatrix.com/en/stable/api/twisted.python.threadpool.ThreadPool.htmlminthreads=1, maxthreads=1
您确实需要对这个线程池进行一些烦人的生命周期管理,但在这个简单的示例中,这将只是kafka_consumer中的.start().stop()中添加到reactor.addSystemEventTrigger("before", "shutdown", ...)的东西。

相关问题