写Kafka的Apache Beam错误处理

4sup72z8  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(78)

通过KafkaIO发送到Kafka时如何正确捕获异常?

KafkaIO.<String, String>write()
                  .withBootstrapServers(kafkaBroker)
                  .withTopic(topic)
                  .withKeySerializer(StringSerializer.class)
                  .withValueSerializer(StringSerializer.class)
                  .withProducerConfigUpdates(kafkaProperties)

我有TupleCollection和Failure对象来存储异常,但是如果没有发送消息,我需要重试发送消息(或者将此消息写入任何其他源- db,文件等)。我只需要捕获错误,但如何使用KafkaIO.<String,String>write()来实现呢?

dkqlctbz

dkqlctbz1#

目前,在Beam中通过KafkaIO写入Kafka时,无法捕获抛出的异常。我目前正在设计支持此功能,但它不会在短期内可用。
Dataflow runner将自动重试失败的消息(在批处理模式下重试4次,在流模式下不限次数)。(https://cloud.google.com/dataflow/docs/guides/common-errors

相关问题