我尝试在sqlite中设置一个包含两个表的数据库。我的表有一个timestamp列。我试图实现时间戳模式来捕获数据库中的增量更改。kafka connect失败,出现以下错误:
ERROR Failed to get current time from DB using Sqlite and query 'SELECT
CURRENT_TIMESTAMP'
(io.confluent.connect.jdbc.dialect.SqliteDatabaseDialect:471)
java.sql.SQLException: Error parsing time stamp
Caused by: java.text.ParseException: Unparseable date: "2019-02-05 02:05:29"
does not match (\p{Nd}++)\Q-\E(\p{Nd}++)\Q-\E(\p{Nd}++)\Q
\E(\p{Nd}++)\Q:\E(\p{Nd}++)\Q:\E(\p{Nd}++)\Q.\E(\p{Nd}++)
非常感谢你的帮助
配置:
name=test-query-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlite:employee.db
query=SELECT users.id, users.name, transactions.timestamp, transactions.payment_type FROM users JOIN transactions ON (users.id = transactions.user_id)
mode=timestamp
timestamp.column.name=timestamp
topic.prefix=test-joined
ddl地址:
CREATE TABLE transactions(id integer primary key not null,
payment_type text not null,
timestamp DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
user_id int not null,
constraint fk foreign key(user_id) references users(id)
);
CREATE TABLE users (id integer primary key not null,name text not null);
2条答案
按热度按时间8oomwypt1#
如果“timestamp”列的值的格式是“unix timestamp”,那么kafka connect jdbc连接器可以轻松地检测时间戳中的更改。
这些值可以插入为:
然后,kafka jdbc源连接器将检测到与时间戳相关的更改,并且可以按如下方式使用这些更改:
mkshixfv2#
我已经复制了这个,它已经被记录为jdbc源代码连接器的一个问题。您可以在此处监视它:https://github.com/confluentinc/kafka-connect-jdbc/issues/219