scala—使用akka流和kafka偏移提交将事件从kafka流到couchbase

r3i60tvu  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(365)

我正在尝试设计一个阿克卡流,使用alpakka读取Kafka主题中的事件,并将它们放到沙发上。
到目前为止,我有以下代码,它似乎以某种方式工作:

Consumer
      .committableSource(consumerSettings, Subscriptions.topics(topicIn))
      .map(profile ⇒ {
        RawJsonDocument.create(profile.record.key(), profile.record.value())
      })
      .via(
        CouchbaseFlow.upsertDoc(
          sessionSettings,
          writeSettings,
          bucketName
        )
      )
      .log("Couchbase stream logging")
      .runWith(Sink.seq)

我所说的“莫名其妙”是指流实际上是从主题中读取事件,并将它们作为json文档放到couchbase中,尽管我不知道如何将使用者偏移量提交给kafka,但它看起来更漂亮。
如果我已经清楚地理解了隐藏在kafka消费偏移后面的主要思想,那么在发生任何故障或重新启动的情况下,流将从上一个提交的偏移中读取所有消息,并且由于我们没有提交任何消息,它可能会再次重新读取上一个会话中读取的记录。
我的假设对吗?如果是这样,那么在从kafka读取并发布到某个数据库的情况下,如何处理消费者提交?官方的akka streams文档提供了一些示例,展示了如何使用普通的kafka流来处理此类案例,因此我不知道如何在我的案例中提交补偿。
太好了,谢谢!

gtlvzcf8

gtlvzcf81#

您需要在couchbase中提交偏移量,以便获得“恰好一次”的语义。
这将有助于:https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#offset-Kafka外部仓库

相关问题