阿尔帕卡-Kafka河永远不会终止

lskq00tm  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(655)

我们使用alpakka-kafka流来消费来自kafka的事件。以下是如何将流定义为:

ConsumerSettings<GenericKafkaKey, GenericKafkaMessage> consumerSettings = 
    ConsumerSettings
        .create(actorSystem, new KafkaJacksonSerializer<>(GenericKafkaKey.class), 
                new KafkaJacksonSerializer<>(GenericKafkaMessage.class))
        .withBootstrapServers(servers).withGroupId(groupId)
        .withClientId(clientId).withProperties(clientConfigs.defaultConsumerConfig());
CommitterSettings committerSettings = CommitterSettings.create(actorSystem)
        .withMaxBatch(20)
        .withMaxInterval(Duration.ofSeconds(30));
Consumer.DrainingControl<Done> control = 
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topics))
        .mapAsync(props.getMessageParallelism(), msg ->
                CompletableFuture.supplyAsync(() -> consumeMessage(msg), actorSystem.dispatcher())
                        .thenCompose(param -> CompletableFuture.supplyAsync(() -> msg.committableOffset())))
        .toMat(Committer.sink(committerSettings), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(materializer);

下面是关闭流的一段代码:

CompletionStage<Done> completionStage = control.drainAndShutdown(actorSystem.dispatcher());
completionStage.toCompletableFuture().join();

我也试着去了解完全的未来。但无论是加入还是进入未来都不会回来。有没有其他人也面临类似的问题?我是不是做错什么了?

gg58donl

gg58donl1#

如果要从流外部控制流终止,则需要使用killswitch:https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html

r8uurelv

r8uurelv2#

你的用法看起来是正确的,我不能确定任何会妨碍排水的东西。
阿尔帕卡·Kafka的消费者最常怀念的就是 stop-timeout 默认为30秒。当使用 DrainingControl 您可以安全地将其设置为0秒。
看到了吗https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#draining-控制

相关问题