通过KafkaIO发送到Kafka时如何正确捕获异常?
KafkaIO.<String, String>write()
.withBootstrapServers(kafkaBroker)
.withTopic(topic)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class)
.withProducerConfigUpdates(kafkaProperties)
我有TupleCollection和Failure对象来存储异常,但是如果没有发送消息,我需要重试发送消息(或者将此消息写入任何其他源- db,文件等)。我只需要捕获错误,但如何使用KafkaIO.<String,String>write()来实现呢?
1条答案
按热度按时间dkqlctbz1#
目前,在Beam中通过KafkaIO写入Kafka时,无法捕获抛出的异常。我目前正在设计支持此功能,但它不会在短期内可用。
Dataflow runner将自动重试失败的消息(在批处理模式下重试4次,在流模式下不限次数)。(https://cloud.google.com/dataflow/docs/guides/common-errors)