kafka是否允许一个线程或进程使用分区中的数据,而另一个线程或进程则负责在数据被完全处理后手动提交偏移量?
mm9b1k5b1#
直接从 KafkaConsumer 文档:Kafka消费者不是线程安全的。所有网络i/o都发生在发出调用的应用程序的线程中。...此规则的唯一例外是wakeup(),它可以从外部线程安全地用于中断活动操作。因此,不建议在一个线程之外使用使用者 wakeup 例外。
KafkaConsumer
wakeup
toe950272#
是的,我相信这是可能的。如上所述,kafkaconsumer对象不是线程安全的,因此每个线程都应该有自己的示例。两个示例应该具有相同的组id,当然应该禁用自动提交。有一些提交方法将特定分区和偏移量作为参数:https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#commitsync-java.util.map-和https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#commitasync-java.util.map-org.apache.kafka.clients.consumer.offsetcommitcallback-但是,我认为当通过subscribe方法使用自动组管理时,您可能无法做到这一点(旧的高级使用者风格用法),而是必须使用assign方法手动管理分区分配(与旧的简单使用者类似)。但你可以试试前者,看看是否也有可能。
2条答案
按热度按时间mm9b1k5b1#
直接从
KafkaConsumer
文档:Kafka消费者不是线程安全的。所有网络i/o都发生在发出调用的应用程序的线程中。
...
此规则的唯一例外是wakeup(),它可以从外部线程安全地用于中断活动操作。
因此,不建议在一个线程之外使用使用者
wakeup
例外。toe950272#
是的,我相信这是可能的。如上所述,kafkaconsumer对象不是线程安全的,因此每个线程都应该有自己的示例。两个示例应该具有相同的组id,当然应该禁用自动提交。有一些提交方法将特定分区和偏移量作为参数:https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#commitsync-java.util.map-和https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#commitasync-java.util.map-org.apache.kafka.clients.consumer.offsetcommitcallback-
但是,我认为当通过subscribe方法使用自动组管理时,您可能无法做到这一点(旧的高级使用者风格用法),而是必须使用assign方法手动管理分区分配(与旧的简单使用者类似)。但你可以试试前者,看看是否也有可能。