在KafkaScript上定义自定义值解析器

83qze16e  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(101)

在我的主application.properties中,我有:

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

字符串
这一切都很好,并且值被转换为String,正如预期的那样。
现在,在我的集成测试中,我想引入另一个KafkaListener(也就是说,第二个侦听器,我不想覆盖我的主应用程序中的侦听器的行为!),但这次是另一个值转换器(字节数组)。这是否可能,而不必为这个侦听器引入自定义ListenerContainerFactory
我尝试了以下方法,但没有成功:

@KafkaListener(topics = ..., properties = "value-deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void receiveKafkaRecords(final ConsumerRecord<String, byte[]> record) {
    ...
}


我得到:

java.lang.ClassCastException: java.lang.String cannot be cast to [B


这意味着集成测试中在KafkaListener上定义的ad-hoc值验证器没有被使用。
我现在用的是spring-kafka 2.5.8。

vyswwuz2

vyswwuz21#

properties需要是字符串列表,并且可以引用常量

@KafkaListener(topics = ..., properties = {
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + "=org.apache.kafka.common.serialization.ByteArrayDeserializer"
})
public void receiveKafkaRecords(final ConsumerRecord<String, byte[]> record) {
    ...
}

字符串
https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-properties
这将正确地使用value.deserializer作为键(点,而不是连字符)
或者在集成测试中,您可以加载不同的属性文件。

相关问题