KSQL fails with CharConversionException: Invalid UTF-32 character while reading stream

2hh7jdfx  asked on 2021-06-04  in Kafka

I'm piping some data into Kafka with Kafka Connect. I can see it with print 'kdc-01-orders' from beginning; and the data looks correct. To make sure, I also successfully parsed it as JSON.
So I created a table over that topic, as follows:

create table orders
(order_num varchar, cust_id integer, order_date integer)
with
(kafka_topic='kdc-01-orders', value_format='json', key='order_num');

The table is created successfully. But when I query it like this:

select * from orders limit 100;

I see lots of errors in the KSQL log. They look like this:

ksql-server_1      | [2019-07-01 21:21:43,803] WARN task [0_0] Skipping record due to deserialization error. topic=[kdc-01-orders] partition=[0] offset=[999] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
ksql-server_1      | org.apache.kafka.common.errors.SerializationException: KsqlJsonDeserializer failed to deserialize data for topic: kdc-01-orders
ksql-server_1      | Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
ksql-server_1      |    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
ksql-server_1      |    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.getGenericRow(KsqlJsonDeserializer.java:80)
ksql-server_1      |    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:67)
ksql-server_1      |    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:37)
ksql-server_1      |    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:124)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:711)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:995)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:833)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
ksql-server_1      | Caused by: org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1a9ef2e (above 0x0010ffff) at char #1, byte #7)
ksql-server_1      | Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x1a9ef2e (above 0x0010ffff) at char #1, byte #7)
ksql-server_1      |    at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195)
ksql-server_1      |    at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158)
ksql-server_1      |    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:243)
ksql-server_1      |    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2331)
ksql-server_1      |    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:646)
ksql-server_1      |    at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
ksql-server_1      |    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)
ksql-server_1      |    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
ksql-server_1      |    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)
ksql-server_1      |    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.getGenericRow(KsqlJsonDeserializer.java:80)
ksql-server_1      |    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:67)
ksql-server_1      |    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:37)
ksql-server_1      |    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:124)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:711)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:995)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:833)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

The offending character, 0x1a9ef2e, is indeed not a valid character, as the error says. The data is being pulled from a database that uses Latin-1 encoding, but I don't know how to tell KSQL that. The error repeats many times, each time with a different character.
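One quick way to see what is actually on the topic is to read a record's raw value bytes before any deserializer runs. The sketch below is not from the original post; it assumes the kafka-python client and a broker reachable at localhost:9092. If the first bytes are 0x00 followed by a four-byte schema ID, the values are in the Confluent Avro wire format, not JSON:

from kafka import KafkaConsumer

# Read a single record with no value deserializer, so the raw bytes are visible.
consumer = KafkaConsumer(
    'kdc-01-orders',
    bootstrap_servers='localhost:9092',   # assumption: local broker
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,
)
for record in consumer:
    # Confluent-Avro values start with magic byte 0x00 plus a 4-byte schema ID;
    # JSON values start with '{' (0x7b).
    print(record.value[:8].hex())
    break
consumer.close()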


nlejzf6q — answer #1

The print command renders Avro records so that they look like JSON. However, print also reports the format of the topic's value (and key). Check that output to see whether the data is actually JSON or Avro:

Key format: KAFKA_STRING
Value format: AVRO
rowtime: 12/21/18 23:58:42 PM PSD, key: k0, value: {"v0": "hello", "v1": 10}
^CTopic printing ceased
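If print reports Value format: AVRO, as in the sample above, the UTF-32 error is explained: values written by the Confluent Avro converter begin with a zero magic byte and a four-byte schema ID, and Jackson's encoding auto-detection mistakes those leading zero bytes for UTF-32 text. In that case the table should be declared as Avro rather than JSON. A sketch, assuming the value schema is registered in Schema Registry (with Avro, KSQL can infer the columns from the registry, so the column list can be omitted):

drop table orders;

-- with value_format='avro', columns are inferred from Schema Registry
create table orders
with
(kafka_topic='kdc-01-orders', value_format='avro', key='order_num');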

This post goes into more depth on converters and serialization: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
