我将(connection.backoff.ms,retry.backoff.ms)参数设置为5分钟,并期望在从Kafka向断开连接的数据库发送消息后,重试时间为5分钟,但在重新启动数据库后,来自kafka的消息立即到达,而不是在5分钟后到达,可能是其他参数造成的。
我正在使用此Docker图像https://github.com/confluentinc/demo-scene/tree/master/kafka-to-database
这是我的连接器配置:
curl -X PUT http://localhost:8083/connectors/jdbc_postgres/config \
-H "Content-Type: application/json" -d '{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:postgresql://postgres:5432/postgres",
"topics":"test01",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081",
"connection.user":"postgres",
"connection.password":"postgres",
"auto.create":true,
"auto.evolve":true,
"insert.mode":"upsert",
"pk.mode":"record_key",
"pk.fields":"MESSAGE_KEY",
"transforms":"Filter",
"transforms.Filter.type":"org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate":"DropNull",
"predicates":"DropNull",
"predicates.DropNull.type":"org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"errors.log.enable":"true",
"errors.tolerance":"all",
"errors.deadletterqueue.topic.name":"dead_test01",
"errors.deadletterqueue.topic.replication.factor":"-1",
"connection.backoff.ms":"300000"
}'
2条答案
按热度按时间vsikbqxv1#
您需要使用
consumer.override.
前缀来为使用者设置这两个值中的任何一个,但这只是到代理的连接,而不是数据库。不能在Connect API中延迟Kafka使用者轮询循环。
hec6srdp2#
这里
connection.backoff.ms
和retry.backoff.ms
之间存在混淆。connection.backoff.ms
是在通信失败的情况下应该用来控制与db的时间延迟的函数。而
retry.backoff.ms
只是控制在任何处理失败的情况下在连接器内重试任务之前的延迟。值得一提的是,如果您启动了连接器,并且数据库链接断开,那么当您检查连接器(
curl localhost:8083/connector/connector_name/status
)的状态时,您将看到任务处于RUNNING
状态,这意味着retry.backoff.ms
无论如何都不会开始运行。回到
connection.backoff.ms
,假设以下事件序列:如果您在
****
期间的任何时候启动DB,则连接器不应采取任何操作,直到下一次重试时间到来,如果您看到此类行为,请确保您没有在重试操作的时间启动DB。