Spring的云流没有设置Kafka关键的消息?

rsl1atfo  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(306)

故事是这样的。我有一个kafka代理和一个特定的对象(我只想通过我的主题发送这个对象),它有一个我想用作键的id。
目前我正在使用'partitionkeyextractorclass'配置来设置提取id并将其作为键返回的类。
看起来是这样的:

def extractKey(Message<?> message) {
    log.info('Extracting key from message')
    String id = new JsonSlurper().parseText(new String(message.payload)).properties.id
    log.info("Got = ${id}")

    return id
}

我的实际问题是,当我浏览主题上的消息时,保存我消息的consumerrecord会说密钥为空。。。
这是虫子吗?我做错什么了吗?关于这一点的文件并没有比这更进一步。

pepwfjgg

pepwfjgg1#

看,你在混 partitionkey .
目前 KafkaMessageChannelBinder 不提供用于确定 key 反对 Message .
只有现有的功能,你可以使用强有力的是 KafkaHeaders.MESSAGE_KEY :

Object messageKey = this.messageKeyExpression != null
            ? this.messageKeyExpression.getValue(this.evaluationContext, message)
            : message.getHeaders().get(KafkaHeaders.MESSAGE_KEY);

所以,在 output 消息您应该计算密钥并将其放入该标头中。

相关问题