阿尔帕卡流无法读取Kafka

nhjlsmyf  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(337)

我在alpakka项目的基础上构建了一个非常简单的akka流,但是它没有阅读kafka的任何内容,即使它连接并创建了一个消费群体。我已经为流创建了一个隐式的actor系统和materializer。

val done = Consumer.committableSource(consumerSettings,
Subscriptions.topics(kafkaTopic))
.map(msg => msg.committableOffset)
.mapAsync(1) { offset =>
offset.commitScaladsl()
}
.runWith(Sink.ignore)

[stream.actor.dispatcher]将此消息发送给kafkaconsumeractor“请求消息,请求ID:1,分区:set(kafka-topic-0)”
kafkaconsumerator似乎没有收到消息,但是当主管要求参与者关机时,它确实收到了消息并关机。
有什么线索能解释为什么Kafka没有错误或例外?

moiiocjp

moiiocjp1#

我不明白为什么我的akka流没有使用来自kafka代理的消息,但是当我实现同一个流作为可运行的图时,它起了作用。
我用过的例子-https://www.programcreek.com/scala/akka.stream.scaladsl.runnablegraph

相关问题