Max retries and retry interval for the Kafka JDBC sink connector when the database is down

jxct1oxe asked on 2021-06-06 in Kafka

I am trying to test and evaluate how the Kafka JDBC sink connector behaves when the database is down.
When a new message arrives in Kafka while the database is down, the following error is reported:

INFO Unable to connect to database on attempt 1/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider:91)
com.microsoft.sqlserver.jdbc.SQLServerException: Unable to access availability database 'Giorgos' because the database replica is not in the PRIMARY or SECONDARY role. Connections to an availability database is permitted only when the database replica is in the PRIMARY or SECONDARY role. Try the operation again later.

After one more retry, the following error is reported and the task is killed:

ERROR WorkerSinkTask{id=sink-giorgos_test-2} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)

Where can I change the number of attempts and the 10000 ms retry interval reported in the first error?
Say I want the worker to keep trying to connect to the database for 5 minutes. Which parameters should I configure to do that?
Edit: including the relevant files below.
sink-file.properties:

name=sink-test
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=GIORGOS.TOPIC
connection.url=jdbc:sqlserver://ip:port;DatabaseName=Streaming;user=myuser;password=mypass
auto.create=true

# DB failover

max.retries=10
retry.backoff.ms=10000

pk.mode=record_value
pk.fields=ID
insert.mode=upsert
transforms=ExtractField
transforms.ExtractField.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractField.field=data

worker.properties (I have multiple worker files since I run in distributed mode):

bootstrap.servers=localhost:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

rest.port=8040
rest.advertised.port=8040

plugin.path=/usr/share/java

c6ubokkw 1#

As documented at https://docs.confluent.io/current/connect/connect-jdbc/docs/sink_config_options.html#retries, two properties can be set in the connector configuration:

max.retries=30
retry.backoff.ms=10000

With these values the connector retries 30 times, waiting 10 seconds between attempts (30 × 10 s = 300 s = 5 minutes).
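
Note that once the retries are exhausted the task stays dead ("Task is being killed and will not recover until manually restarted"), so after the database comes back you still have to restart it yourself. A minimal sketch using the Connect REST API, assuming the connector name sink-test and the rest.port=8040 from the question's configs:

# Restart the whole connector (name and port are taken from the question's configs)
curl -X POST http://localhost:8040/connectors/sink-test/restart

# Or restart only the failed task; with tasks.max=1 that is task 0
curl -X POST http://localhost:8040/connectors/sink-test/tasks/0/restart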


qvtsj1bj 2#

The JDBC sink connector shares the same parameters as the source connector. The following settings are used for connection attempts:

connection.attempts
Maximum number of attempts to retrieve a valid JDBC connection.

Type: int
Default: 3
Importance: low

connection.backoff.ms
Backoff time in milliseconds between connection attempts.

Type: long
Default: 10000
Importance: low
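
So, to keep attempting the connection for roughly 5 minutes, a sketch of the connector-level settings (the values are assumptions chosen to hit the 5-minute target; the property names and their defaults of 3 and 10000 come from the excerpt above):

# Connection retries: 30 attempts x 10000 ms backoff ≈ 300 s (5 minutes)
connection.attempts=30
connection.backoff.ms=10000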

Refs: https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html#database
https://github.com/confluentinc/kafka-connect-jdbc/blob/v5.3.0-rc4/src/main/java/io/confluent/connect/jdbc/util/CachedConnectionProvider.java#L43
