kafka流的正确方法

gijlo24d  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(354)

我们正在尝试使用akka streams和alpakka kafka来消费服务中的事件流。为了处理事件处理错误,我们使用kafka autocommit和多个队列。例如,如果我们有主题 user_created ,我们希望从产品和服务中使用它,我们还创建 user_created_for_products_failed 以及 user_created_for_products_dead_letter . 这两个额外的主题与一个特定的Kafka消费群体相结合。如果一个事件处理失败,它将进入失败的队列,在那里我们将在5分钟内再次尝试消费——如果它再次失败,它将进入死信。
在部署时,我们希望确保不会丢失事件。因此,我们尝试在停止应用程序之前停止流。正如我所说的,我们正在使用autocommit,但是所有这些正在“飞行”的事件还没有被处理。一旦流和应用程序停止,我们就可以部署新代码并再次启动应用程序。
在阅读了文档之后,我们看到了 KillSwitch 功能。我们从中看到的问题是 shutdown 方法返回 Unit 相反 Future[Unit] 如我们所料。我们不确定是否会丢失使用它的事件,因为在测试中,它看起来运行得太快,无法正常工作。
作为解决方法,我们创建 ActorSystem 并使用 terminate 方法(返回 Future[Terminate] ). 这个解决方案的问题是,我们不认为 ActorSystem 每一条河流都能很好地扩展 terminate 需要花费大量时间来解决(在测试中,关闭最多需要一分钟)。
你遇到过这样的问题吗?有没有比这更快的方法 ActorSystem.terminate )停止流并确保 Source 是否已处理?

suzh9iv8

suzh9iv81#

根据文件(重点):
使用外部偏移存储时,调用 Consumer.Control.shutdown() 足以完成 Source ,它开始流的完成。

val (consumerControl, streamComplete) =
  Consumer
    .plainSource(consumerSettings,
                 Subscriptions.assignmentWithOffset(
                   new TopicPartition(topic, 0) -> offset
                 ))
    .via(businessFlow)
    .toMat(Sink.ignore)(Keep.both)
    .run()

consumerControl.shutdown()
``` `Consumer.control.shutdown()` 返回一个 `Future[Done]` . 根据scaladoc的描述:
关闭耗电元件 `Source` . 它将等待未完成的偏移提交请求完成,然后关闭。
或者,如果您在kafka中使用偏移存储,请使用 `Consumer.Control.drainAndShutdown` ,它还返回 `Future` . 同样来自文档(其中包含关于 `drainAndShutdown` 是否在封面下):

val drainingControl =
Consumer
.committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
.mapAsync(1) { msg =>
business(msg.record).map(_ => msg.committableOffset)
}
.toMat(Committer.sink(committerSettings))(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()

val streamComplete = drainingControl.drainAndShutdown()

的scaladoc描述 `drainAndShutdown` :
停止生成来自 `Source` ,等待流完成并关闭使用者 `Source` 以便所有消耗的消息都到达流的末尾。流完成失败将被传播,源将被关闭。

相关问题