Kafka消费者错误

wixjitnu  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(892)

我正在使用Kafka产品和SpringKafka消费者。我使用的是json序列化程序和反序列化程序。每当我试图从主题中读取消费者中的消息时,就会出现以下错误:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition fan_topic-0 at offset 154. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided

我没有在producer和consumer中配置任何关于头的信息。我错过了什么?

yyyllmsg

yyyllmsg1#

我相信你错过了一个事实 JsonDeserializer 必须在 ConsumerFactory 使用适当的默认类型进行反序列化,但不在kafka属性中。
所有信息都显示在文档中:https://docs.spring.io/spring-kafka/docs/2.1.7.release/reference/html/_reference.html#serdes

lb3vh1jj

lb3vh1jj2#

再加上上面的答案,
下面的修改为我解决了。

config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

添加

return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(String.class));

而不是

return new DefaultKafkaConsumerFactory<String, String>(config);

供参考,
下面的方法 deserialize 需要标头和“ Assert.state.. “扔 IllegalStateException ```
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
JavaType javaType = this.typeMapper.toJavaType(headers);
if (javaType == null) {
Assert.state(this.targetType != null, "No type information in headers and no default type provided");
return deserialize(topic, data);
}
else {
try {
return this.objectMapper.readerFor(javaType).readValue(data);
}
catch (IOException e) {
throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
"] from topic [" + topic + "]", e);
}
}
}

相关问题