我正在尝试使用kafka连接接收器将文件从kafka写入hdfs。
我的属性看起来像:
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
flush.size=3
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
schema.compatability=BACKWARD
key.converter.schemas.enabled=false
value.converter.schemas.enabled=false
schemas.enable=false
当我尝试运行连接器时,出现了以下异常:
org.apache.kafka.connect.errors.dataexception:jsonconverter with schemas.enable需要“schema”和“payload”字段,不能包含其他字段。如果试图反序列化纯json数据,请在转换器配置中设置schemas.enable=false。
我使用的是汇合版本4.0.0。
有什么建议吗?
1条答案
按热度按时间khbbv19g1#
我对这个问题的理解是,如果您设置schemas.enable=true,您就告诉kafka您想将schema包含到kafka必须传输的消息中。在本例中,kafka消息没有纯json格式。相反,它首先描述模式,然后附加与该模式相对应的有效负载(即实际数据)(请参阅avro格式)。这就导致了冲突:一方面您为数据指定了jsonconverter,另一方面您要求kafka将模式包含到消息中。要解决此问题,可以将avroconverter与schemas.enable=true一起使用,也可以将jsonconverter与schemas.enable=false一起使用。