我使用docker compose设置在本地设置kafka:https://github.com/wurstmeister/kafka-docker/ docker-compose up
很好,通过shell创建主题也很好。
现在我试着通过 spring-kafka:2.1.0.RELEASE
启动spring应用程序时,它会打印正确的kafka版本:
o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser : Kafka commitId : aaa7af6d4a11b29d
我试着发出这样的信息
kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");
客户端发送失败
UnknownServerException: The server experienced an unexpected error when processing the request
在服务器控制台中,我得到消息magicv1不支持记录头
Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
google暗示了版本冲突,但版本似乎很合适( org.apache.kafka:kafka-clients:1.0.0
在类路径中)。
有什么线索吗?谢谢!
编辑:我缩小了问题的范围。发送普通字符串是可行的,但是通过jsonserializer发送json会导致给定的问题。以下是我的生产者配置的内容:
@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String
@Bean
fun producerConfigs(): Map<String, Any> =
HashMap<String, Any>().apply {
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
}
@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
DefaultKafkaProducerFactory(producerConfigs())
@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
KafkaTemplate(producerFactory())
4条答案
按热度按时间6uxekuva1#
我也有类似的问题。如果我们使用
JsonSerializer
或者JsonSerde
对于值。为了防止这个问题,我们需要禁用添加信息头。如果您对默认的json序列化还满意,那么可以使用以下方法(这里的关键点是
ADD_TYPE_INFO_HEADERS
):但如果你需要定制
JsonSerializer
特定的ObjectMapper
(就像PropertyNamingStrategy.SNAKE_CASE
),则应禁用在上显式添加信息头JsonSerializer
,就像Kafka忽视的那样DefaultKafkaProducerFactory
的财产ADD_TYPE_INFO_HEADERS
(对我来说,这是一个糟糕的设计SpringKafka)或者如果我们使用
JsonSerde
,然后:7uhlpewt2#
解决了的。问题既不是代理,也不是docker缓存,也不是spring应用程序。
问题是一个控制台使用者,我并行使用它进行调试。这是一个“老”消费者开始
kafka-console-consumer.sh --topic=topic --zookeeper=...
它实际上在启动时打印警告:Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
“新”消费者--bootstrap-server
应该使用选项(特别是在使用kafka 1.0和jsonserializer时)。注意:在这里使用旧的消费者确实会影响生产商。e5nszbig3#
您使用的是kafka版本<=0.10.x.x一旦您使用它,您必须将jsonserializer.add\u type\u info\u headers设置为false,如下所示。
为您的生产商工厂财产。
如果您使用的是kafka版本>0.10.x.x,它应该可以正常工作
mzaanser4#
我只是对docker的图片做了个测试没有问题。。。
.
.
.
编辑
对我来说仍然有效。。。
.