How to handle UnknownProducerIdException

wbrvyc0a  asked on 2021-06-04  in Kafka
Follow (0) | Answers (1) | Views (391)

We have been facing some issues using Spring Cloud and Kafka: sometimes our microservices throw an UnknownProducerIdException, caused by the transactional.id.expiration.ms parameter expiring on the broker side.
My question is: is it possible to catch that exception and retry the failed message? If so, what is the best way to handle it?
I have taken a look at:

  • https://cwiki.apache.org/confluence/pages/viewpage.action?pageid=89068820
  • Kafka UNKNOWN_PRODUCER_ID exception
We are using Spring Cloud Hoxton.RELEASE and Spring Kafka version 2.2.4.RELEASE. We are on an AWS-managed Kafka solution, so we cannot set a new value for the property I mentioned above.
Here are some of the exceptions:
2020-04-07 20:54:00.563 ERROR 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2] The broker returned org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. for topic-partition test.produce.another-2 with producerId 35000, epoch 0, and sequence number 8
2020-04-07 20:54:00.563  INFO 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2] ProducerId set to -1 with epoch -1
2020-04-07 20:54:00.565 ERROR 5188 --- [ad | producer-2] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{...}' to topic <some-topic>:

To reproduce this exception:
- I used the Confluent Docker images and set the environment variable KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS to 10 seconds, so I would not have to wait too long for the exception to be thrown.
- In another process, I sent messages one by one, at 10-second intervals, to the topic the Java application listens on.
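
The broker-side override described above could be expressed, for instance, as a docker-compose fragment. This is only a sketch: the service name, image tag, and the rest of the broker wiring are assumptions; the relevant part is the expiration override.

```yaml
# Minimal sketch, not a complete compose file.
# KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS maps to the broker property
# transactional.id.expiration.ms (default 604800000 ms, i.e. 7 days).
services:
  broker:
    image: confluentinc/cp-kafka
    environment:
      KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 10000   # 10 s, so the error triggers quickly
```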
Here is a code sample:
File Bindings.java

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface Bindings {
  @Input("test-input")
  SubscribableChannel testListener();

  @Output("test-output")
  MessageChannel testProducer();
}

File application.yml (don't forget to set the KAFKA_HOST environment variable):

spring:
  cloud:
    stream:
      kafka:
        binder:
          auto-create-topics: true
          brokers: ${KAFKA_HOST}
          transaction:
            producer:
              error-channel-enabled: true
          producer-properties:
            acks: all
            retry.backoff.ms: 200
            linger.ms: 100
            max.in.flight.requests.per.connection: 1
            enable.idempotence: true
            retries: 3
            compression.type: snappy
            request.timeout.ms: 5000
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            session.timeout.ms: 20000
            max.poll.interval.ms: 350000
            enable.auto.commit: true
            allow.auto.create.topics: true
            auto.commit.interval.ms: 12000
            max.poll.records: 5
            isolation.level: read_committed
          configuration:
            auto.offset.reset: latest

      bindings:

        test-input:
          # contentType: text/plain
          destination: test.produce
          group: group-input
          consumer:
            maxAttempts: 3
            startOffset: latest
            autoCommitOnError: true
            queueBufferingMaxMessages: 100000
            autoCommitOffset: true

        test-output:
          # contentType: text/plain
          destination: test.produce.another
          group: group-output
          producer:
            acks: all

debug: true

Listener handler:

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding(Bindings.class)
public class PocApplication {

    private static final Logger log = LoggerFactory.getLogger(PocApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(PocApplication.class, args);
    }

    @Autowired
    private BinderAwareChannelResolver binderAwareChannelResolver;

    // Listens on the channel declared as @Input("test-input") in Bindings
    @StreamListener("test-input")
    public void listen(Message<?> in) {
        MessageChannel messageChannel =
                this.binderAwareChannelResolver.resolveDestination("test-output");

        Object payload = in.getPayload();
        MessageBuilder<Object> builder = MessageBuilder.withPayload(payload);

        try {
            log.info("Event received: {}", in);

            if (!messageChannel.send(builder.build())) {
                log.error("Something happened trying to send the message! {}", in.getPayload());
            }

            log.info("Commit success");
        } catch (UnknownProducerIdException e) {
            log.error("UnknownProducerIdException caught ", e);
        } catch (KafkaException e) {
            log.error("KafkaException caught ", e);
        } catch (Exception e) {
            System.out.println("Commit failed " + e.getMessage());
        }
    }
}

Regards,

enyaitl3  1#

} catch (UnknownProducerIdException e) {
            log.error("UnknownProducerIdException caught ", e);

To catch the exception, you need to set the sync Kafka producer property (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.3.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#kafka-producer-properties). Otherwise, the error is returned to the future asynchronously.
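
In the configuration style used in the question's application.yml, that flag would go under the output binding's producer section. A sketch, using the binding name from the question:

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          test-output:
            producer:
              sync: true   # send() blocks until the broker acks, so failures surface in the listener thread
```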
You should not "eat" the exception there; it must be thrown back to the container so that the container can roll back the transaction.
Also,

}catch (Exception e) {
            System.out.println("Commit failed " + e.getMessage());
        }

The commit is performed by the container after the stream listener returns to the container, so you will never see a commit error there; again, you must let the exception propagate back to the container.
The container will retry the delivery according to the consumer binding's retry configuration.
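
The mechanism described here (rethrow, roll back, redeliver up to maxAttempts) can be illustrated with a plain-Java sketch. This simulates the container's delivery loop; it is not Spring's actual code, and the class and method names are made up for illustration.

```java
public class RetrySketch {
    static final int MAX_ATTEMPTS = 3; // mirrors maxAttempts: 3 from the question's config

    static int deliveries = 0; // how many times the listener was invoked

    // Simulated listener: fails on the first two deliveries, then succeeds.
    // The key point: it rethrows instead of swallowing the exception.
    static void listen(String payload) {
        deliveries++;
        if (deliveries < 3) {
            throw new RuntimeException("send failed for " + payload);
        }
    }

    // Simulated container loop: catches the listener's exception,
    // "rolls back", and redelivers until attempts are exhausted.
    static boolean deliver(String payload) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                listen(payload);
                return true; // success: offset committed by the container
            } catch (RuntimeException e) {
                // container logs the failure and retries
            }
        }
        return false; // attempts exhausted: message would go to an error handler / DLQ
    }

    public static void main(String[] args) {
        System.out.println("delivered=" + deliver("msg-1") + " attempts=" + deliveries);
        // prints: delivered=true attempts=3
    }
}
```

In real Spring Cloud Stream, the retry count and backoff are tuned per consumer binding with the core properties maxAttempts, backOffInitialInterval, backOffMultiplier, and backOffMaxInterval.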
