Kafka 如何在JdbcSinkConnector中实现延迟?

but5z9lq  于 2023-03-07  发布在  Apache
关注(0)|答案(2)|浏览(128)

我将(connection.backoff.ms,retry.backoff.ms)参数设置为5分钟,并期望在从Kafka向断开连接的数据库发送消息后,重试时间为5分钟,但在重新启动数据库后,来自kafka的消息立即到达,而不是在5分钟后到达,可能是其他参数造成的。
我正在使用此Docker图像https://github.com/confluentinc/demo-scene/tree/master/kafka-to-database
这是我的连接器配置:

curl -X PUT http://localhost:8083/connectors/jdbc_postgres/config \
     -H "Content-Type: application/json" -d '{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:postgresql://postgres:5432/postgres",
"topics":"test01",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081",
"connection.user":"postgres",
"connection.password":"postgres",
"auto.create":true,
"auto.evolve":true,
"insert.mode":"upsert",
"pk.mode":"record_key",
"pk.fields":"MESSAGE_KEY",
"transforms":"Filter",
"transforms.Filter.type":"org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate":"DropNull",
"predicates":"DropNull",
"predicates.DropNull.type":"org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"errors.log.enable":"true",
"errors.tolerance":"all",
"errors.deadletterqueue.topic.name":"dead_test01",
"errors.deadletterqueue.topic.replication.factor":"-1",
"connection.backoff.ms":"300000"
}'
vsikbqxv

vsikbqxv1#

您需要使用consumer.override.前缀来为使用者设置这两个值中的任何一个,但这只是到代理的连接,而不是数据库。
不能在Connect API中延迟Kafka使用者轮询循环。

hec6srdp

hec6srdp2#

这里connection.backoff.msretry.backoff.ms之间存在混淆。
connection.backoff.ms是在通信失败的情况下应该用来控制与db的时间延迟的函数。
retry.backoff.ms只是控制在任何处理失败的情况下在连接器内重试任务之前的延迟。
值得一提的是,如果您启动了连接器,并且数据库链接断开,那么当您检查连接器(curl localhost:8083/connector/connector_name/status)的状态时,您将看到任务处于RUNNING状态,这意味着retry.backoff.ms无论如何都不会开始运行。
回到connection.backoff.ms,假设以下事件序列:

DB up              connector up         DB Down  
   |----------------------|-----------------|******************************|--------
                                            |------------------------------|
                                        connector                      Next retry time
                                        will fail
                                        (here, you should see in the logs, something like 
                            "Unable to connect to database on attempt 1/3. Will retry in X ms." X=connection.backoff.ms)

如果您在****期间的任何时候启动DB,则连接器不应采取任何操作,直到下一次重试时间到来,如果您看到此类行为,请确保您没有在重试操作的时间启动DB。

相关问题