我正在尝试设计一个阿克卡流,使用alpakka读取Kafka主题中的事件,并将它们放到沙发上。
到目前为止,我有以下代码,它似乎以某种方式工作:
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topicIn))
.map(profile ⇒ {
RawJsonDocument.create(profile.record.key(), profile.record.value())
})
.via(
CouchbaseFlow.upsertDoc(
sessionSettings,
writeSettings,
bucketName
)
)
.log("Couchbase stream logging")
.runWith(Sink.seq)
我所说的“莫名其妙”是指流实际上是从主题中读取事件,并将它们作为json文档放到couchbase中,尽管我不知道如何将使用者偏移量提交给kafka,但它看起来更漂亮。
如果我已经清楚地理解了隐藏在kafka消费偏移后面的主要思想,那么在发生任何故障或重新启动的情况下,流将从上一个提交的偏移中读取所有消息,并且由于我们没有提交任何消息,它可能会再次重新读取上一个会话中读取的记录。
我的假设对吗?如果是这样,那么在从kafka读取并发布到某个数据库的情况下,如何处理消费者提交?官方的akka streams文档提供了一些示例,展示了如何使用普通的kafka流来处理此类案例,因此我不知道如何在我的案例中提交补偿。
太好了,谢谢!
1条答案
按热度按时间gtlvzcf81#
您需要在couchbase中提交偏移量,以便获得“恰好一次”的语义。
这将有助于:https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#offset-Kafka外部仓库