多个kafka分区到akka流

r8xiu3jd  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(327)

嗨,我是同事 Kafka 以及 Akka Streams . 在 Kafka 为了一个主题 MyTestTopic 我有3个分区,数据以高并发的方式被推送到主题中 1000 QPS 它只会比这更高。
以下是我为akka stream kafka消费者提供的代码:

final ConsumerSettings<String, byte[]> consumerSettings =
        ConsumerSettings.create(kafkaConfig, new StringDeserializer(), new ByteArrayDeserializer())
                .withBootstrapServers("127.0.0.1:9092")
                .withGroupId("TestConsumerGroup")
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
                .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(timeout));

ActorMaterializer materializer = ActorMaterializer.create(system);

RestartSource.onFailuresWithBackoff(
        java.time.Duration.ofSeconds(3),
        java.time.Duration.ofSeconds(3000),
        0.2,
        () -> Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics("MyTestTopic"))
                .mapAsyncUnordered(10,
                        record -> ask(rootHandler, new StreamData(record), Duration.ofSeconds(timeout))))
        .to(Sink.foreach(App::sinkParser))
        .run(materializer);

我的问题是:
如何定义多个 Akka Stream consumers 听不同的音乐 Kafka partitions 因为多个分区导致一个akka steam的示例看起来像一个 bottle-neck .
Akka Clustering 答案是什么?保持 2 seed nodes 在静态服务器和 multiple akka stream consumers 在基于云的环境中自动缩放。
我好像想不通,我需要帮助谢谢

b1uwtaje

b1uwtaje1#

有几种方法可以解决这个问题,具体取决于您没有详细说明的细节:
如果您合理地确定一个节点可以处理所有消息,那么可以设置多个流,每个分区最多一个流。
这种方法的一种改进是使用committablepartitionedsource,以便动态创建尽可能多的流。请注意,您需要手动提交偏移量(例如使用 Committer.sink ).
每个示例可以有一个流,并且部署的示例数量与分区数量相同;对于相同的使用者组,示例将在它们之间协调分区分配。当部署多个示例时,您可能需要或不需要akka集群,这取决于您所扮演的角色的性质 ask 他在做什么。
如果每个消息的参与者中没有维护任何状态(请注意,这将包括参与者对外部数据存储执行读-修改-写操作:如果您可以确保影响给定行的消息位于同一kafka分区中,您甚至可以在外部数据存储中不使用acid)您可能不需要akka集群。
如果参与者本身是有状态的(例如,他们正在跟踪某个物联网设备),那么您几乎可以肯定需要akka cluster、akka cluster sharding和akka persistence的组合。与在外部数据存储上执行读-修改-写操作的参与者相比,一直这样做确实有一些优势(例如,为了跟踪参与者中的状态并支持事件源,可以消除大多数读取)。

相关问题