How do I specify the data type "date" in the schema information for a JDBC sink connector?

neekobn8 · posted on 2021-06-04 · in Kafka

I am new to Kafka.
I want to write data from a Kafka topic to Oracle using the JDBC sink connector.
I cannot use Avro/Schema Registry at the moment, so it has to be JSON with the schema information embedded in the records.
A "hello world" dataset with simple data types such as int and string was written successfully to an Oracle table via the connector's auto.create.
Now I am stuck: I want to use a "date" data type in the payload, but I don't know how to specify it in the schema information.
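
For reference, the sink is configured roughly along these lines (a simplified sketch: the connection details are placeholders and the exact property values are not my real ones, but the converter settings match the embedded-schema JSON setup described above):

    {
      "name": "oracle-sink-connector",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": "<topic>",
        "connection.url": "jdbc:oracle:thin:@//<host>:<port>/<service>",
        "connection.user": "<user>",
        "connection.password": "<password>",
        "auto.create": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true"
      }
    }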
Below is the error message (connect-distributed.log):

ERROR WorkerSinkTask{id=oracle-sink-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema type: org.apache.kafka.connect.data.Date
        at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:528)
        at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:524)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:371)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more

Here is the relevant part of my JSON:

{"records":
    [{"value":  
        {"schema": 
            { "type": "struct", "fields": 
                [{"type": "org.apache.kafka.connect.data.Date", "optional": false, "field": "BEZDAT"}
                ...]
                , "optional": false, "name": "foobar"}
        , "payload": {"BEZDAT": "2020-07-15"
        ...}}]}'

I also tried "date", 'date', and Date as the type.
According to the documentation, "Date" can be used. But how exactly is it specified? Am I missing something?
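
My current guess, from the JsonConverter stack trace above, is that "type" only accepts the physical types (int32, string, struct, ...) and that the logical date type has to go into "name" instead, with the payload value sent as an integer number of days since the Unix epoch. An untested sketch of what I mean (18458 would be 2020-07-15):

    {"schema":
        {"type": "struct",
         "fields":
            [{"type": "int32",
              "name": "org.apache.kafka.connect.data.Date",
              "version": 1,
              "optional": false,
              "field": "BEZDAT"},
             ...],
         "optional": false,
         "name": "foobar"},
     "payload":
        {"BEZDAT": 18458,
         ...}}

Can anyone confirm whether this is the right way to declare a date in the embedded JSON schema?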

No answers yet.
