我在我的一个项目中使用Kafka connect实现了流。我创建了S3 sink连接器来使用来自不同主题的消息(使用regex),然后将一个文件写入S3。使用来自不同主题的消息是使用下面的属性完成的。"topics.regex": "my-topic-sample\\.(.+)",
我有3个不同的主题如下。使用上述属性,S3接收连接器将消费来自这3个主题的消息,并写一个单独的文件(为每个主题)到S3。
my-topic-sample.test1
my-topic-sample.test2
my-topic-sample.test3
目前,忽略所有无效消息。但是,希望实现死信队列。
我们可以使用以下属性来实现
'errors.tolerance'='all',
'errors.deadletterqueue.topic.name' = 'error_topic'
通过上述属性,我们可以将所有无效消息移动到DLQ。但问题是,尽管有3个不同的主题,S3接收器连接器正在使用其中的消息,但我们将只有1个DLQ。来自所有3个主题的无效消息将被推送到同一个DLQ。
是否有一种方法可以让我们拥有多个DLQ,并根据从中使用的主题将消息写入不同的DLQ。
谢谢
1条答案
按热度按时间svgewumm1#
您只能为每个连接器配置一个DLQ主题,不能为每个源主题使用不同的DLQ主题。
现在,如果要将连接器无法处理的记录拆分到不同的DLQ主题中,则需要启动多个连接器示例,每个连接器示例使用不同的源主题。
Apache Kafka是一个开源项目,所以如果您准备好迎接挑战,您可以建议实现这样一个特性,如果您愿意的话。