spring云流发送到kafka的错误控制处理

cld4siwp  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(287)

如何抓住写Kafka主题的错误?

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import javax.xml.bind.JAXBException;

@Component
@EnableBinding(Processor.class)
public class Parser {

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String process(Message<?> input) {

        // do smth

        // how to catch an error if sending message to 'output' topic fails
    }

}

将生产者切换到同步模式。

spring.cloud.stream.kafka.bindings.output.producer.sync=true

接下来呢?有什么例子吗?扩展一些绑定impl,添加一些aop魔法?

p5fdfcr1

p5fdfcr11#

我想你需要的是注册一个 KafkaProducerListener 它在您的案例中处理错误场景,并使其在应用程序中作为bean可用。
更多关于Kafka的信息,请点击这里
另外,请注意,通常失败的消息在 errorChannel 使用频道名称 error . 一旦配置了,所有错误消息都将发布 destination 错误通道的名称: spring.cloud.stream.bindings.error.destination .

相关问题