无法使用deadletterpublishingrecoverer创建kafka deallettertopic

68bkxrlz  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(903)

我正在尝试使用deadletterpublishingrecoverer创建死信主题,但无法创建它。下面是我正在定义的bean。
但是我在日志中看到,在producerconfig中,我没有看到属性allow.auto.create.topics。它丢失了,因此无法创建dlt主题。
有人能建议一下吗?

@Autowired
    private KafkaProperties kafkaProperties;

@Bean
    public ProducerFactory<String, Object> produceFactory() {
        Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        DefaultKafkaProducerFactory<String, Object> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(
                configProps);
        return defaultKafkaProducerFactory;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemp() {
    MessagingMessageConverter messageConverter = new MessagingMessageConverter();
    messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); // map all byte[] headers
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setMessageConverter(messageConverter);
    return kafkaTemplate;
    }

    @Bean
    public KafkaOperations<String, Object> getKafkaTemplate() { // producer to DLQ
        return kafkaTemp();
    }

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
        return new DeadLetterPublishingRecoverer(getKafkaTemplate());
    }

    @Bean
    public SeekToCurrentErrorHandler seekToCurrentErrorHandler(
            DeadLetterPublishingRecoverer deadLetterPublishingRecoverer, OfferRESTClient offerRestClient) {
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
                deadLetterPublishingRecoverer.accept(record, exception);
            }
        });
        errorHandler.setCommitRecovered(true);
        errorHandler.setBackOffFunction((record, exception) -> {
            return new FixedBackOff(0L, 5L;
        });
        return errorHandler;
    }
fjaof16o

fjaof16o1#

allow.auto.create.topics 是经纪人的财产,不是客户的财产。您通常不想使用它,因为它可能无法获得所需数量的分区。
框架不会自动提供死信主题,您必须自己创建它们,或者添加一个
NewTopic @Bean Spring会为你创造它。
请参阅配置主题。
另请参见发布死信记录。
默认情况下,死信记录被发送到名为 <originalTopic>.DLT (原始主题名后缀为.dlt)并与原始记录位于同一分区。因此,在使用默认解析器时,死信主题必须至少具有与原始主题相同的分区。

相关问题