如何从Kafka主题中一个接一个地使用消息

xpszyzbs  于 2021-06-06  发布在  Kafka
关注(0)|答案(3)|浏览(462)

我用一个分区构建了一个Kafka主题。

kafka-topics --create --topic files.write --if-not-exists --zookeeper zookeeper:32181 --partitions 1 --replication-factor 1

可以在这个主题中推送许多消息。
但我希望单个消费者(对于给定的组)逐个处理这些消息。

spring:
  application:
    name: file-consumer
  cloud:
    stream:
      kafka:
        binder:
          type: kafka
          brokers: localhost
          defaultBrokerPort: 29092
          defaultZkPort: 32181
          configuration:
            max.request.size: 300000
            max.message.bytes: 300000
        bindings:
          fileWriteBindingInput:
            consumer:
              autoCommitOffset: false
      bindings:
        fileWriteBindingInput:
          binder: kafka
          destination: files.write
          group: ${spring.application.name}
          contentType: 'text/plain'

以及java示例代码

@StreamListener(FileBindingProcessor.INPUT_FILE_WRITE)
public void onInputMessage(Message<String> message, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {

    // I Would like here to synchronize the processing of messages one by one
    // But, If many messages are pushed to this topic (single partition), they will be processed asynchronously event if I didn't yet acknowledge the current message

    acknowledgment.acknowledge();
}

我的配置中缺少什么?
我想,虽然消息没有被确认(偏移量没有增加),但是没有其他消息从同一分区被使用。

0dxa2lsx

0dxa2lsx1#

如果 autoCommitOffset 如果已启用(这是默认值),则绑定器将已经确认每个记录。所以到那时,你的问题就解决了 StreamListener ,记录已被确认。
更正:以上关于 StreamListener 不完全正确。侦听器退出时自动确认。
因为您只有一个分区,所以您将按照发送到该主题分区的顺序获得消息。您可以禁用 autoCommitOffset ,在这种情况下,可以使用手动确认。

oxcyiej7

oxcyiej72#

不确认一条消息与阻止下一条消息被传递无关。
您不能将消息传递给另一个线程并在以后确认它;如果想要单线程处理,则必须在侦听器线程上执行所有处理。

wswtfjt7

wswtfjt73#

您可以设置此使用者配置 max.poll.records 默认值为500
最大轮询记录数
对poll()的单个调用中返回的最大记录数。

相关问题