我在我的第二个项目中使用了与第一个项目相同的Kafka主题名称,但消息有不同的包,并且在第一个项目中我得到了异常“类‘com.example.proj2’不在受信任的包中”。我不希望这条信息出现在第一个项目中。有没有可能跳过这条信息?我试图捕获SerializationException并提交,但没有帮助。信息一次又一次地传给消费者。
消费者配置:
Map<String, Object> configurations = new HashMap<>();
configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
configurations.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "true");
configurations.put(ConsumerConfig.GROUP_ID_CONFIG, groupId );
configurations.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId );
configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configurations.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
configurations.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");
configurations.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "52428800");
configurations.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "3600000");
configurations.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
configurations.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.proj1");
configurations.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
consumer = new KafkaConsumer<Object, Object>(configurations);
consumer.subscribe(topic);
消费代码:
try {
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofMillis(10000));
if(records.count() > 0) {
handleMessages(records);
consumer.commitSync();
}
} catch(SerializationException e) {
LOGGER.error("Invalid package", e);
consumer.commitSync();
} catch (Exception e) {...}
1条答案
按热度按时间bis0qfac1#
DefaultJackson2JavaTypeMapper
(在JsonDeserializer
中使用)执行以下操作:您可以考虑使用
ErrorHandlingDeserializer
Package 器来处理这种类型的错误:https://docs.spring.io/spring-kafka/reference/html/#error-handling-deserializer