我正在使用confluent实现实时etl。我的数据源是oracle,每个表都有一个名为ts的列,它的数据类型是varchar,但该列中的数据是yyyy-mm--dd hh24:mi:ss格式。我可以在confluent kafka连接器中将此列用作时间戳吗?如何配置xx.properties文件?
mode=timestamp
query= select to_date(a.ts,'yyyy-mm-dd hh24:mi:ss') tsinc,a.* from TEST_CORP a
poll.interval.ms=1000
timestamp.column.name=tsinc
1条答案
按热度按时间xghobddn1#
connector.class=io.confluent.connect.jdbc.jdbcsourceconnector query=selectfrom nfsn.bd\u corp mode=timestamp poll.interval.ms=3000 timestamp.column.name=ts topic.prefix=t\uvalidate.non.null=false
然后我得到这个错误:
[2018-12-25 14:39:59756]过滤表后的信息为:(io.confluent.connect.jdbc.source.tablemoni)torthread:175)[2018-12-25 14:40:01383]调试检查TimestampIncrementTableQuerier{table=null,query='selectfrom nfsn.bd\u corp',topicprefix='t\u',incrementingcolumn=',timestampcolumns=[ts]}(io.confluent.connect.jdbc.source.jdbcs)ourcetask:291)[2018-12-25 14:40:01386]调试timestampincrementablequeryer{table=null,query='selectfrom nfsn.bd\u corp',topicprefix='t\u',incrementingcolumn='',timestampcolumns=[ts]}准备的sql查询:selectfrom nfsn.bd\u corp where“ts”>?和“ts”<?按“ts”asc排序(io.confluent.connect.jdbc.source.timestampincrementingtablequerier:161)[2018-12-25 14:40:01386]调试执行查询从dual中选择current\u timestamp,从数据库(io.confluent.connect.jdbc.dialect.oracledataba)中获取当前时间sedialect:462) [2018-12-25 14:40:01,388]调试执行准备好的语句,时间戳值=1970-01-01 00:00:00.000结束时间=2018-12-25 06:40:43.828(io.confluent.connect.jdbc.source.timestampincrementin)gcriteria:162)[2018-12-25 14:40:01389]对表TimestampIncrementTableQuerier{table=null,query='select*from nfsn.bd\u corp'运行查询失败,topicprefix='t',incrementingcolumn='',timestampcolumns=[ts]}:{}(io.confluent.connect.jdbc.source.jdbcsourcetask:314)java.sql.sqldataexception:ora-01843:不是有效月份
timestampIncrementTableQuerier{table=null,query='select*from nfsn.bd\u corp',topicprefix='t\u',incrementingcolumn='',timestampcolumns=[ts]}(io.confluent.connect.jdbc.source.jdbcsourcetask:332)^c[2018-12-25 14:40:03826]Kafka连接停止信息(org.apache.kafka.connect.runtime)。connect:65) [2018-12-25 14:40:03,827]停止rest服务器的信息(org.apache.kafka.connect.runtime.rest.r)estserver:223)