下面是我的用例
一堆应用程序将kafka中不同主题下的消息排队。
让每个主题的使用者将工作分发给集群中的工作人员。工作可分为长时间工作、内存密集型工作、简单型工作等,并据此选择工作人员。
这让我探索了akka集群的工作分配、路由和扩展。我可以使用akka“supervisor”作为kafka消费者,并根据其分类将传入的工作分配给相应的工人。
但我仍在努力理解的是,如何在阿克卡集群的主管和工人之间实现一种弹性沟通方式。因为一旦主管使用了来自kafka的消息,kafka偏移量就被提交。如果在偏移提交之后处理过程中发生了错误,那么下面的恢复方法是否可以接受,并从上次保留的位置开始?
通过使用Kafka支持的持久邮箱,使主管成为持久的参与者。主管在Kafka排队工作,工人从Kafka拿到工作,只有在完成工作后才提交补偿。
1条答案
按热度按时间0kjbasz61#
正如jaakko所说,这实际上取决于您使用的第三部分库。
就我而言,我已经成功地使用了akka-streams-kafka,尽管我启用了offset-auto-commit。
但是,这个库可以满足您的需要,因为它允许您自定义偏移提交(请参阅kafka中的外部偏移存储和偏移存储部分)。
文件上说:
consumer.committablesource可以将偏移位置提交给kafka。与自动提交相比,这提供了对消息何时被视为已使用的精确控制。
为了禁用自动提交,你必须完成你的akka
application.conf
通过添加akka.kafka.consumer
章节:最新版本
akka-stream-kafka_2.11
(版本0.16
)与akka兼容2.5.x
但是您必须用akka工具箱中的一个重写akka-stream_.11依赖关系。目前,我正在与akka一起使用这个库2.5.3
而且效果很好。希望你能找到你想要的:)