我正在研究一个spring引导服务,它从apachekafka读取消息,通过http从另一个服务请求消息指示的记录,处理它们,将一些数据保存到数据库中,并将结果发布到另一个主题。
这是通过
@StreamListener(Some.INPUT)
@SendTo(Some.OUTPUT)
这是在几个服务中完成的,通常工作得很好。唯一的属性集是
spring.cloud.stream.binder.consumer.concurrency=20
主题本身有20个分区,应该适合。
在监视kafka的读取时,我们发现吞吐量非常低,行为也很奇怪:
这个应用程序一次最多可以读取500条信息,然后是1-2分钟的空白。在这段时间内,使用者反复记录它是“丢失心跳,因为分区被重新平衡”、“重新分配分区”,有时甚至抛出异常,说它“未能提交,因为轮询间隔已过”
我们得出的结论是,这意味着使用者获取500条消息,处理所有消息需要很长时间,错过了它的时间窗口,因此无法将500条消息中的任何一条提交给代理—代理将重新分配分区并重新发送相同的消息。
在浏览了线程和文档之后,我找到了“max.poll.records”属性,但在设置该属性的位置上存在冲突。
有人说要把它放在下面
spring.cloud.stream.bindings.consumer.<input>.configuration
有人说
spring.cloud.stream.kafka.binders.consumer-properties
我尝试将两者都设置为1,但服务行为没有改变。
我如何正确地处理这样一种情况,即消费者无法使用默认设置跟上所需的轮询间隔?
普通yaml:
spring.cloud.stream.default.group=${spring.application.name}
服务yaml
spring:
clould:
stream:
default:
consumer.headerMode: embeddedHeaders
producer.headerMode: embeddedHeaders
bindings:
someOutput:
destination: outTopic
someInput:
destination: inTopic
consumer:
concurrency: 30
kafka:
bindings:
consumer:
someInput:
configuarion:
max.poll.records: 20 # ConsumerConfig ignores this
consumer:
enableDlq: true
configuarion:
max.poll.records: 30 # ConsumerConfig ignores this
someInput:
configuarion:
max.poll.records: 20 # ConsumerConfig ignores this
consumer:
enableDlq: true
configuarion:
max.poll.records: 30 # ConsumerConfig ignores this
binder:
consumer-properties:
max.poll.records: 10 # this gets used first
configuration:
max.poll.records: 40 # this get used when the first one is not present
“忽略此项”始终意味着,如果未设置其他属性,consumerconfiguration会将最大轮询记录的默认值保持为500
编辑:我们更接近了:
这个问题与spring retry设置了exponentialbackoffstrategy有关,还有一系列错误有效地停止了应用程序。
我不明白的是,我们强迫200个错误通过张贴错误的消息到主题的问题,这导致应用程序读取200,采取年龄(与旧的重试配置),然后提交所有200个错误一次。
如果我们有
max.poll.records: 1
concurrency: 1
ackEachRecod = true
enableDlq: true # (which implicitly makes autoCommitOffsets = true according to the docs)
1条答案
按热度按时间9o685dep1#
是的
请参阅文档。。。
Kafka消费地产
以下属性仅适用于kafka使用者,并且必须以
spring.cloud.stream.kafka.bindings.<channelName>.consumer.
...配置
使用包含通用kafka使用者属性的键/值对进行Map。
默认值:空Map。
...
你还可以增加
max.poll.interval.ms
.编辑
我刚刚用2.1.0.release进行了测试,它的工作原理与我描述的一样:
无设置
启动默认值
活页夹默认值#1
活页夹默认值#2
绑定默认值
结合特异性
编辑2
这是完整的测试应用程序。我只是在http://start.spring.io and 选择“Kafka”和“云流”。
和
和