无法将catalyst类型integertype转换为avro类型[“null”,“int”]

nwnhqdif  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(507)

我已经使用pyspark构建了spark结构化流处理,它从kafka主题读取avro消息,进行一些转换,并将数据作为avro加载到目标主题中。
我用的是abris包(https://github.com/absaoss/abris)从confluent序列化/反序列化avro,与schema registry集成。
架构包含整数列,如下所示:

{
  "name": "total_images",
  "type": [
    "null",
    "int"
  ],
  "default": null
},
{
  "name": "total_videos",
  "type": [
    "null",
    "int"
  ],
  "default": null
},

该进程引发以下错误: Cannot convert Catalyst type IntegerType to Avro type ["null","int"]. 我尝试将列转换为可为空,但错误仍然存在。
如果有人有什么建议,我将不胜感激

6ie5vjzr

6ie5vjzr1#

我在这件事上花了好几个小时
实际上,它与abris依赖无关(行为与本机spark avro api相同)
可能有几个根本原因,但在我的例子中…使用spark 3.0.1,scala with dataset:它与编码器有关,并且在处理数据的case类中有错误的类型。
简而言之,用“type”:[“null”,“int”]定义的avro字段不能Map到scala int,它需要选项[int]
使用以下代码:

test("Avro Nullable field") {
val schema: String =
  """
    |{
    | "namespace": "com.mberchon.monitor.dto.avro",
    | "type": "record",
    | "name": "TestAvro",
    | "fields": [
    |  {"name": "strVal", "type": ["null", "string"]},
    |  {"name": "longVal",  "type": ["null", "long"]}
    |  ]
    |}
  """.stripMargin
val topicName = "TestNullableAvro"
val testInstance = TestAvro("foo",Some(Random.nextInt()))

import sparkSession.implicits._

val dsWrite:Dataset[TestAvro] = Seq(testInstance).toDS
val allColumns = struct(dsWrite.columns.head, dsWrite.columns.tail: _*)

dsWrite
  .select(to_avro(allColumns,schema) as 'value)
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrap)
  .option("topic", topicName)
  .save()

val dsRead:Dataset[TestAvro] = sparkSession.read
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrap)
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .load()
  .select(from_avro(col("value"), schema) as 'Metric)
  .select("Metric.*")
  .as[TestAvro]

assert(dsRead.collect().contains(testInstance))

}
如果case类定义如下,则失败:

case class TestAvro(strVal:String,longVal:Long)

无法将catalyst类型longtype转换为avro类型[“null”,“long”]。org.apache.spark.sql.avro.compatibleschemaException:无法将catalyst类型longtype转换为avro类型[“null”,“long”]。在org.apache.spark.sql.avro.avroserializer.newconverter上。scala:219)在org.apache.spark.sql.avro.avroserializer.$anonfun$newstructconverter$1(avroserializer。scala:239)
它可以正常工作:

case class TestAvro(strVal:String,longVal:Option[Long])

顺便说一句,在spark编码器中支持specificrecord会更好(你可以使用kryo,但它效率很低),因为为了在我的avro数据中有效地使用类型化数据集…我需要创建额外的case类(它与我的specificrecords重复)。

相关问题