kafka connect jdbc connector query+递增模式在初始轮询时阻塞了大数据集

ghg1uchk  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(344)

我使用jdbc连接器将数据从mysql移动到kafka中。我感兴趣的数据来自3个表,因此我用 mode:incrementing 以及 query :

{
    "name": "stats",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
        "connection.url": "jdbc:mysql://DB_HOST:3306/SCHEMA?user=USER&password=PASSWORD&zeroDateTimeBehavior=CONVERT_TO_NULL&useSSL=false",
        "mode": "incrementing",
        "validate.non.null": "false",
        "topic.prefix": "t",
        "incrementing.column.name": "s.id",
        "transforms": "createKey,extractString",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "uuid",
        "transforms.extractString.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractString.field": "uuid",
        "quote.sql.identifiers":"never",
        "query": "select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id",
        "errors.tolerance": "all",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "batch.max.rows": "100",
        "poll.interval.ms": "60000"
    }
}

检查正在运行的连接器状态时:

curl http://conncet:8083/connectors/stats/status

{
    "name": "stats",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect-3:38083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "connect-1:18083"
        }
    ],
    "type": "source"
}

但一个小时后,我仍然没有看到创建的主题。我已经检查了mysql中运行的查询 show full processlist; 我看到两个这样的问题:

select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > -1 ORDER BY s.id ASC

因此基本上查询与我在中提供的查询相同 query 在connector configuration plus中 WHERE s.id > -1 ORDER BY s.id ASC ,因为这个连接中的查询产生了2100万行的结果集,所以mysql很长时间都在发送数据。当我再次与 show full processlist; 我现在看到4个这样的查询,然后是8个,然后是16个,依此类推。
问题是:
为什么kafka connect在添加以下内容时要同时获取所有行: s.id > -1 ORDER BY s.id ASC .
是否可以将连接器配置为不执行此操作,而是获取较小的量?
"batch.max.rows": "100" 只控制初始轮询后的批量大小??
更新:
这个问题有一个开放的主题。我想这个问题可以结束了。

vhmi4jdf

vhmi4jdf1#

jdbc源连接器
incrementing mode 通过了 query ,使用以下where子句执行该查询: WHERE incrementingColumnName > lastIncrementedValue ORDER BY incrementingColumnName ASC . (如果使用增量模式和查询,则无法传递 where 条款)。
第一次投票 lastIncrementedValue 是-1,所以它尝试查询所有记录。在提取每个记录后,lastincrementedvalue增加,所以下一次查询将只轮询新数据。 batch.max.rows 指有多少条记录 SourceTask::poll(...) 将返回Kafka连接框架。它是一次发送给Kafka的最大批量。
我认为,当您从单个表中获取数据时,它工作得更快,因为查询执行得更快(不那么复杂)。如果您使用其他sql工具执行这些查询,它将执行类似的操作。

7gcisfzg

7gcisfzg2#

5.5中添加了query.suffix。我用它添加了一个limit语句,效果很好,它只是将limit附加到查询的末尾。
见问题

相关问题