@StreamListener(value = PersonStream.INPUT)
private void personBulkReceiver(List<Person> person) {
//....
}
spring:
cloud:
stream:
kafka:
binders:
bulkKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
configuration:
max.poll.records: 1500
fetch.min.bytes: 10000000
fetch.max.wait.ms: 10000
value.deserializer: tr.cloud.stream.examples.PersonDeserializer
bindings:
person-topic-in:
binder: bulkKafka
destination: person-topic
contentType: application/person
group : person-group
consumer:
batch-mode: true
我用的是Kafka的Spring Cloud流。在streamlistener中,当分区计数为1时,我可以以批处理模式每5000毫秒消耗一次记录。
我的.yml配置是fetch.min.bytes=10000000&&fetch.max.wait.ms=50000&&max.poll.records=1500,如上所述。
我可以每5000毫秒接收一次批记录,因为批记录大小不超过10000000字节。
但当分区计数大于1时,streamlistener会使用早于5000毫秒的记录。
这个案子有什么配置吗?
或者这种情况是独立线程为每个分区工作的自然结果?
当分区计数大于1时,工作逻辑有什么区别?
暂无答案!
目前还没有任何答案,快来回答吧!