spring kafka制作人未发送到kafka 1.0.0(magic v1不支持记录头)

0mkxixxg  于 2021-06-07  发布在  Kafka
关注(0)|答案(4)|浏览(725)

我使用docker compose设置在本地设置kafka:https://github.com/wurstmeister/kafka-docker/ docker-compose up 很好,通过shell创建主题也很好。
现在我试着通过 spring-kafka:2.1.0.RELEASE 启动spring应用程序时,它会打印正确的kafka版本:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

我试着发出这样的信息

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");

客户端发送失败

UnknownServerException: The server experienced an unexpected error when processing the request

在服务器控制台中,我得到消息magicv1不支持记录头

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers

google暗示了版本冲突,但版本似乎很合适( org.apache.kafka:kafka-clients:1.0.0 在类路径中)。
有什么线索吗?谢谢!
编辑:我缩小了问题的范围。发送普通字符串是可行的,但是通过jsonserializer发送json会导致给定的问题。以下是我的生产者配置的内容:

@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
        DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
        KafkaTemplate(producerFactory())
6uxekuva

6uxekuva1#

我也有类似的问题。如果我们使用 JsonSerializer 或者 JsonSerde 对于值。为了防止这个问题,我们需要禁用添加信息头。
如果您对默认的json序列化还满意,那么可以使用以下方法(这里的关键点是 ADD_TYPE_INFO_HEADERS ):

Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);

但如果你需要定制 JsonSerializer 特定的 ObjectMapper (就像 PropertyNamingStrategy.SNAKE_CASE ),则应禁用在上显式添加信息头 JsonSerializer ,就像Kafka忽视的那样 DefaultKafkaProducerFactory 的财产 ADD_TYPE_INFO_HEADERS (对我来说,这是一个糟糕的设计SpringKafka)

JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
valueSerializer.setAddTypeInfo(false);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);

或者如果我们使用 JsonSerde ,然后:

Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
jsonSerde.configure(jsonSerdeProperties, false);
7uhlpewt

7uhlpewt2#

解决了的。问题既不是代理,也不是docker缓存,也不是spring应用程序。
问题是一个控制台使用者,我并行使用它进行调试。这是一个“老”消费者开始 kafka-console-consumer.sh --topic=topic --zookeeper=... 它实际上在启动时打印警告: Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. “新”消费者 --bootstrap-server 应该使用选项(特别是在使用kafka 1.0和jsonserializer时)。注意:在这里使用旧的消费者确实会影响生产商。

e5nszbig

e5nszbig3#

您使用的是kafka版本<=0.10.x.x一旦您使用它,您必须将jsonserializer.add\u type\u info\u headers设置为false,如下所示。

Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);

为您的生产商工厂财产。
如果您使用的是kafka版本>0.10.x.x,它应该可以正常工作

mzaanser

mzaanser4#

我只是对docker的图片做了个测试没有问题。。。

$docker ps

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
f093b3f2475c        kafkadocker_kafka        "start-kafka.sh"         33 minutes ago      Up 2 minutes        0.0.0.0:32768->9092/tcp                              kafkadocker_kafka_1
319365849e48        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   33 minutes ago      Up 2 minutes        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1

.

@SpringBootApplication
public class So47953901Application {

    public static void main(String[] args) {
        SpringApplication.run(So47953901Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
        return args -> template.send("foo", "bar", "baz");
    }

    @KafkaListener(id = "foo", topics = "foo")
    public void listen(String in) {
        System.out.println(in);
    }

}

.

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

.

2017-12-23 13:27:27.990  INFO 21305 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz

编辑
对我来说仍然有效。。。

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

.

2017-12-23 15:27:59.997  INFO 44079 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    ...
    value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

...

2017-12-23 15:28:00.071  INFO 44079 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz

相关问题