Kafka Connect SAP connector not working in "incrementing" mode

Asked by 5rgfhyps on 2021-06-04 in Kafka

I have been trying to load data from SAP HANA into HDFS, using "AEDAT" as the incrementing column. "AEDAT" holds dates in "yyyymmdd" format, but the Kafka connector reports the following error:

[2020-10-05 16:37:13,587] ERROR WorkerSourceTask{id=incremental-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
java.lang.IllegalArgumentException: The Incrementing column is not found in the table or is not of correct type
        at com.sap.kafka.connect.source.querier.IncrColTableQuerier.getIncrementingColumn(IncrColTableQuerier.scala:106)
        at com.sap.kafka.connect.source.querier.IncrColTableQuerier.<init>(IncrColTableQuerier.scala:21)
        at com.sap.kafka.connect.source.GenericSourceTask.$anonfun$start$8(GenericSourceTask.scala:121)
        at com.sap.kafka.connect.source.GenericSourceTask.$anonfun$start$8$adapted(GenericSourceTask.scala:99)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at com.sap.kafka.connect.source.GenericSourceTask.start(GenericSourceTask.scala:99)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:215)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Sample row (the last column is AEDAT):

('300', '4500000000', '00010', '450000000000010', '', '', '20200102')
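For context, the exception is thrown while the connector validates the incrementing column's type, and the sample value '20200102' is a character string, so the driver likely reports AEDAT as NVARCHAR rather than a numeric type. A minimal sketch of that kind of validation (the accepted-type set below is an illustrative assumption, not taken from the connector's source):

```python
# Hypothetical sketch of an incrementing-column type check. The connector
# rejects columns whose declared SQL type it cannot treat as monotonically
# increasing; the exact accepted set here is assumed for illustration.
NUMERIC_TYPES = {"INTEGER", "BIGINT", "SMALLINT", "DECIMAL"}

def is_valid_incrementing(col_type: str) -> bool:
    """Return True if the declared column type could serve as an
    incrementing column under the assumed rule above."""
    return col_type.upper() in NUMERIC_TYPES

print(is_valid_incrementing("NVARCHAR"))  # False: a 'yyyymmdd' string column
print(is_valid_incrementing("BIGINT"))    # True
```

Under this assumption, a column that stores dates as text would fail the check even though its values happen to sort in increasing order.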

hana-source.properties:

name=incremental-source
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
topics=saptohdfs
connection.url=jdbc:sap://xx.xx.xx.xx:30041/
connection.user=user
connection.password=pass
saptohdfs.table.name="SAPHANADB"."EKPO"
mode=incrementing
saptohdfs.incrementing.column.name="AEDAT"
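One detail that may be worth double-checking (an assumption on my part, not something the error message confirms): the double quotes around the column name in the property value could prevent the connector from matching it against the table metadata, so the unquoted form might behave differently:

```
saptohdfs.incrementing.column.name=AEDAT
```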

hdfs-sink.properties:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
hdfs.url=hdfs://ip-10-1-1-131.ap-south-1.compute.internal:8020/warehouse/tablespace/external/hive
tasks.max=1
topics=saptohdfs
flush.size=3
hive.integration=true
hive.metastore.uris=thrift://ip-10-1-1-131.ap-south-1.compute.internal:9083
schema.compatibility=BACKWARD
format.class=io.confluent.connect.hdfs.avro.AvroFormat

# value.converter=io.confluent.connect.avro.AvroConverter

What am I doing wrong? I have also opened an issue here:
https://github.com/sap/kafka-connect-sap/issues/41
