我已经使用pyspark构建了spark结构化流处理,它从kafka主题读取avro消息,进行一些转换,并将数据作为avro加载到目标主题中。
我用的是abris包(https://github.com/absaoss/abris)从confluent序列化/反序列化avro,与schema registry集成。
架构包含整数列,如下所示:
{
"name": "total_images",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "total_videos",
"type": [
"null",
"int"
],
"default": null
},
该进程引发以下错误: Cannot convert Catalyst type IntegerType to Avro type ["null","int"].
我尝试将列转换为可为空,但错误仍然存在。
如果有人有什么建议,我将不胜感激
1条答案
按热度按时间6ie5vjzr1#
我在这件事上花了好几个小时
实际上,它与abris依赖无关(行为与本机spark avro api相同)
可能有几个根本原因,但在我的例子中…使用spark 3.0.1,scala with dataset:它与编码器有关,并且在处理数据的case类中有错误的类型。
简而言之,用“type”:[“null”,“int”]定义的avro字段不能Map到scala int,它需要选项[int]
使用以下代码:
}
如果case类定义如下,则失败:
无法将catalyst类型longtype转换为avro类型[“null”,“long”]。org.apache.spark.sql.avro.compatibleschemaException:无法将catalyst类型longtype转换为avro类型[“null”,“long”]。在org.apache.spark.sql.avro.avroserializer.newconverter上。scala:219)在org.apache.spark.sql.avro.avroserializer.$anonfun$newstructconverter$1(avroserializer。scala:239)
它可以正常工作:
顺便说一句,在spark编码器中支持specificrecord会更好(你可以使用kryo,但它效率很低),因为为了在我的avro数据中有效地使用类型化数据集…我需要创建额外的case类(它与我的specificrecords重复)。