嗨,我是同事 Kafka
以及 Akka Streams
. 在 Kafka
为了一个主题 MyTestTopic
我有3个分区,数据以高并发的方式被推送到主题中 1000 QPS
它只会比这更高。
以下是我为akka stream kafka消费者提供的代码:
final ConsumerSettings<String, byte[]> consumerSettings =
ConsumerSettings.create(kafkaConfig, new StringDeserializer(), new ByteArrayDeserializer())
.withBootstrapServers("127.0.0.1:9092")
.withGroupId("TestConsumerGroup")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
.withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(timeout));
ActorMaterializer materializer = ActorMaterializer.create(system);
RestartSource.onFailuresWithBackoff(
java.time.Duration.ofSeconds(3),
java.time.Duration.ofSeconds(3000),
0.2,
() -> Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics("MyTestTopic"))
.mapAsyncUnordered(10,
record -> ask(rootHandler, new StreamData(record), Duration.ofSeconds(timeout))))
.to(Sink.foreach(App::sinkParser))
.run(materializer);
我的问题是:
如何定义多个 Akka Stream consumers
听不同的音乐 Kafka partitions
因为多个分区导致一个akka steam的示例看起来像一个 bottle-neck
.
是 Akka Clustering
答案是什么?保持 2 seed nodes
在静态服务器和 multiple akka stream consumers
在基于云的环境中自动缩放。
我好像想不通,我需要帮助谢谢
1条答案
按热度按时间b1uwtaje1#
有几种方法可以解决这个问题,具体取决于您没有详细说明的细节:
如果您合理地确定一个节点可以处理所有消息,那么可以设置多个流,每个分区最多一个流。
这种方法的一种改进是使用committablepartitionedsource,以便动态创建尽可能多的流。请注意,您需要手动提交偏移量(例如使用
Committer.sink
).每个示例可以有一个流,并且部署的示例数量与分区数量相同;对于相同的使用者组,示例将在它们之间协调分区分配。当部署多个示例时,您可能需要或不需要akka集群,这取决于您所扮演的角色的性质
ask
他在做什么。如果每个消息的参与者中没有维护任何状态(请注意,这将包括参与者对外部数据存储执行读-修改-写操作:如果您可以确保影响给定行的消息位于同一kafka分区中,您甚至可以在外部数据存储中不使用acid)您可能不需要akka集群。
如果参与者本身是有状态的(例如,他们正在跟踪某个物联网设备),那么您几乎可以肯定需要akka cluster、akka cluster sharding和akka persistence的组合。与在外部数据存储上执行读-修改-写操作的参与者相比,一直这样做确实有一些优势(例如,为了跟踪参与者中的状态并支持事件源,可以消除大多数读取)。