我正在尝试用kafka hdfs sink编写json。
我有以下属性(connect standalone.properties):
key.converter.schemas.enable = false
value.converter.schemas.enable = false
schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
关于我的财产:
format.class=io.confluent.connect.hdfs.json.JsonFormat
我得到了以下例外:
org.apache.kafka.connect.errors.dataexception:由于序列化错误,将字节[]转换为kafka connect失败
... 原因:org.apache.kafka.commom.errors.serlizationexception:com.fasterxml.jackson.core.jsonparseexception:无法识别的标记“test”:在[source:(byte[])“test”行:1列:11处应为“null”、“true”、“false”或nan
我的json是有效的。
请问我怎么解决?
- 我还尝试使用示例json,例如:
{"key":"value"}
同样的错误。
谢谢。
1条答案
按热度按时间ubof19bj1#
根据这个错误,并不是主题中的所有消息都是json对象。最新消息可能是有效的,或者kafka值可能是有效的(但不是键),但是错误显示它试图读取一个普通字符串,
(byte[])"test"
,无效如果您只想将文本数据转换成hdfs,那么可以使用字符串格式,但是这不会有hive集成
如果您确实想使用这种格式的配置单元,那么您需要自己定义json serde