如何控制/暂停子源/流中的akka流

uplii1fm  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(257)

我们使用akka streams和alpakka-kafka,使用主题模式订阅来使用来自不同主题的消息。使用groupby()为每个主题创建子源/流。
如何暂停/恢复/控制单个主题的消息处理,而不影响其他主题的消息处理。
以下选项不起作用。
使用restartsource将重新启动源代码并影响所有主题的处理。
使用restartflow将删除失败的元素,并影响所有主题的处理。
如果我们必须重新启动所有主题的源代码并且有额外的开销,那么为每个主题创建一个源代码是资源密集型的。

Consumer
    .committableSource(consumerSettings.withGroupId("test-group"),
            Subscriptions.topicPattern("/some/pattern"))
    .groupBy(5000, msg -> msg.record().topic())
    .mapAsync(1, msg -> business(msg).thenApply(response -> {
        if (response.status().isFailure()) {
            throw new RuntimeException("Boom");
        }
        return msg.committableOffset();}))
    .mapAsync(1, offset -> offset.commitJavadsl());

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题