当只有一个流数据源时,多线程kafka producer将如何工作?假设我们有三个生产者线程。每个生产商如何知道数据消耗将从何处开始?我们不希望每个生产商使用相同的数据(重复)。
ijnw1ujt1#
在我看来,这不是Kafka的问题,而是如何在使用生产者向Kafka发送消息之前同步客户端(作为api的接收器)。
hec6srdp2#
多线程Kafka生产者将如何工作时,你有一个单一的流数据源?您可能需要使用某种中间状态来跟踪成功存储在kafkastream中的事件的主键。把它想象成关系数据库,数据库将在其中抛出 DuplicateKeyException 当您尝试用相同的主键写入多个记录时。与kafka stream一样,在事件中选择一些独特的属性作为主键,并将它们存储在某种缓存中(如果所有生产者都在同一台机器/同一个应用程序中,则简单地使用hashmap,但如果生产者分布在不同的机器上,则使用redis、memcached等分布式缓存)因此,如果事件已经在 EMITTED 缓存中的状态删除事件。
DuplicateKeyException
EMITTED
| emitted events | Producer1---event1, event10 ---| e1 - EMITTED | | e10 - EMITTED | eventstream | |-------------------- Producer2---event1, event2-----| e2 - EMITTED | e1, e10, e2, e3 (ignores e1) | | | |-------------------- Producer3---event1, event3-----| e3 - EMITTED | (ignores e1) | |
你可能还想追踪事件的两种状态, EMIT_INITIATED 以及 EMITTED 一旦Kafka斯特伦承认坚持。这将解决生产者1和生产者2都试图发射的问题 event1 同时,由于它们中没有一个在缓存中看到它,所以它们都将被发送到流中。
EMIT_INITIATED
event1
2条答案
按热度按时间ijnw1ujt1#
在我看来,这不是Kafka的问题,而是如何在使用生产者向Kafka发送消息之前同步客户端(作为api的接收器)。
hec6srdp2#
多线程Kafka生产者将如何工作时,你有一个单一的流数据源?
您可能需要使用某种中间状态来跟踪成功存储在kafkastream中的事件的主键。
把它想象成关系数据库,数据库将在其中抛出
DuplicateKeyException
当您尝试用相同的主键写入多个记录时。与kafka stream一样,在事件中选择一些独特的属性作为主键,并将它们存储在某种缓存中(如果所有生产者都在同一台机器/同一个应用程序中,则简单地使用hashmap,但如果生产者分布在不同的机器上,则使用redis、memcached等分布式缓存)
因此,如果事件已经在
EMITTED
缓存中的状态删除事件。你可能还想追踪事件的两种状态,
EMIT_INITIATED
以及EMITTED
一旦Kafka斯特伦承认坚持。这将解决生产者1和生产者2都试图发射的问题event1
同时,由于它们中没有一个在缓存中看到它,所以它们都将被发送到流中。