Kafka与阿克卡集群

gxwragnw  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(299)

下面是我的用例
一堆应用程序将kafka中不同主题下的消息排队。
让每个主题的使用者将工作分发给集群中的工作人员。工作可分为长时间工作、内存密集型工作、简单型工作等,并据此选择工作人员。
这让我探索了akka集群的工作分配、路由和扩展。我可以使用akka“supervisor”作为kafka消费者,并根据其分类将传入的工作分配给相应的工人。
但我仍在努力理解的是,如何在阿克卡集群的主管和工人之间实现一种弹性沟通方式。因为一旦主管使用了来自kafka的消息,kafka偏移量就被提交。如果在偏移提交之后处理过程中发生了错误,那么下面的恢复方法是否可以接受,并从上次保留的位置开始?
通过使用Kafka支持的持久邮箱,使主管成为持久的参与者。主管在Kafka排队工作,工人从Kafka拿到工作,只有在完成工作后才提交补偿。

0kjbasz6

0kjbasz61#

正如jaakko所说,这实际上取决于您使用的第三部分库。
就我而言,我已经成功地使用了akka-streams-kafka,尽管我启用了offset-auto-commit。
但是,这个库可以满足您的需要,因为它允许您自定义偏移提交(请参阅kafka中的外部偏移存储和偏移存储部分)。
文件上说:
consumer.committablesource可以将偏移位置提交给kafka。与自动提交相比,这提供了对消息何时被视为已使用的精确控制。
为了禁用自动提交,你必须完成你的akka application.conf 通过添加 akka.kafka.consumer 章节:

akka.kafka.consumer {

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.

  kafka-clients {
    # Disable auto-commit by default
    enable.auto.commit = false
  }

}

最新版本 akka-stream-kafka_2.11 (版本 0.16 )与akka兼容 2.5.x 但是您必须用akka工具箱中的一个重写akka-stream_.11依赖关系。目前,我正在与akka一起使用这个库 2.5.3 而且效果很好。
希望你能找到你想要的:)

相关问题