How do I use timestamp or timestamp+incrementing mode with a Kafka Connect source connector?

ecr0jaav posted on 2021-06-06 in Kafka

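I have a database (MariaDB) relation with a column "modified" of type "bigint(10)" that represents a timestamp, in Unix time format I believe. When I run the Kafka source connector in "timestamp" or "timestamp+incrementing" mode, no events are pushed to the topic. If I run it in incrementing mode only, new entries are pushed to the topic. Can someone tell me where I have misconfigured the connector? Or does the connector not recognize timestamps in Unix time format?
I tried running the connector with the following properties (retrieving based on the timestamp only):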

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name":"only_ts",
        "config": {
            "numeric.mapping": "best_fit",
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:mysql://mariadb/moodle",
            "connection.user": "user",
            "connection.password": "",
            "topic.prefix": "only_ts_",
            "mode": "timestamp", 
            "timestamp.column.name":"modified", 
            "table.whitelist":"mdl_forum_posts",
            "poll.interval.ms": 10000
        }
  }'

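Whenever I create or update an entry, I expect to see the entries from "mdl_forum_posts" pushed to the Kafka topic "only_ts_mdl_forum_posts". With this connector, however, nothing happens. If I use "incrementing" mode only, it works as expected. But to capture database updates, I need to add timestamp mode.
Output of "describe mdl_forum_posts":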

+---------------+--------------+------+-----+---------+----------------+
| Field         | Type         | Null | Key | Default | Extra          |
+---------------+--------------+------+-----+---------+----------------+
| id            | bigint(10)   | NO   | PRI | NULL    | auto_increment |
| discussion    | bigint(10)   | NO   | MUL | 0       |                |
| parent        | bigint(10)   | NO   | MUL | 0       |                |
| userid        | bigint(10)   | NO   | MUL | 0       |                |
| created       | bigint(10)   | NO   | MUL | 0       |                |
| modified      | bigint(10)   | NO   |     | 0       |                |
| mailed        | tinyint(2)   | NO   | MUL | 0       |                |
| subject       | varchar(255) | NO   |     |         |                |
| message       | longtext     | NO   |     | NULL    |                |
| messageformat | tinyint(2)   | NO   |     | 0       |                |
| messagetrust  | tinyint(2)   | NO   |     | 0       |                |
| attachment    | varchar(100) | NO   |     |         |                |
| totalscore    | smallint(4)  | NO   |     | 0       |                |
| mailnow       | bigint(10)   | NO   |     | 0       |                |
| deleted       | tinyint(1)   | NO   |     | 0       |                |
+---------------+--------------+------+-----+---------+----------------+

And the output of "show create table moodle.mdl_forum_posts;":

| mdl_forum_posts | CREATE TABLE mdl_forum_posts (
  id bigint(10) NOT NULL AUTO_INCREMENT,
  discussion bigint(10) NOT NULL DEFAULT '0',
  parent bigint(10) NOT NULL DEFAULT '0',
  userid bigint(10) NOT NULL DEFAULT '0',
  created bigint(10) NOT NULL DEFAULT '0',
  modified bigint(10) NOT NULL DEFAULT '0',
  mailed tinyint(2) NOT NULL DEFAULT '0',
  subject varchar(255) NOT NULL DEFAULT '',
  message longtext NOT NULL,
  messageformat tinyint(2) NOT NULL DEFAULT '0',
  messagetrust tinyint(2) NOT NULL DEFAULT '0',
  attachment varchar(100) NOT NULL DEFAULT '',
  totalscore smallint(4) NOT NULL DEFAULT '0',
  mailnow bigint(10) NOT NULL DEFAULT '0',
  deleted tinyint(1) NOT NULL DEFAULT '0',
  PRIMARY KEY (id),
  KEY mdl_forupost_use_ix (userid),
  KEY mdl_forupost_cre_ix (created),
  KEY mdl_forupost_mai_ix (mailed),
  KEY mdl_forupost_dis_ix (discussion),
  KEY mdl_forupost_par_ix (parent)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COMMENT='All posts are stored in this table' |

A sample entry in the "modified" column:

select modified from mdl_forum_posts;
1557487199

It is a Unix timestamp, as shown here:

select from_unixtime(modified) from mdl_forum_posts;
2019-05-10 11:19:59
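A quick way to confirm the unit is to decode the sample value both ways. The minimal Python sketch below, assuming the value is interpreted in UTC, shows that reading 1557487199 as Unix seconds reproduces the from_unixtime() result above, while reading it as milliseconds (dividing by 1000) would land in January 1970. Any conversion done in SQL should therefore treat "modified" as seconds.

```python
from datetime import datetime, timezone

modified = 1557487199  # sample value from the "modified" column above

# Read as Unix seconds (UTC assumed): same wall-clock time as from_unixtime() printed.
as_seconds = datetime.fromtimestamp(modified, tz=timezone.utc)
# Read as Unix milliseconds: lands in January 1970, so this reading is wrong here.
as_millis = datetime.fromtimestamp(modified / 1000, tz=timezone.utc)

print(as_seconds)        # 2019-05-10 11:19:59+00:00
print(as_millis.date())  # 1970-01-19
```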

The relevant log lines for this connector (timestamp only) appear to show the query it prepares:

kafka-connect_1    | [2019-05-10 11:48:47,434] DEBUG TimestampIncrementingTableQuerier{table="moodle"."mdl_forum_posts", query='null', topicPrefix='only_ts_', incrementingColumn='', timestampColumns=[modified]} prepared SQL query: SELECT * FROM `moodle`.`mdl_forum_posts` WHERE `moodle`.`mdl_forum_posts`.`modified` > ? AND `moodle`.`mdl_forum_posts`.`modified` < ? ORDER BY `moodle`.`mdl_forum_posts`.`modified` ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
kafka-connect_1    | [2019-05-10 11:48:47,435] DEBUG Resetting querier TimestampIncrementingTableQuerier{table="moodle"."mdl_forum_posts", query='null', topicPrefix='only_ts_', incrementingColumn='', timestampColumns=[modified]} (io.confluent.connect.jdbc.source.JdbcSourceTask)
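The DEBUG line shows what timestamp mode does on each poll: it binds two values into `modified > ? AND modified < ?`, namely the last stored offset and the current database time, and emits the rows in between, oldest first. The Python sketch below is a simplified, hypothetical model of that window, not the connector's code. It assumes "modified" holds Unix seconds and converts it to a timestamp before comparing, which is the interpretation the connector presumably expects from a real timestamp column and does not get from a plain bigint. The function names and poll times are made up for illustration.

```python
from datetime import datetime, timezone


def to_ts(epoch_seconds):
    """Interpret a bigint column value as Unix seconds (UTC)."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


def poll_window(rows, last_offset, db_now):
    """Simplified model of the logged query:
    modified > last_offset AND modified < db_now ORDER BY modified ASC.
    Returns the emitted rows and the new offset (latest timestamp emitted)."""
    batch = sorted(
        (r for r in rows if last_offset < to_ts(r["modified"]) < db_now),
        key=lambda r: r["modified"],
    )
    new_offset = to_ts(batch[-1]["modified"]) if batch else last_offset
    return batch, new_offset


# Hypothetical table contents and poll times, for illustration only.
rows = [{"id": 1, "modified": 1557487199}]           # 2019-05-10 11:19:59 UTC
offset = datetime(1970, 1, 1, tzinfo=timezone.utc)  # no offset stored yet

first, offset = poll_window(rows, offset, datetime(2019, 5, 10, 12, 0, tzinfo=timezone.utc))
second, offset = poll_window(rows, offset, datetime(2019, 5, 10, 12, 5, tzinfo=timezone.utc))

rows[0]["modified"] = 1557490000                     # row updated at 12:06:40 UTC
third, offset = poll_window(rows, offset, datetime(2019, 5, 10, 12, 10, tzinfo=timezone.utc))

# First poll emits the row, second finds nothing new, third picks up the update.
print([r["id"] for r in first], [r["id"] for r in second], [r["id"] for r in third])
```

Because the lower bound is strict, a row whose timestamp equals the stored offset is not emitted again, which is why the second poll is empty.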
Answer by olmpazwi


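I had the same problem. The only workaround I found is the one described here: https://github.com/confluentinc/kafka-connect-jdbc/issues/566. In short, `timestamp` mode can be used with a Unix-timestamp (bigint) column if you combine it with a custom query and write your own `where clause`. In your case, for example, it could look something like this: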

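```sql
SELECT id
FROM mdl_forum_posts
WHERE FROM_UNIXTIME(modified) > ? AND FROM_UNIXTIME(modified) < ? ORDER BY modified ASC
--
```

`FROM_UNIXTIME` is MariaDB's function for converting a Unix timestamp to a date; in another database, substitute your dialect's conversion function (for example `to_timestamp` in PostgreSQL), and divide by 1000 only if the column stores milliseconds rather than seconds. The trailing `--` turns the `where clause` that the connector generates and appends automatically into a comment, so the query must end with `--` for the appended clause to land on the commented-out line.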
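The `--` trick can be checked end to end with a small Python sketch. It uses an in-memory SQLite database as a stand-in for MariaDB, which is an assumption for the demo: `datetime(x, 'unixepoch')` plays the role of the Unix-time conversion function, the bound values are text rather than JDBC timestamps, and the appended clause is a simplified copy (unqualified column names) of the one in the connector's DEBUG log. The names `CUSTOM_QUERY`, `APPENDED` and `poll` are hypothetical.

```python
import sqlite3

# Clause the connector appends in timestamp mode, simplified from the DEBUG log in
# the question. It carries two placeholders: last offset and database "now".
APPENDED = " WHERE modified > ? AND modified < ? ORDER BY modified ASC"

# Custom query in the answer's style, rewritten for SQLite: datetime(x, 'unixepoch')
# stands in for MariaDB's FROM_UNIXTIME(x). It must end with "--".
CUSTOM_QUERY = (
    "SELECT id, modified FROM mdl_forum_posts\n"
    "WHERE datetime(modified, 'unixepoch') > ? "
    "AND datetime(modified, 'unixepoch') < ? ORDER BY modified ASC\n"
    "--"
)


def poll(conn, last_ts, now_ts):
    # The appended clause ends up after "--" on the same line, so it is a comment;
    # the two bound values therefore fill the custom query's own placeholders.
    sql = CUSTOM_QUERY + APPENDED
    return conn.execute(sql, (last_ts, now_ts)).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mdl_forum_posts (id INTEGER PRIMARY KEY, modified INTEGER NOT NULL)")
conn.executemany(
    "INSERT INTO mdl_forum_posts VALUES (?, ?)",
    [(1, 1557487199), (2, 1557490000)],  # 11:19:59 and 12:06:40 UTC on 2019-05-10
)

print(poll(conn, "1970-01-01 00:00:00", "2019-05-10 12:00:00"))  # [(1, 1557487199)]
print(poll(conn, "2019-05-10 11:19:59", "2019-05-11 00:00:00"))  # [(2, 1557490000)]
```

If the `--` were dropped, the statement would contain a second WHERE clause and four placeholders, and it would fail instead of silently filtering.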
