我正在尝试将具有不同密钥的消息存储到不同的分区。
例如:
ProducerRecord<String, String> rec1 = new ProducerRecord<String, String>("topic", "key1", line);
ProducerRecord<String, String> rec2 = new ProducerRecord<String, String>("topic", "key2", line);
producer.send(rec1);
producer.send(rec2);
但当我尝试运行producer类时,它总是存储在单个分区中。
根据文件, DefaultPartitioner
使用 message key hash code
找到分区。我也看到这个问题Kafka分区键不能正常工作, 但我找不到 ByteArrayPartitioner
在kafka客户端库的0.9.x版本中初始化。
props.put("partitioner.class", "kafka.producer.ByteArrayPartitioner")
更新:我正在使用代码动态创建主题。
如果我手动创建一个带有分区的主题,那么它可以正常工作。
1条答案
按热度按时间cbwuti441#
如果主题是“动态”创建的,则根据
num.partitions
参数(默认值)1
). 如果只有一个分区,那么所有的数据都将进入这个分区。但是,请记住,即使您有多个分区,一个分区仍然可以分配不同的密钥!即使您有num partitions==num distinct keys,也可能会发生哈希冲突,将两个不同的键分配给相同的分区(并将一些分区留空)。
如果您想确保不同的键总是指向不同的分区,则需要使用使用者分区器或直接指定分区号。