scala—如何基于一系列必需字段来优化spark structtype模式?

dwbf0jvd  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(198)

我正在尝试从已经存在的架构创建structtype架构。我有一个列表,其中包含新模式所需的字段。最困难的部分是模式是一个嵌套的json数据,包含复杂的字段,包括arraytype(structtype)。这是模式的代码,

val schema1: Seq[StructField] = Seq(
      StructField("playerId", StringType, true),
      StructField("playerName", StringType, true),
      StructField("playerCountry", StringType, true),
      StructField("playerBloodType", StringType, true)
    )

    val schema2: Seq[StructField] =
      Seq(
        StructField("PlayerHistory", ArrayType(
          StructType(
            Seq(
              StructField("Rating", StringType, true),
              StructField("Height", StringType, true),
              StructField("Weight", StringType, true),
              StructField("CoachDetails",
                StructType(
                  Seq(
                    StructField("CoachName", StringType, true),
                    StructField("Address",
                      StructType(
                        Seq(
                          StructField("AddressLine1", StringType, true),
                          StructField("AddressLine2", StringType, true),
                          StructField("CoachCity", StringType, true))), true),
                    StructField("Suffix", StringType, true))), true),
              StructField("GoalHistory", ArrayType(
                StructType(
                  Seq(
                    StructField("MatchDate", StringType, true),
                    StructField("NumberofGoals", StringType, true),
                    StructField("SubstitutionIndicator", StringType, true))), true), true),
              StructField("receive_date", DateType, true))
          ), true
        )))

    val requiredFields = List("playerId", "playerName", "Rating", "CoachName", "CoachCity", "MatchDate", "NumberofGoals")

    val schema: StructType = StructType(schema1 ++ schema2)

变量schema是当前schema,requiredfields保存新schema所需的字段。我们还需要新模式中的父块。输出架构应该类似于:

val outputSchema =
      Seq(
        StructField("playerId", StringType, true),
        StructField("playerName", StringType, true),
        StructField("PlayerHistory",
          ArrayType(StructType(
            StructField("Rating", StringType, true),
            StructField("CoachDetails",
              StructType(
                StructField("CoachName", StringType, true),
                StructField("Address", StructType(
                  StructField("CoachCity", StringType, true)), true),
                StructField("GoalHistory", ArrayType(
                  StructType(
                    StructField("MatchDate", StringType, true),
                    StructField("NumberofGoals", StringType, true)), true), true)))

我试着用下面的代码以递归的方式处理这个问题。

schema.fields.map(f => filterSchema(f, requiredFields)).filter(_.name != "")

  def filterSchema(field: StructField, requiredColumns: Seq[String]): StructField = {
    field match{
      case StructField(_, inner : StructType, _ ,_) => StructField(field.name,StructType(inner.fields.map(f => filterSchema(f, requiredColumns))))
      case StructField(_, ArrayType(structType: StructType, _),_,_) => 
        if(requiredColumns.contains(field.name))
          StructField(field.name, ArrayType(StructType(structType.fields.map(f => filterSchema(f,requiredColumns))),true), true)
        else
          StructField("",StringType,true)
      case StructField(_, _, _, _) => if(requiredColumns.contains(field.name)) field else StructField("",StringType,true)
    }
  }

但是,我很难过滤出内部结构字段。
感觉可以对递归函数的基本条件进行一些修改。如有任何帮助,我们将不胜感激。提前谢谢。

50few1ms

50few1ms1#

我是这样做的,

class SchemaRefiner(schema: StructType, requiredColumns: Seq[String]) {
  var FINALSCHEMA: Array[StructField] = Array[StructField]()

  private def refine(schematoRefine: StructType, requiredColumns: Seq[String]): Unit = {
    schematoRefine.foreach(f => {
      if (requiredColumns.contains(f.name)) {
        f match {
          case StructField(_, inner: StructType, _, _) =>
            FINALSCHEMA = FINALSCHEMA :+ f
          case StructField(_, inner: StructType, _, _) =>
            FINALSCHEMA = FINALSCHEMA :+ StructField(f.name, StructType(new SchemaRefiner(inner, requiredColumns).getRefinedSchema), true)
          case StructField(_, ArrayType(structType: StructType, _), _, _) =>
            FINALSCHEMA = FINALSCHEMA :+ StructField(f.name, ArrayType(StructType(new SchemaRefiner(structType, requiredColumns).getRefinedSchema)), true)
          case StructField(_, _, , _, _) =>
            FINALSCHEMA = FINALSCHEMA :+ f
        }
      }
    })
  }

  def getRefinedSchema: Array[StructField] = {
    refine(schema, requiredColumns)
    this.FINALSCHEMA
  }
}

这将遍历structfields,每次遇到新的structtype时,都会递归调用函数以获取新的structtype。

val fields = new SchemaRefiner(schema,requiredFields)
val newSchema = fields.getRefinedSchema

相关问题