跳过Kafka消费者中不受信任的套餐

hgqdbh6s  于 2022-09-21  发布在  Apache
关注(0)|答案(1)|浏览(130)

我在我的第二个项目中使用了与第一个项目相同的Kafka主题名称,但消息有不同的包,并且在第一个项目中我得到了异常“类‘com.example.proj2’不在受信任的包中”。我不希望这条信息出现在第一个项目中。有没有可能跳过这条信息?我试图捕获SerializationException并提交,但没有帮助。信息一次又一次地传给消费者。

消费者配置:

Map<String, Object> configurations = new HashMap<>();
    configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    configurations.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "true");
    configurations.put(ConsumerConfig.GROUP_ID_CONFIG, groupId );
    configurations.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId );
    configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configurations.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    configurations.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");
    configurations.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "52428800");
    configurations.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "3600000");
    configurations.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    configurations.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.proj1");
    configurations.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
    consumer = new KafkaConsumer<Object, Object>(configurations);
    consumer.subscribe(topic);

消费代码:

try {
        ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofMillis(10000));
        if(records.count() > 0) {
            handleMessages(records);
            consumer.commitSync();
        }
    } catch(SerializationException e) {
        LOGGER.error("Invalid package", e);
        consumer.commitSync();
    } catch (Exception e) {...}
bis0qfac

bis0qfac1#

DefaultJackson2JavaTypeMapper(在JsonDeserializer中使用)执行以下操作:

if (!isTrustedPackage(classId)) {
                throw new IllegalArgumentException("The class '" + classId
                        + "' is not in the trusted packages: "
                        + this.trustedPackages + ". "
                        + "If you believe this class is safe to deserialize, please provide its name. "
                        + "If the serialization is only done by a trusted source, you can also enable "
                        + "trust all (*).");

您可以考虑使用ErrorHandlingDeserializer Package 器来处理这种类型的错误:https://docs.spring.io/spring-kafka/reference/html/#error-handling-deserializer

相关问题