阿克卡流消费者Kafka延迟投票

fzsnzjdm  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(178)

我已经创建了一个kafkastreamconsumer,上面提到了默认配置https://doc.akka.io/docs/alpakka-kafka/0.11/consumer.html. 消费者工作正常;但是,在使用第一条消息之前有一个初始延迟。第一条消息之后的消息几乎是实时的。请建议如果我需要调整一些配置。我使用下面的代码来创建消费者-

val consumer: RunnableGraph[DrainingControl[Done]] =
Consumer
  .committableSource(consumerSettings, Subscriptions.topicPattern(conf.topic))
  .mapAsync(conf.workers)(KafkaMessageHandler(transactionWorker.handleTransactionMessage))
  .groupedWithin(conf.batchSize, 1.millis)
  .map(CommittableOffsetBatch(_))
  .mapAsync(conf.workers)(offset => {
    logger.debug(s"committing ${offset.getOffsets}")
    //TODO
    offset.commitScaladsl()
  })
  .toMat(Sink.ignore)(Keep.both)
  .mapMaterializedValue(DrainingControl.apply)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题