We have an Oracle source and need to pull data from it into Kafka. We are running into errors with both the Avro and JSON formats.
Connector config file:
{
  "name": "LITERAL_VALUES",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "key.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "connection.user": "<user>",
    "connection.password": "<Password>",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@<server>:<Port>/<Schema>",
    "mode": "bulk",
    "topic.prefix": "LITERAL_VALUES",
    "batch.max.rows": 1000,
    "numeric.mapping": "best_fit",
    "query": "SELECT abc from xyz"
  }
}
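(As an aside, "key.serializer" and "value.serializer" are Kafka producer properties and are ignored by Kafka Connect; Connect configures serialization through converters instead. A minimal sketch of the equivalent Avro converter settings, assuming a Schema Registry at http://localhost:8081, so adjust the URL for your environment:)

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"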
Error when using the Avro format:
DataException: Cannot deserialize type int64 as type float64
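This error usually means the KSQL stream was declared with column types that do not match the Avro schema registered for the topic (for example a DOUBLE column over a field that is actually an int64). With Avro, the column list can be omitted so that KSQL derives it from the Schema Registry; a minimal sketch, assuming the topic is named LITERAL_VALUES:

  CREATE STREAM literal_values WITH (KAFKA_TOPIC='LITERAL_VALUES', VALUE_FORMAT='AVRO');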
Error when using the JSON format:
WARN task [0_0] Skipping record due to deserialization error. topic=[LITERAL_VALUES_JSON] partition=[0] offset=[12823] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.common.errors.SerializationException: KsqlJsonDeserializer failed to deserialize data for topic: LITERAL_VALUES_JSON
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0xf01ae03 (above 0x0010ffff) at char #1, byte #7)
at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195)
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:243)
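The Invalid UTF-32 character error is a classic symptom of Avro bytes being parsed as JSON: Confluent's Avro wire format starts with a 0x00 magic byte followed by a 4-byte schema ID, and Jackson misreads those leading bytes as a UTF-32 byte-order mark. In other words, the LITERAL_VALUES_JSON topic most likely contains Avro records rather than JSON. One way to confirm, assuming the Confluent CLI tools and a local broker and Schema Registry:

  kafka-avro-console-consumer --bootstrap-server localhost:9092 \
    --topic LITERAL_VALUES_JSON --from-beginning --max-messages 5 \
    --property schema.registry.url=http://localhost:8081

If this prints readable records, the topic contains Avro and the KSQL stream should be declared with VALUE_FORMAT='AVRO' rather than JSON.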
Tried creating the connector with the "table.whitelist" property instead and consuming it with KSQL:
Unable to verify the AVRO schema is compatible with KSQL. Subject not found.
io.confluent.rest.exceptions.RestNotFoundException: Subject not found.
at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.subjectNotFoundException(Errors.java:50)
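KSQL looks up the value schema under the subject <topic name>-value. The registry output below shows the schema registered under RAW-LITERAL_VALUES-value, i.e. for a topic named RAW-LITERAL_VALUES, so a KSQL statement that references a topic with any other name (such as the LITERAL_VALUES<table> topics that topic.prefix plus table.whitelist would produce) fails with Subject not found. The registered subjects can be listed directly, assuming the registry runs at localhost:8081:

  curl http://localhost:8081/subjects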
Schema fetched from the Schema Registry REST API:
{
  "subject": "RAW-LITERAL_VALUES-value",
  "version": 1,
  "id": 16,
  "schema": "{\"type\":\"record\",\"name\":\"LITERAL_VALUES\",\"fields\":[{\"name\":\"LITERAL_ID\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":127,\"precision\":64,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"127\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"LITERAL_NAME\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"LITERAL_VALUE\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"SOURCE_SYSTEM_ID\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"SOURCE_SYSTEM_INSTANCE_ID\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"EFF_STRT_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"EFF_END_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"STRT_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"END_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"CRTD_BY\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"CRTD_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"LST_UPD_BY\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"LST_UPD_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}],\"connect.name\":\"LITERAL_VALUES\"}"
}
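Note that LITERAL_ID is registered as a Connect Decimal with scale 127 and precision 64, which is what the JDBC connector emits for an Oracle NUMBER declared without explicit precision and scale; "numeric.mapping": "best_fit" cannot map such a column to a primitive type, which would explain the bytes/Decimal vs. int64/float64 mismatch above. A common workaround, assuming LITERAL_ID really is an unconstrained NUMBER, is to cast it to an explicit precision in the connector query so best_fit can map it to an integer type (hypothetical column list based on the schema above):

  "query": "SELECT CAST(LITERAL_ID AS NUMBER(9,0)) AS LITERAL_ID, LITERAL_NAME, LITERAL_VALUE FROM xyz"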
Any help is greatly appreciated.