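I am new to Kafka.
I want to use the JDBC sink connector to write data from a Kafka topic to Oracle.
I cannot use Avro or the Schema Registry at the moment, so the data has to be JSON with the schema information included in each record.
A "hello world" dataset with simple data types such as int and string was written to an Oracle table successfully, with the table created by the connector's auto create option.
Now I am stuck: I want to use a "date" data type in the payload, but I do not know how to specify it in the schema information.
Here is the error message (connect distributed.log):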
ERROR WorkerSinkTask{id=oracle-sink-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema type: org.apache.kafka.connect.data.Date
at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:528)
at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:524)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:371)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Here is the relevant part of my JSON:
{"records":
[{"value":
{"schema":
{ "type": "struct", "fields":
[{"type": "org.apache.kafka.connect.data.Date", "optional": false, "field": "BEZDAT"}
...]
, "optional": false, "name": "foobar"}
, "payload": {"BEZDAT": "2020-07-15"
...}}]}'
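I also tried date, ´date´ and Date as the type.
According to the documentation, "Date" can be used. But how exactly do I specify it? Am I missing something?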
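For reference, a minimal sketch of how the schema entry might look. As far as I understand JsonConverter, the "type" of a field must be one of the primitive schema types (int32, string, struct, ...), and a logical type such as Date is expressed by putting its class name into "name" on top of the underlying primitive. For org.apache.kafka.connect.data.Date the underlying type is int32 and the payload value is the number of days since 1970-01-01, not an ISO string. The helper names days_since_epoch and build_value below are hypothetical, and the outer "records"/"value" wrapper from the question is left out.

```python
import json
from datetime import date


def days_since_epoch(iso_date: str) -> int:
    """Convert an ISO date string (YYYY-MM-DD) to days since 1970-01-01."""
    return (date.fromisoformat(iso_date) - date(1970, 1, 1)).days


def build_value(iso_date: str) -> dict:
    """Build a schema + payload envelope with BEZDAT as a Connect Date."""
    schema = {
        "type": "struct",
        "name": "foobar",
        "optional": False,
        "fields": [
            {
                # Primitive type that carries the value on the wire.
                "type": "int32",
                # Logical type name; this is what marks the field as a Date.
                "name": "org.apache.kafka.connect.data.Date",
                "version": 1,
                "optional": False,
                "field": "BEZDAT",
            }
        ],
    }
    # The payload holds an integer day count instead of "2020-07-15".
    payload = {"BEZDAT": days_since_epoch(iso_date)}
    return {"schema": schema, "payload": payload}


if __name__ == "__main__":
    print(json.dumps(build_value("2020-07-15"), indent=2))
```

With this shape the converter should no longer reject the field as an unknown schema type. Whether the sink then creates a DATE column in Oracle depends on the connector version and dialect, which I have not verified here.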