所有生成的Kafka消息总是发送到分区0,即使密钥哈希值为1

lyr7nygr  于 2023-10-15  发布在  Apache
关注(0)|答案(2)|浏览(131)

我有一个Kafka主题,有两个分区。我已经创建了一个消息处理程序,有两个不同的键。

@Bean
@ServiceActivator(inputChannel = "pushDataRequestChannel")
public MessageHandler pushDataRequestHandler(final KafkaTemplate<String, String> kafkaTemplate) {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate);
    handler.setMessageKeyExpression(new LiteralExpression(topicProperties.getProducerKey()));
    return handler;
}

@Bean
public DirectChannel processDataRequestChannel() {
    return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "processDataRequestChannel")
public MessageHandler processDataRequestHandler(final KafkaTemplate<String, String> kafkaTemplate) {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate);
    handler.setMessageKeyExpression(new LiteralExpression(topicProperties.getProducerKeyOne()));
    return handler;
}

1.组id
1.分区键
所以,我将向分区0发送消息,只针对奇数。对于事件号,它应该转到分区1。
这是我的逻辑

for(int i=0; i < 6 ; i++)
        {
            Predicate<Integer> isEven = n -> n % 2 == 0;

            // Check if the number is even or odd and display the result
            if (isEven.test(i)) {
                System.out.println(i + " is an even number.");
                MessageNeedToSend message = new MessageNeedToSend();
                ----
                ---
                ---
                pushDataRequestChannel.send(MessageBuilder
                        .withPayload(objectMapper.writeValueAsString(message))
                        .copyHeaders(headers)
                        .build());
            } else {
                MessageNeedToSend message = new MessageNeedToSend();
                ----
                ---
                ---
                processDataRequestChannel.send(MessageBuilder
                        .withPayload(objectMapper.writeValueAsString(message))
                        .copyHeaders(headers)
                        .build());
                System.out.println(i + " is an odd number.");
            }
        }

但它总是去分区0。
partition_1_key -706172746974696 F6 E5 F315 F6 B6579(byte array)-- 1550936367(murmur hash 2)-1550936367% 2 = 1 -(根据我的理解,应该是分区1)
group_id -67726 F75705 F6964(byte array)-- 2186850892(murmur hash 2)-2186850892% 2 = 0 -(据我所知应该是分区0)
但它总是去分区0;
你能告诉我

rkue9o1l

rkue9o1l1#

分区程序逻辑已更改。当计算一个批时,所有记录都粘在同一个分区上,而不是根据每个键分布在所有记录中
https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner

o2g1uqev

o2g1uqev2#

ProducerRecord允许您指定分区号,而不是使用两个处理程序。使用spring Boot boot KafkaProducerMessageLog,您可以通过使用setPartitionIdExpression指定partition id而不是在计算分区ID时使用key来发送消息到topic。谢谢

相关问题