在库更新之前的spring boot/kafka应用程序中,我使用了以下类 org.telegram.telegrambots.api.objects.Update
为了给Kafka主题留言。现在我用的是 org.telegram.telegrambots.meta.api.objects.Update
. 如你所见,他们有不同的 Package 。
重新启动应用程序后,我遇到以下问题:
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:221) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:967) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:93) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1144) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) ~[kafka-clients-1.1.0.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
这是我的配置:
@EnableAsync
@Configuration
public class ApplicationConfig {
@Bean
public StringJsonMessageConverter jsonConverter() {
return new StringJsonMessageConverter();
}
}
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);
return props;
}
@Bean
public ProducerFactory<String, Update> updateProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Update> updateKafkaTemplate() {
return new KafkaTemplate<>(updateProducerFactory());
}
}
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.consumer.max.poll.interval.ms}")
private String kafkaConsumerMaxPollIntervalMs;
@Value("${kafka.consumer.max.poll.records}")
private String kafkaConsumerMaxPollRecords;
@Value("${kafka.topic.telegram.fenix.bot.update.consumer.concurrency}")
private Integer updateConsumerConcurrency;
@Bean
public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(consumerFactory(kafkaProperties));
return factory;
}
@Bean
public ConsumerFactory<String, Update> updateConsumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Update.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Update> updateKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
ConcurrentKafkaListenerContainerFactory<String, Update> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(updateConsumerFactory(kafkaProperties));
factory.setConcurrency(updateConsumerConcurrency);
return factory;
}
}
应用程序属性
spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
如何解决这个问题,让Kafka将旧的信息反序列化为新的信息?
更新
这是我的听众
@Component
public class UpdateConsumer {
@KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {
//do some logic here
ack.acknowledge();
}
}
7条答案
按热度按时间xesrikrc1#
我也面临过这个问题,但是上面的解决方案对我不起作用。不过,关键在于将Kafka消费品工厂配置如下:
sd2nnvve2#
请参阅文档。
从版本2.1开始,类型信息可以在记录头中传递,允许处理多种类型。此外,可以使用kafka属性配置序列化程序/反序列化程序。
jsonserializer.add\ type\ info\ headers(默认为true);设置为false可禁用jsonserializer上的此功能(设置addtypeinfo属性)。
jsondeserializer.key\默认\类型;如果不存在标头信息,则用于键反序列化的回退类型。
jsondeserializer.value\默认\类型;如果不存在标头信息,用于反序列化值的回退类型。
jsondeserializer.trusted\u包(默认java.util、java.lang);允许反序列化的包模式的逗号分隔列表;*意味着反序列化所有。
默认情况下,序列化程序将向标头添加类型信息。
请参阅引导文档。
类似地,您可以禁用jsonserializer在标头中发送类型信息的默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
或者可以将类型Map添加到入站消息转换器,以将源类型Map到目标类型。编辑
说到这里,你用的是什么版本?
eulz3vhy3#
为spring-kafka-2.2.8解决了我的问题。
添加到
application.properties
:重要提示:
如果您分别为kafkaconsumer和kafkaproducer提供了序列化程序和反序列化程序示例,则它们没有任何效果。
参考文献:
[1] https://docs.spring.io/spring-kafka/reference/html/#json-塞德
[2] https://github.com/spring-projects/spring-kafka/issues/535
kkih6yb84#
生产者配置类
注意:容器是要在kafka主题中发布的自定义对象。
生产者阶级
生产商控制器
注意:container类型的消息将作为json消息发布到kafka topic name:final topic。
--消费者应用程序--
配置类
注意:在这里您可以看到,我们必须使用自定义的jsondeserializer来使用来自最终主题(主题名称)的容器对象类型json消息,而不是使用默认的jsondeserializer()。
消费者服务
f4t66c6m5#
对于这一个,有两种方法,要么在反序列化程序中,要么在application.yml中。
在反序列化程序中
在反序列化程序中,在
DefaultKafkaConsumerFactory
(创建消费工厂)。假设你想做一个ConsumerFactory<String, Foo>
与Foo
成为Kafka信息中的模特/pojo。你需要
addTrustedPackages
从JsonDeserializer
我在kotlin中有一个例子,但在java中几乎是相同的语法:或者在你的应用程序中
在application.yml文件中,遵循spring kafka指令。我们将com.example.entity.foo包中的foo类添加到受信任的存储中,使用:
与
spring.json.trusted.packages
接受包数组。可以指定类包,也可以使用*
任何包裹。那样的话你就不用通过考试了deserializer
在DefaultKafkaConsumerFactory()
仅在使用者配置中。o4tp2gmn6#
应该提到两个关键点。
有两个独立的项目为生产者和消费者。
然后发送消息(值)是一种对象类型,而不是原始类型。
问题是产生消息对象在使用者端不可用,因为这是两个独立的项目。
两个克服这个问题请遵循下面提到的步骤在 Spring 启动生产者和消费者的应用程序。
----生产商应用程序
y1aodyip7#
我的SpringKafka版本是2.2.11,我也有这个错误。
我得到这个错误是因为我在同一个kafta主题中用不同的配置配置了两个消费者。其中一个有consumerfactory<string,ordedto>,另一个有consumerfactory<string,string>。
我解决了更改一个消费者配置的错误,因为这是错误的。
只需检查主题的消费者