spring云流中基于kafka头的条件(基于内容)路由

qc6wkl3g  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(360)

我在springboot2.x应用程序中使用springcloudstream3.x来使用来自kafka主题的消息。
我希望有一个侦听器,根据doc:

@StreamListener(value = "someTopic", condition = "headers['SomeHeader']=='SomeHeaderValue'")
public void onMessage(Message<?> message) {
  LOGGER.info("Received: {}", message);
}

但是,侦听器从未收到通知,如果条件被删除,我会在日志中看到以下内容:

Received: ... SomeHeader: [B@1055e4af ...

结果是,自定义头被保留为kafka字节数组原始格式,使得它们不符合条件求值的条件。
是需要一些额外的配置还是我遗漏了什么?

64jmpszr

64jmpszr1#

在对来源和stackoveflow进行了一些挖掘之后,我发现了以下内容:
spring云流委托到spring kafka消息和头转换(kafkamessagechannelbinder~getheadermapper)
默认的头转换实现(binderheadermapper)将头保留为原始格式
springcloudstream允许定制头Map,特别是将头从字节数组转换为字符串(在springcloudstream项目中,如何将传入头Map为字符串而不是byte[])
所以我添加了我的自定义头Map器bean(bean名称很重要,它允许省略其他配置属性),它将我的自定义头Map到字符串:

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper();
    headerMapper.setRawMappedHeaders(Map.of(
        "SomeHeader", true
    ));
    return headerMapper;
}

解决了问题:

Received: ... SomeHeader: SomeHeaderValue ...

p、 它看起来像是Spring云流中的一只虫子:
它引入了自己的头Map器(binderheadermapper)的实现,但后者不考虑条件路由特性。
头Map器是kafkamessagechannelbinder中的子类,如果提供了自定义头Map器,那么这个添加的行为是不明显的,并且将丢失。

相关问题