kafka python,如何跟踪消费者开始了一个不同的过程

1aaf6o9v  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(264)

我对python还比较陌生,刚开始接触kafka,所以如果我在某个地方错了,请原谅我的术语。
所以我有一个基于django的web应用程序,在这个应用程序中,我通过kafka producer在同一个进程中发送json消息。然而,在务实地创建一个主题的同时,我也在为该特定主题启动(订阅)一个单独的过程中的新消费者。


# Consumer code snippet

 if topic_name is not None :
        #Create topic
        create_kafka_topic_instance(topic_name)
        #Initialize a consumer and subscribe to topic
        Process(target=init_kafka_consumer_instance, args=(topic_name))

def forgiving_json_deserializer(v):
    if v is None :
        return
    try:
        return json.loads(v.decode('utf-8'))
    except json.decoder.JSONDecodeError:
        import traceback
        print(traceback.format_exc())
        return None

def init_kafka_consumer_instance(topic, group_id=None):
    try:
        if topic is None:
            raise Exception("Invalid argument topic")
        comsumer = None
        comsumer = KafkaConsumer(topic, bootstrap_servers=[KAFKA_BROKER_URL], auto_offset_reset="earliest",
           urn comsumer
    except Exception as e:
        import traceback
        print(traceback.format_exc())
    return Noneurn comsumer
    except Exception as e:
        import traceback
        print(traceback.format_exc())
    return None

生产者代码段


# assuming obj is a model instance

        serialized_obj = serializers.serialize('json', [ order, ])
        #send_message(topic_name,order)
        producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER_URL], value_serializer=lambda x: json.dumps(x).encode('utf-8'))
        x = producer.send("test", serialized_obj)
        producer.flush()

现在我有一些查询,所以如果我的django应用程序(服务器)以某种方式重新启动,我仍然会让使用者听这个主题。
另外,我在消费者中有一些在服务器控制台中看不到的print语句。
然而,在pythonshell中编写相同的代码片段(初始化使用者),我可以在print语句中看到消息,这意味着我的生产者工作正常。

ipakzgxi

ipakzgxi1#

kafka服务器不依赖于django应用程序(服务器)。但你的消费者是肯定的。
因此,您的主题仍然存在于kafka服务器中(如果kafka服务器死掉了,那就另当别论了),但是您的使用者将使用您的应用程序重新启动。
因此,如果你想让你的消费者工作得很好,那就让它成为一个与你的应用程序并行工作的工作者,并且在你的应用程序关闭时不会重新启动

相关问题