在连接websocket时使用akka stream kafka从kafka主题获取最后一条消息

acruukt9  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(418)

有没有可能得到最后一个信息Kafka主题使用阿克卡流Kafka?我正在创建一个websocket,它侦听一个kafka主题,但当前它在我连接时检索所有以前未读取的消息。这可以加起来相当多的消息,所以我只在最后一条消息+任何未来的消息(或仅限未来消息)
资料来源:

def source(): Flow[Any, String, NotUsed] = {
  val source = Consumer.plainSource(consumerSettings, Subscriptions.topics(MyTopic))
  Flow.fromSinkAndSource[Any, String](Sink.ignore, source.map(_.value)
}

使用者设置:

@Provides
def providesConsumerSettings(@Named("kafkaUrl") kafkaUrl: String): ConsumerSettings[String, String] = {
  val deserializer = new StringDeserializer()
  val config = configuration.getOptional[Configuration]("akka.kafka.consumer")
    .getOrElse(Configuration.empty)

  ConsumerSettings(config.underlying, deserializer, deserializer)
    .withBootstrapServers(kafkaUrl)
    .withGroupId(GroupId)
}

我试着添加设置 ConsumerSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") 它应该“自动将偏移量重置为最新偏移量”,但似乎没有任何效果。

8qgya5xd

8qgya5xd1#

我能够避免在客户端连接时使用davidvangeest非常简洁地描述的方法获取任何上游数据
归根结底,在消费者身上有一个广播中心:

val liveSource = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic1, topic2))
.map(kafkaObject => utils.WebSockets.kafkaWrapper(kafkaObject.topic(), kafkaObject.value()))
.toMat(BroadcastHub.sink)(Keep.right)
.run()

以及连接一个静态消费者来吃掉所有的上游数据

liveSource.to(Sink.ignore).run()

这样,我就可以让websocket客户端订阅消费者接收到的所有数据,如下所示:

def source(): Flow[Any, String, NotUsed] = {Flow.fromSinkAndSource(Sink.ignore, liveSource)}

或基于Kafka通论的过滤器(或任何你想要的)

def KafkaSpecificSource(kafkaTopic: String): Flow[Any, String, NotUsed] = {
  Flow.fromSinkAndSource(Sink.ignore, liveSource.filter({
    x =>
      (Json.parse(x) \ "topic").asOpt[String] match {
        case Some(str) => str.equals(kafkaTopic)
        case None => false
      }
  }))
}

这并不能解决在第一次连接时向用户提供x个数据量的问题,但是我预见到我们会为任何历史数据添加一个简单的数据库查询,并且让websocket连接只关注livestreaming数据。

相关问题