我正在尝试使用deadletterpublishingrecoverer创建死信主题,但无法创建它。下面是我正在定义的bean。
但是我在日志中看到,在producerconfig中,我没有看到属性allow.auto.create.topics。它丢失了,因此无法创建dlt主题。
有人能建议一下吗?
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public ProducerFactory<String, Object> produceFactory() {
Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
DefaultKafkaProducerFactory<String, Object> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(
configProps);
return defaultKafkaProducerFactory;
}
@Bean
public KafkaTemplate<String, Object> kafkaTemp() {
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); // map all byte[] headers
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setMessageConverter(messageConverter);
return kafkaTemplate;
}
@Bean
public KafkaOperations<String, Object> getKafkaTemplate() { // producer to DLQ
return kafkaTemp();
}
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
return new DeadLetterPublishingRecoverer(getKafkaTemplate());
}
@Bean
public SeekToCurrentErrorHandler seekToCurrentErrorHandler(
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer, OfferRESTClient offerRestClient) {
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
deadLetterPublishingRecoverer.accept(record, exception);
}
});
errorHandler.setCommitRecovered(true);
errorHandler.setBackOffFunction((record, exception) -> {
return new FixedBackOff(0L, 5L;
});
return errorHandler;
}
1条答案
按热度按时间fjaof16o1#
allow.auto.create.topics
是经纪人的财产,不是客户的财产。您通常不想使用它,因为它可能无法获得所需数量的分区。框架不会自动提供死信主题,您必须自己创建它们,或者添加一个
NewTopic
@Bean
Spring会为你创造它。请参阅配置主题。
另请参见发布死信记录。
默认情况下,死信记录被发送到名为
<originalTopic>.DLT
(原始主题名后缀为.dlt)并与原始记录位于同一分区。因此,在使用默认解析器时,死信主题必须至少具有与原始主题相同的分区。