在javaspark作业中使用avro模式(avsc)将avro数据写入s3

c0vxltue  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(532)

我想用提供的avro模式而不是spark自动生成的模式来编写avro格式的Dataframe。如何告诉spark在编写时使用自定义模式?

{
  "type" : "record",
  "name" : "name1",
  "namespace" : "com.data"
  "fields" : [
  {
    "name" : "id",
    "type" : "string"
  },
  {
    "name" : "count",
    "type" : "int"
  },
  {
    "name" : "val_type",
    "type" : {
      "type" : "enum",
      "name" : "ValType"
      "symbols" : [ "s1", "s2" ]
    }
  }
  ]
}

使用avroschema读取avro。在这一步上一切正常。
数据集d1=spark.read().option(“avroschema”,string.valueof(inavroschema)).format(“com.databricks.spark.avro”).load(“s3\u path”);
在这里,我对上述数据执行一些spark.sql并将其存储到dataframe。
当我试图基于avro模式将avro数据写入s3时
数据类型:

root
 |-- id: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- val_type: string (nullable = true)

finaldf.write().option(“avroschema”,string.valueof(inavroschema)).format(“com.databricks.spark.avro”).mode(“overwrite”).save(“target\u s3\u path”);
我有个错误:

User class threw exception: org.apache.spark.SparkException: Job aborted.
    ......
    Caused by: org.apache.avro.AvroRuntimeException:**Not a union: "string"**
        at org.apache.avro.Schema.getTypes(Schema.java:299)
        at 
org.apache.spark.sql.avro.AvroSerializer.org$apache$spark$sql$avro$AvroSerializer$$resolveNullableType(AvroSerializer.scala:229)

有没有任何方法可以使用avro模式来编写avro数据,或者它的方法是否正确(使用 "option("avroSchema",String.valueOf(inAvroSchema))" )-可能是我做错了什么? "forceSchema" option 对我来说不管用。
提前谢谢。

abithluo

abithluo1#

你可以用 org.apache.spark:spark-avro 打包并尝试设置 avroSchema 上的选项 to_avro 功能。这是医生:https://spark.apache.org/docs/latest/sql-data-sources-avro.html#to_avro-来自阿夫罗

rt4zxlrg

rt4zxlrg2#

我四处打探,发现了一些有趣的东西,

case class Name1(id: String, count: Int, val_type: String)

val schema = """{
                   |  "type" : "record",
                   |  "name" : "name1",
                   |  "namespace" : "com.data",
                   |  "fields" : [
                   |  {
                   |    "name" : "id",
                   |    "type" : "string"
                   |  },
                   |  {
                   |    "name" : "count",
                   |    "type" : "int"
                   |  },
                   |  {
                   |    "name" : "val_type",
                   |    "type" : {
                   |      "type" : "enum",
                   |      "name" : "ValType",
                   |      "symbols" : [ "s1", "s2" ]
                   |    }
                   |  }
                   |  ]
                   |}""".stripMargin

val d = Seq(Name1("1",2,"s1"),Name1("1",3,"s2"),Name1("1",4,"s2"),Name1("11",2,"s1")).toDF()

d.write.mode(SaveMode.Overwrite).format("avro").option("avroSchema",schema).save("data/tes2/")

当我用spark2.4.x执行代码时,上面的代码失败了,但是当我用新的spark3.0.0运行相同的代码时,代码成功了,数据也成功地写入了。

val df = spark.read.format("avro").load("data/tes2/")
df.printSchema()
df.show(10)

root
 |-- id: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- val_type: string (nullable = true)

+---+-----+--------+
| id|count|val_type|
+---+-----+--------+
| 11|    2|      s1|
|  1|    4|      s2|
|  1|    3|      s2|
|  1|    2|      s1|
+---+-----+--------+

我想最好的办法是升级spark版本或更改avro模式定义。

相关问题