我在kafka jdbc源连接器属性文件中有以下sql查询:
query=SELECT * FROM JENNY.WORKFLOW where ID = '565231'
如果我在sqldeveloper中运行相同的查询,它可以正常工作并获取结果。但如果我在“jdbc\u workflow\u connect.properties”中使用相同的查询,则会出现以下错误:
(io.confluent.connect.jdbc.source.JdbcSourceTaskConfig:223)
[2018-09-19 12:32:15,130] INFO WorkerSourceTask{id=Workflow-DB-source-0}
Source task finished initialization and start
(org.apache.kafka.connect.runtime.WorkerSourceTask:158)
[2018-09-19 12:32:15,328] ERROR Failed to run query for table
TimestampIncrementingTableQuerier{name='null', query='SELECT * FROM
JENNY.WORKFLOW where ID = '565231'', topicPrefix='workflow_data1',
timestampColumn='null', incrementingColumn='ID'}: {}
(io.confluent.connect.jdbc.source.JdbcSourceTask:247)
java.sql.SQLSyntaxErrorException: ORA-00933: SQL command not properly ended
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1017)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:655)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:249)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:566)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:215)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:58)
at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:776)
at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:897)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1034)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3820)
at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3867)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1502)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
以下是我的jdbc源连接器属性文件内容:
name=Workflow-DB-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.password =******
connection.url = jdbc:oracle:thin:@1.1.1.1:****/****
connection.user =*****
table.types=TABLE
query=SELECT * FROM JENNY.WORKFLOW where ID = '565231'
mode=incrementing
incrementing.column.name=ID
topic.prefix=workflow_data1
timestamp.delay.interval.ms=60000
transforms:createKey
transforms.createKey.type:org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields:ID
我正在使用ojdbc7.jar
观察:
如果我删除了“where”子句,查询工作正常(如下所示):
SELECT * FROM JENNY.WORKFLOW
请让我知道,如果我做了什么错误或任何修改所需的设置在jdbc源连接器。
提前谢谢。
1条答案
按热度按时间3htmauhk1#
从jdbc connect配置选项的文档中,您可以阅读
如果指定,则执行查询以选择新的或更新的行。如果要联接表、选择表中列的子集或筛选数据,请使用此设置。如果使用,此连接器将仅使用此查询复制数据–整个表复制将被禁用。对于增量更新,仍然可以使用不同的查询模式,但是为了正确地构造增量查询,必须可以将where子句附加到此查询(即,不能使用where子句)。
如果你真的只想考虑表中给定
ID
必须按如下方式 Package 查询但请确保您查看了配置选项的文档,并且知道
query
参数。