我希望使用confluent-kafka-python API来使用来自单个Kafka主题的消息,kafka consume()和poll()在以批处理方式累积小(如10条)或大(如1000-10000条)消息时,性能是否有差异?poll()一次只返回一条消息,因此我们需要一个自定义逻辑来处理批处理。
bis0qfac1#
DeserializingConsumer(您链接到docs的类)不支持consume(),它只提供poll()方法。如果这是您计划使用的类,则poll()是您唯一的选择。但是,常规的confluent_kafka.Consumer类确实支持这两种类型。如果您使用常规的Consumer,并且您的目的是处理成批的消息,我建议使用consume(),以避免实现与poll()类似的语义所需的复杂性。至于性能差异,这两种方法都非常有效,并且都是在关键路径中调用的。2需要考虑的主要区别是语义。由于poll()会触发回调,因此调用poll()可能会在应用程序在批处理中累积所需数量的消息之前触发重新平衡回调,此时使用者可能必须决定如何处理这些消息。在stop-the-world重新平衡中,所有分区都将从使用者中撤销,这很简单,并且可以直接进行清理。在增量重新平衡中,但在重新平衡过程中,并非所有分区都可能丢失,因此请确保正确处理此问题。consume()也会触发回调,但处理重新平衡要简单得多,因为在调用之间没有存储部分累积的消息批。
DeserializingConsumer
consume()
poll()
confluent_kafka.Consumer
Consumer
1条答案
按热度按时间bis0qfac1#
DeserializingConsumer
(您链接到docs的类)不支持consume()
,它只提供poll()
方法。如果这是您计划使用的类,则poll()
是您唯一的选择。但是,常规的
confluent_kafka.Consumer
类确实支持这两种类型。如果您使用常规的Consumer
,并且您的目的是处理成批的消息,我建议使用consume()
,以避免实现与poll()
类似的语义所需的复杂性。至于性能差异,这两种方法都非常有效,并且都是在关键路径中调用的。2需要考虑的主要区别是语义。由于
poll()
会触发回调,因此调用poll()
可能会在应用程序在批处理中累积所需数量的消息之前触发重新平衡回调,此时使用者可能必须决定如何处理这些消息。在stop-the-world重新平衡中,所有分区都将从使用者中撤销,这很简单,并且可以直接进行清理。在增量重新平衡中,但在重新平衡过程中,并非所有分区都可能丢失,因此请确保正确处理此问题。consume()
也会触发回调,但处理重新平衡要简单得多,因为在调用之间没有存储部分累积的消息批。