Can a VARCHAR column be used as the timestamp in Confluent?

zzoitvuj  posted on 2021-06-07  in  Kafka

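I am implementing real-time ETL with Confluent. My data source is Oracle, and every table has a column named ts. Its data type is VARCHAR, but the values in it are in yyyy-mm-dd hh24:mi:ss format. Can I use this column as the timestamp in the Confluent Kafka connector? How should I configure the xx.properties file?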

mode=timestamp
query= select to_date(a.ts,'yyyy-mm-dd hh24:mi:ss') tsinc,a.* from TEST_CORP a
poll.interval.ms=1000 
timestamp.column.name=tsinc

xghobddn  #1

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
query=select * from nfsn.bd_corp
mode=timestamp
poll.interval.ms=3000
timestamp.column.name=ts
topic.prefix=t_
validate.non.null=false
Then I get this error:
[2018-12-25 14:39:59,756] INFO After filtering the tables are: (io.confluent.connect.jdbc.source.TableMonitorThread:175)
[2018-12-25 14:40:01,383] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{table=null, query='select * from nfsn.bd_corp', topicPrefix='t_', incrementingColumn='', timestampColumns=[ts]} (io.confluent.connect.jdbc.source.JdbcSourceTask:291)
[2018-12-25 14:40:01,386] DEBUG TimestampIncrementingTableQuerier{table=null, query='select * from nfsn.bd_corp', topicPrefix='t_', incrementingColumn='', timestampColumns=[ts]} prepared SQL query: select * from nfsn.bd_corp WHERE "ts" > ? AND "ts" < ? ORDER BY "ts" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:161)
[2018-12-25 14:40:01,386] DEBUG executing query select CURRENT_TIMESTAMP from dual to get current time from database (io.confluent.connect.jdbc.dialect.OracleDatabaseDialect:462)
[2018-12-25 14:40:01,388] DEBUG Executing prepared statement with timestamp value = 1970-01-01 00:00:00.000 end time = 2018-12-25 06:40:43.828 (io.confluent.connect.jdbc.source.TimestampIncrementingCriteria:162)
[2018-12-25 14:40:01,389] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='select * from nfsn.bd_corp', topicPrefix='t_', incrementingColumn='', timestampColumns=[ts]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:314)
java.sql.SQLDataException: ORA-01843: not a valid month
    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:447)
    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)
    at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:951)
    at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:513)
    at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:227)
    at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:531)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:208)
    at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:886)
    at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:1175)
    at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1296)
    at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3613)
    at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3657)
    at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1495)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:168)
    at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:88)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:60)
    at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:292)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-12-25 14:40:01,390] DEBUG Resetting querier TimestampIncrementingTableQuerier{table=null, query='select * from nfsn.bd_corp', topicPrefix='t_', incrementingColumn='', timestampColumns=[ts]} (io.confluent.connect.jdbc.source.JdbcSourceTask:332)
^C[2018-12-25 14:40:03,826] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
[2018-12-25 14:40:03,827] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:223)
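
A possible reading of the error above, offered as a guess rather than a confirmed diagnosis: the connector binds timestamp values and compares them with the VARCHAR column ts, so Oracle has to convert the strings implicitly using the session date format, and that conversion may be what raises ORA-01843. The sketch below converts the column explicitly instead. It assumes the connector appends its WHERE clause to the end of the query (as the prepared SQL in the log suggests), so the conversion is wrapped in a subquery to make the derived column visible to that clause. The alias TSINC is written in upper case on the assumption that the connector quotes the column name; connection settings are omitted, and to_timestamp is used here in place of the to_date from the question.

```properties
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
mode=timestamp
# Assumption: the connector appends WHERE "TSINC" > ? AND "TSINC" < ? ORDER BY "TSINC" ASC to this query,
# so the converted column must come from a subquery rather than a same-level alias.
query=select * from (select to_timestamp(a.ts, 'yyyy-mm-dd hh24:mi:ss') as TSINC, a.* from nfsn.bd_corp a)
# Upper case to match how Oracle stores an unquoted alias.
timestamp.column.name=TSINC
poll.interval.ms=3000
topic.prefix=t_
validate.non.null=false
```

With this layout the comparison happens between two timestamp values, so the result should no longer depend on the session date format. Rows whose ts string does not match the format mask would still fail the conversion.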
