我有一个Kafka主题,有两个分区。我已经创建了一个消息处理程序,有两个不同的键。
@Bean
@ServiceActivator(inputChannel = "pushDataRequestChannel")
public MessageHandler pushDataRequestHandler(final KafkaTemplate<String, String> kafkaTemplate) {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setMessageKeyExpression(new LiteralExpression(topicProperties.getProducerKey()));
return handler;
}
@Bean
public DirectChannel processDataRequestChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "processDataRequestChannel")
public MessageHandler processDataRequestHandler(final KafkaTemplate<String, String> kafkaTemplate) {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setMessageKeyExpression(new LiteralExpression(topicProperties.getProducerKeyOne()));
return handler;
}
1.组id
1.分区键
所以,我将向分区0发送消息,只针对奇数。对于事件号,它应该转到分区1。
这是我的逻辑
for(int i=0; i < 6 ; i++)
{
Predicate<Integer> isEven = n -> n % 2 == 0;
// Check if the number is even or odd and display the result
if (isEven.test(i)) {
System.out.println(i + " is an even number.");
MessageNeedToSend message = new MessageNeedToSend();
----
---
---
pushDataRequestChannel.send(MessageBuilder
.withPayload(objectMapper.writeValueAsString(message))
.copyHeaders(headers)
.build());
} else {
MessageNeedToSend message = new MessageNeedToSend();
----
---
---
processDataRequestChannel.send(MessageBuilder
.withPayload(objectMapper.writeValueAsString(message))
.copyHeaders(headers)
.build());
System.out.println(i + " is an odd number.");
}
}
但它总是去分区0。
partition_1_key -706172746974696 F6 E5 F315 F6 B6579(byte array)-- 1550936367(murmur hash 2)-1550936367% 2 = 1 -(根据我的理解,应该是分区1)
group_id -67726 F75705 F6964(byte array)-- 2186850892(murmur hash 2)-2186850892% 2 = 0 -(据我所知应该是分区0)
但它总是去分区0;
你能告诉我
2条答案
按热度按时间rkue9o1l1#
分区程序逻辑已更改。当计算一个批时,所有记录都粘在同一个分区上,而不是根据每个键分布在所有记录中
https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
o2g1uqev2#
ProducerRecord允许您指定分区号,而不是使用两个处理程序。使用spring Boot boot KafkaProducerMessageLog,您可以通过使用
setPartitionIdExpression
指定partition id
而不是在计算分区ID时使用key来发送消息到topic。谢谢