在我的主application.properties
中,我有:
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
字符串
这一切都很好,并且值被转换为String
,正如预期的那样。
现在,在我的集成测试中,我想引入另一个KafkaListener
(也就是说,第二个侦听器,我不想覆盖我的主应用程序中的侦听器的行为!),但这次是另一个值转换器(字节数组)。这是否可能,而不必为这个侦听器引入自定义ListenerContainerFactory
?
我尝试了以下方法,但没有成功:
@KafkaListener(topics = ..., properties = "value-deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void receiveKafkaRecords(final ConsumerRecord<String, byte[]> record) {
...
}
型
我得到:
java.lang.ClassCastException: java.lang.String cannot be cast to [B
型
这意味着集成测试中在KafkaListener
上定义的ad-hoc值验证器没有被使用。
我现在用的是spring-kafka 2.5.8。
1条答案
按热度按时间vyswwuz21#
properties
需要是字符串列表,并且可以引用常量字符串
https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-properties
这将正确地使用
value.deserializer
作为键(点,而不是连字符)或者在集成测试中,您可以加载不同的属性文件。