我从阿克卡河开始,到目前为止一切都很顺利。然而,我遇到了一个我不知道如何处理的用例。该场景是一个流,其中actorpublisher作为源使用来自kafka的消息,订阅者作为接收器更新cassandra表。
Kafka~>一些Map操作~>Cassandra
关键是,每次成功处理一条消息并将其插入cassandra时,我都要明确地向kafka确认,以便在发生灾难和服务失败时重新读取该消息,即某种至少一次的传递行为。我怎样才能在阿克卡河方面做到这一点?。是支持的方案吗?。
的确,我总是可以用自动提交行为配置kafka消费者,但我更愿意控制如何读取消息。
更新
关于这个主题,我们目前正在评估被动Kafka,他们在0.8版本的Kafka中加入了手动提交(这对这些家伙来说是个好消息)。这个特性将允许我们实现所需的alod行为。
1条答案
按热度按时间rggaifut1#
我认为您可以使用自定义流处理,使用pushpullstage,如akka streams文档中所述