Oracle JDBC source connector issues

q43xntqr  posted on 2021-06-07  in Kafka

We have an Oracle source that we need to pull data from, and we are hitting errors with both the Avro and JSON formats.

Connector file

{
  "name": "LITERAL_VALUES",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "key.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "connection.user": "<user>",
    "connection.password": "<Password>",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@<server>:<Port>/<Schema>",
    "mode": "bulk",
    "topic.prefix": "LITERAL_VALUES",
    "batch.max.rows":1000,
    "numeric.mapping":"best_fit",
    "query":"SELECT abc from xyz"
  }
}

Error when using the Avro format

DataException: Cannot deserialize type int64 as type float64

Error when using the JSON format

WARN task [0_0] Skipping record due to deserialization error. topic=[LITERAL_VALUES_JSON] partition=[0] offset=[12823] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.common.errors.SerializationException: KsqlJsonDeserializer failed to deserialize data for topic: LITERAL_VALUES_JSON
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0xf01ae03 (above 0x0010ffff) at char #1, byte #7)
        at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195)
        at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:243)

Tried creating the connector file with the "table.whitelist" property and using KSQL

Unable to verify the AVRO schema is compatible with KSQL. Subject not found. io.confluent.rest.exceptions.RestNotFoundException: Subject not found.
io.confluent.rest.exceptions.RestNotFoundException: Subject not found.
        at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.subjectNotFoundException(Errors.java:50)

Schema checked via REST

{
  "subject": "RAW-LITERAL_VALUES-value",
  "version": 1,
  "id": 16,
  "schema": "{\"type\":\"record\",\"name\":\"LITERAL_VALUES\",\"fields\":[{\"name\":\"LITERAL_ID\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":127,\"precision\":64,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"127\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"LITERAL_NAME\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"LITERAL_VALUE\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"SOURCE_SYSTEM_ID\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"SOURCE_SYSTEM_INSTANCE_ID\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"EFF_STRT_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"EFF_END_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"STRT_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"END_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"CRTD_BY\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"CRTD_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"LST_UPD_BY\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"LST_UPD_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\
"default\":null}],\"connect.name\":\"LITERAL_VALUES\"}"
}
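
For context on the registered schema: LITERAL_ID comes through as org.apache.kafka.connect.data.Decimal, which carries the unscaled integer as big-endian two's-complement bytes and keeps the scale in the schema parameters. Below is a minimal Python sketch of that decoding, only to illustrate what the field holds. The helper name decode_connect_decimal and the sample bytes are made up for illustration, and I have not established that this encoding is what causes the errors above.

```python
from decimal import Decimal


def decode_connect_decimal(raw: bytes, scale: int) -> Decimal:
    """Hypothetical helper: turn Connect Decimal bytes into a Decimal.

    The bytes hold the unscaled value as a big-endian two's-complement
    integer; the scale says how many digits sit after the decimal point.
    """
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(d) for d in str(abs(unscaled)))
    # Building from a tuple avoids any rounding by the decimal context.
    return Decimal((sign, digits, -scale))


# Illustrative sample only: unscaled value 12345 with scale 2.
sample = (12345).to_bytes(2, byteorder="big", signed=True)
print(decode_connect_decimal(sample, 2))  # prints 123.45
```

With the scale of 127 shown in the schema, the same bytes would decode to a value with 127 digits after the decimal point.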

Any help is much appreciated.
