spring cloud kinesis binder如何为生产者处理错误-根据文档,它不起作用

6yt4nkrj  于 2021-07-06  发布在  Java
关注(0)|答案(1)|浏览(344)

我遵循下面的文档,我有一个生产者和消费者的工作非常好的动势流。我想了解在发生任何异常时如何处理producer(source)中的错误。
根据spring stream错误处理文档,我尝试了以下方法:

@StreamListener("errorChannel")
@ServiceActivator(inputChannel = "errorChannel")

两者都不起作用。我在producer方法中显式抛出一个runtimeexception,并期望它位于“errorchannel”中,但无法接收相同的结果。
请帮助我弄清楚这一点或与我分享它的方法,如果有人成功地做到了这一点。

mitkmikd

mitkmikd1#

注意:我几个月前就遇到了这个问题。我们用Kafka代替动觉。既然你已经为spring cloud stream binder kafka贴上了标签,我就在同一个地方提供输入。
您可以为生产者编写自定义回拨,此回拨可以告诉您消息是否已失败或成功发布。失败时,记录消息的元数据。
下面是一个小的代码片段,不能更好地解释我所说的回调。
请注意,这是给Kafka的。相应地更改运动的代码。

public class ProduceToKafka {

  private ProducerRecord<String, String> message = null;

 // TracerBulletProducer class has producer properties
  private KafkaProducer<String, String> myProducer = TracerBulletProducer
      .createProducer();

  public void publishMessage(String string) {

    ProducerRecord<String, String> message = new ProducerRecord<>(
        "topicName", string);

    myProducer.send(message, new MyCallback(message.key(), message.value()));
  }

  class MyCallback implements Callback {

    private final String key;
    private final String value;

    public MyCallback(String key, String value) {
      this.key = key;
      this.value = value;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      if (exception == null) {
        log.info("--------> All good !!");
      } else {
        log.info("--------> not so good  !!");
        log.info(metadata.toString());
        log.info("" + metadata.serializedValueSize());
        log.info(exception.getMessage());

      }
    }
  }

}

我写了一篇关于在spring抽象的不同层次上处理生产者失败的文章。这对你有帮助。过来看:https://medium.com/@akhil.ghatiki/kafka-producer-failure-handling-at-multiple-levels-of-spring-abstractions-e530edb02a6c生产商故障处理

相关问题