I'm using Kafka Connect to stream some data into Kafka. I can view it with print 'kdc-01-orders' from beginning; and the data looks correct. To be sure, I successfully parsed it as JSON.
So I created a table over this topic, as follows:
create table orders
(order_num varchar, cust_id integer, order_date integer)
with
(kafka_topic='kdc-01-orders', value_format='json', key='order_num');
The table was created successfully. However, when I query it like this:
select * from orders limit 100;
I see a lot of errors in the ksql log. They look like this:
ksql-server_1 | [2019-07-01 21:21:43,803] WARN task [0_0] Skipping record due to deserialization error. topic=[kdc-01-orders] partition=[0] offset=[999] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
ksql-server_1 | org.apache.kafka.common.errors.SerializationException: KsqlJsonDeserializer failed to deserialize data for topic: kdc-01-orders
ksql-server_1 | Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
ksql-server_1 | at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
ksql-server_1 | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.getGenericRow(KsqlJsonDeserializer.java:80)
ksql-server_1 | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:67)
ksql-server_1 | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:37)
ksql-server_1 | at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:124)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:711)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:995)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:833)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
ksql-server_1 | Caused by: org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1a9ef2e (above 0x0010ffff) at char #1, byte #7)
ksql-server_1 | Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x1a9ef2e (above 0x0010ffff) at char #1, byte #7)
ksql-server_1 | at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195)
ksql-server_1 | at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158)
ksql-server_1 | at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:243)
ksql-server_1 | at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2331)
ksql-server_1 | at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:646)
ksql-server_1 | at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
ksql-server_1 | at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)
ksql-server_1 | at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
ksql-server_1 | at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)
ksql-server_1 | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.getGenericRow(KsqlJsonDeserializer.java:80)
ksql-server_1 | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:67)
ksql-server_1 | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:37)
ksql-server_1 | at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:124)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:711)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:995)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:833)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
ksql-server_1 | at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
The offending character, 0x1a9ef2e, does not appear to be a valid character, just as the error says. This data was extracted from a database that uses Latin-1 encoding, but I don't know how to tell ksql that. The error repeats many times, each time with a different character.
1 Answer
The print command will also output Avro records, making them look like JSON. However, print also reports the format of the value (and key) it detects in the topic. Check that to see whether the data is actually JSON or Avro:
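As an illustration, the first line print emits after the command is the detected serialization format (a sketch only; the exact layout varies by KSQL version, and the timestamp and field values below are made up):

```sql
ksql> print 'kdc-01-orders' from beginning;
Format:AVRO
7/1/19 9:21:43 PM UTC, null, {"order_num": "A-1001", "cust_id": 42, "order_date": 18078}
```

If that line says Format:AVRO rather than Format:JSON, the records are Avro, and the UTF-32 errors above are consistent with that: a JSON deserializer reading Avro's binary framing (which starts with a 0x00 magic byte) will choke on bytes that are not valid text.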
This article goes into converters and serialization in more depth: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
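If the topic does turn out to hold Avro, one fix (a sketch based on the question's own table definition, not verified against this topic) is to drop the table and recreate it with a matching VALUE_FORMAT:

```sql
drop table orders;

create table orders
(order_num varchar, cust_id integer, order_date integer)
with
(kafka_topic='kdc-01-orders', value_format='avro', key='order_num');
```

Note that with value_format='avro' and a Schema Registry configured, recent KSQL versions can infer the columns from the registered schema, so the explicit column list may be optional.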