我知道Kafka可以分批处理事件。我试图理解这种情况:一个主题有4个分区我有一个消费者Kafka分配所有4个分区给它。假设kafka客户机从kafka获取的每个批处理有5条消息。我试图理解的是,如果一个批处理中的事件都来自同一个分区,然后循环到下一个分区批处理。或者批处理本身是否已经包含来自不同分区的事件?
50few1ms1#
我不能给你一个确切的答案,但我发现它很有趣,足以检验它。为此,我创建了一个包含四个分区的主题,并使用 kafka-producer-perf-test 命令行工具生成一些消息到主题中。由于性能测试工具根本不会创建任何键,因此消息以循环方式写入主题分区。
kafka-producer-perf-test
kafka-producer-perf-test --topic test --num-records 1337 --throughput -1 --record-size 128 --producer-props key.serializer=org.apache.kafka.common.serialization.StringSerializer --producer-props value.serializer=org.apache.kafka.common.serialization.StringSerializer --producer-props bootstrap.servers=localhost:9092
之后,我使用配置创建了一个简单的kafkaconsumer max_poll_records=5 来配合你的问题。使用者只需打印出消耗的每条消息的偏移量和分区:
max_poll_records=5
Integer counter = 0; // consume messages with `poll` call and print out results try(KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(settings)) { consumer.subscribe(Arrays.asList(topic)); while (true) { System.out.printf("Batch = %d\n", counter); ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, partition = %d\n", record.offset(), record.partition()); } counter += 1; } }
回答您的问题的结果是,使用者在移动到另一个分区之前尝试从一个分区获取尽可能多的数据。仅在所有来自分区的消息 1 已消耗,但未达到最大轮询记录数5的限制,但它又从分区添加了两条消息 2 .以下是一些指纹,以便更好地理解。
1
2
Batch = 0 offset = 310, partition = 0 offset = 311, partition = 0 offset = 312, partition = 0 offset = 313, partition = 0 offset = 314, partition = 0 Batch = 1 offset = 315, partition = 0 offset = 316, partition = 0 offset = 317, partition = 0 offset = 318, partition = 0 offset = 319, partition = 0 # only offsets with partition 0 Batch = 45 offset = 525, partition = 0 offset = 526, partition = 0 offset = 527, partition = 0 offset = 528, partition = 0 offset = 529, partition = 0 Batch = 46 offset = 728, partition = 1 offset = 729, partition = 1 offset = 730, partition = 1 offset = 731, partition = 1 offset = 732, partition = 1 # only offsets with partition 1 Batch = 86 offset = 928, partition = 1 offset = 929, partition = 1 offset = 930, partition = 1 offset = 931, partition = 1 offset = 932, partition = 1 Batch = 87 offset = 465, partition = 2 offset = 466, partition = 2 offset = 933, partition = 1 offset = 934, partition = 1 offset = 935, partition = 1 Batch = 88 offset = 467, partition = 2 offset = 468, partition = 2 offset = 469, partition = 2 offset = 470, partition = 2 offset = 471, partition = 2 ## and so on
1条答案
按热度按时间50few1ms1#
我不能给你一个确切的答案,但我发现它很有趣,足以检验它。
为此,我创建了一个包含四个分区的主题,并使用
kafka-producer-perf-test
命令行工具生成一些消息到主题中。由于性能测试工具根本不会创建任何键,因此消息以循环方式写入主题分区。之后,我使用配置创建了一个简单的kafkaconsumer
max_poll_records=5
来配合你的问题。使用者只需打印出消耗的每条消息的偏移量和分区:回答您的问题的结果是,使用者在移动到另一个分区之前尝试从一个分区获取尽可能多的数据。仅在所有来自分区的消息
1
已消耗,但未达到最大轮询记录数5的限制,但它又从分区添加了两条消息2
.以下是一些指纹,以便更好地理解。