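I am using MySQL 5.7 and JdbcSourceConnector 5.3.x. I am using custom query mode, but I don't seem to be using the correct syntax to select the incrementing column (id in my case) in the query. This appears to be the cause of the NPE posted below. I used this issue as a reference: https://github.com/confluentinc/kafka-connect-jdbc/issues/560
Below is the (truncated) configuration in which I try to select the incrementing column id, along with the other configuration values.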
query= select events.eventId, events.* from (select ES.id as eventId, ES.action, U.name FROM eventsource as ES INNER JOIN user U ON ES.user_id = U.id where U.email='xxx') events .
mode=incrementing
incrementing.column.name=events.eventId
quote.sql.identifiers=never
Can anyone provide the proper syntax to get past this error?
java.lang.NullPointerException
at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractOffsetIncrementedId(TimestampIncrementingCriteria.java:237)
at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractValues(TimestampIncrementingCriteria.java:195)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:192)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:316)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-01-28 14:14:37,251] ERROR WorkerSourceTask{id=connect-event-source1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
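One possible reading of the trace, offered as an assumption rather than a confirmed diagnosis: the failing frame, extractOffsetIncrementedId, runs while the connector is extracting the incrementing value from a returned record. A table-qualified name such as events.eventId may not match the column label eventId that the query actually returns. The untested sketch below keeps the configuration above but uses the unqualified alias and drops the duplicate events.eventId from the select list. Because the connector appends its own WHERE clause to a custom query in incrementing mode, the query stays wrapped in a subquery so that the appended clause can refer to the alias.

```properties
# Sketch only, not verified against connector 5.3.x.
# All values are copied from the configuration above except where noted.

# Changed: select only events.* so that eventId appears once in the result set.
query=select events.* from (select ES.id as eventId, ES.action, U.name FROM eventsource as ES INNER JOIN user U ON ES.user_id = U.id where U.email='xxx') events
mode=incrementing
# Changed (assumption): use the unqualified alias so it matches the result column label.
incrementing.column.name=eventId
quote.sql.identifiers=never
```

If the NPE persists with this form, it would be worth checking that eventId is never NULL in the result set, since a NULL incrementing value could plausibly fail at the same point.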