我尝试在批量模式下使用kafka connect jdbc源连接器,并具有以下属性。
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
timestamp.column.name=timestamp
connection.password=XXXXX
validate.non.null=false
tasks.max=1
producer.buffer.memory=2097152
batch.size=1000
producer.enable.idempotence=true
offset.flush.timeout.ms=300000
table.types=TABLE,VIEW
table.whitelist=materials
offset.flush.interval.ms=5000
mode=bulk
topic.prefix=mysql-
connection.user=kafka_connect_user
poll.interval.ms=200000
connection.url=jdbc:mysql://<DBNAME>
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
我在提交偏移量时遇到以下错误,更改各种参数似乎没有什么效果。
[2019-04-04 12:42:14,886] INFO WorkerSourceTask{id=SapMaterialsConnector-0} flushing 4064 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-04-04 12:42:19,886] ERROR WorkerSourceTask{id=SapMaterialsConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 712 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
3条答案
按热度按时间v2g6jxz61#
此错误表示缓冲了大量消息,在达到超时之前无法刷新。
为了解决这个问题,你可以
要么增加
offset.flush.timeout.ms
kafka connect worker配置中的配置参数或者可以通过减少
producer.buffer.memory
在你的Kafka连接工人配置。当您有相当大的消息时,这是最好的选择。41zrol4v2#
什么时候
security.protocol=SSL
如果已启用,请确保connect worker和connect producer有单独的ssl参数。为两者提供ssl设置块引用
看到了吗https://docs.confluent.io/5.2.3/connect/security.html#separate-负责人
nfs0ujit3#
如果您正试图连接到合流云,则此错误可能是因为worker属性中缺少配置,请确保添加了生产者和使用者配置。