故事是这样的。我有一个kafka代理和一个特定的对象(我只想通过我的主题发送这个对象),它有一个我想用作键的id。
目前我正在使用'partitionkeyextractorclass'配置来设置提取id并将其作为键返回的类。
看起来是这样的:
def extractKey(Message<?> message) {
log.info('Extracting key from message')
String id = new JsonSlurper().parseText(new String(message.payload)).properties.id
log.info("Got = ${id}")
return id
}
我的实际问题是,当我浏览主题上的消息时,保存我消息的consumerrecord会说密钥为空。。。
这是虫子吗?我做错什么了吗?关于这一点的文件并没有比这更进一步。
1条答案
按热度按时间pepwfjgg1#
看,你在混
partition
与key
.目前
KafkaMessageChannelBinder
不提供用于确定key
反对Message
.只有现有的功能,你可以使用强有力的是
KafkaHeaders.MESSAGE_KEY
:所以,在
output
消息您应该计算密钥并将其放入该标头中。