在使用了kafka主题的所有可用消息之后,如何返回包含消息列表的future?

b4qexyjb  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(653)

我可能没有理解Kafka消费者的观点,但我想做的是:
使用者订阅一个主题,获取该主题中的所有消息,并返回一个包含所有这些消息的未来列表
我编写的代码是

val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) =>
list :+ kafkaMessage
}

def consume(topic: String) =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
  .map { message =>
    logger.info(s"Consuming ${message.record.value}")
    KafkaMessage(Some(message.record.key()), Some(message.record.value()))
  }
  .buffer(bufferSize, overflowStrategy)
  .runWith(sink)

不过,未来永远不会回来,它会消耗必要的消息,然后继续反复轮询主题。有没有办法把未来还给消费者,然后关闭?

gcuhipw9

gcuhipw91#

正如kafka用于流式数据一样,没有“所有消息”这样的东西,因为新数据可以在任何时候附加到主题中。
我想,你可以做两件事:
检查最后一次返回的记录数 poll 终止或
您需要通过 endOffsets 并将其与每个分区的最新记录的偏移量进行比较。如果两者都匹配,您可以返回。
第一种方法比较简单,但可能有缺点,即它不如第二种方法可靠。从理论上讲,即使有可用的记录(即使发生这种情况时变化不是很高),投票也可能返回零记录。
不知道如何在scala中表达这个终止条件(因为我对scala不是很熟悉)。

相关问题