我可能没有理解Kafka消费者的观点,但我想做的是:
使用者订阅一个主题,获取该主题中的所有消息,并返回一个包含所有这些消息的未来列表
我编写的代码是
val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) =>
list :+ kafkaMessage
}
def consume(topic: String) =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.map { message =>
logger.info(s"Consuming ${message.record.value}")
KafkaMessage(Some(message.record.key()), Some(message.record.value()))
}
.buffer(bufferSize, overflowStrategy)
.runWith(sink)
不过,未来永远不会回来,它会消耗必要的消息,然后继续反复轮询主题。有没有办法把未来还给消费者,然后关闭?
1条答案
按热度按时间gcuhipw91#
正如kafka用于流式数据一样,没有“所有消息”这样的东西,因为新数据可以在任何时候附加到主题中。
我想,你可以做两件事:
检查最后一次返回的记录数
poll
终止或您需要通过
endOffsets
并将其与每个分区的最新记录的偏移量进行比较。如果两者都匹配,您可以返回。第一种方法比较简单,但可能有缺点,即它不如第二种方法可靠。从理论上讲,即使有可用的记录(即使发生这种情况时变化不是很高),投票也可能返回零记录。
不知道如何在scala中表达这个终止条件(因为我对scala不是很熟悉)。