我们能从avro模式自动生成sparksql查询吗?

5m1hhzi4  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(474)

我每天都要处理大量的avro文件。为了从avro中提取数据,我使用sparksql。要实现这一点,首先我需要printschema,然后我需要选择字段来查看数据。我想自动化这个过程。给定任何输入avro,我想写一个脚本,它将自动生成sparksql查询(考虑avsc文件中的结构和数组)。我可以用java或python编写脚本。
--样本输入avro

root
|-- identifier: struct (nullable = true)
|    |-- domain: string (nullable = true)
|    |-- id: string (nullable = true)
|    |-- version: long (nullable = true)
alternativeIdentifiers: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- identifier: struct (nullable = true)
|    |    |    |    |-- domain: string (nullable = true)
|    |    |    |    |-- id: string (nullable = true)

--我期待的结果

SELECT identifier.domain, identifier.id, identifier.version
juud5qan

juud5qan1#

您可以使用类似的方法基于架构生成列列表:

import org.apache.spark.sql.types.{StructField, StructType}
  def getStructFieldName(f: StructField, baseName: String = ""): Seq[String] = {
    val bname = if (baseName.isEmpty) "" else baseName + "."
    f.dataType match {
      case StructType(s) =>
        s.flatMap(x => getStructFieldName(x, bname + f.name))
      case _ => Seq(bname + f.name)
    }
  }

然后它可以用在真实的Dataframe上,比如:

val data = spark.read.json("some_data.json")
val cols = data.schema.flatMap(x => getStructFieldName(x))

结果,我们得到了字符串的序列,我们可以用它来做一个 select :

import org.apache.spark.sql.functions.col
data.select(cols.map(col): _*)

或者我们可以生成一个逗号分隔的列表,我们可以在 spark.sql :

spark.sql(s"select ${cols.mkString(", ")} from table")

相关问题