我正在尝试基于一个已有的 schema 创建新的 StructType schema。我有一个列表,其中包含新 schema 所需的字段。最困难的部分在于该 schema 描述的是嵌套的 JSON 数据,包含复杂类型的字段,例如 ArrayType(StructType)。下面是 schema 的代码:
// Flat, top-level player columns; each one is a nullable string.
val schema1: Seq[StructField] =
  Seq(
    "playerId",
    "playerName",
    "playerCountry",
    "playerBloodType"
  ).map(StructField(_, StringType, nullable = true))
// Nested part of the player schema: a single PlayerHistory column holding an array of structs.
val schema2: Seq[StructField] = {
  // Every leaf except receive_date is a nullable string column.
  def stringField(name: String): StructField = StructField(name, StringType, nullable = true)

  // CoachDetails.Address
  val addressType = StructType(
    Seq("AddressLine1", "AddressLine2", "CoachCity").map(stringField)
  )

  // PlayerHistory[].CoachDetails
  val coachDetailsType = StructType(
    Seq(
      stringField("CoachName"),
      StructField("Address", addressType, nullable = true),
      stringField("Suffix")
    )
  )

  // Element type of PlayerHistory[].GoalHistory
  val goalType = StructType(
    Seq("MatchDate", "NumberofGoals", "SubstitutionIndicator").map(stringField)
  )

  // Element type of PlayerHistory
  val historyEntryType = StructType(
    Seq(
      stringField("Rating"),
      stringField("Height"),
      stringField("Weight"),
      StructField("CoachDetails", coachDetailsType, nullable = true),
      StructField("GoalHistory", ArrayType(goalType, containsNull = true), nullable = true),
      StructField("receive_date", DateType, nullable = true)
    )
  )

  Seq(StructField("PlayerHistory", ArrayType(historyEntryType, containsNull = true)))
}
// Names of the leaf columns that the new schema must keep; they are matched on the
// simple field name, so they may sit at any nesting depth.
val requiredFields: List[String] =
  List(
    "playerId",
    "playerName",
    "Rating",
    "CoachName",
    "CoachCity",
    "MatchDate",
    "NumberofGoals"
  )
// Current full schema: the fields of schema1 followed by the fields of schema2.
val schema: StructType = StructType(Seq(schema1, schema2).flatten)
变量 schema 是当前的 schema,requiredFields 保存新 schema 所需的字段。新 schema 中还需要保留这些字段的父级结构(struct / array)。输出的 schema 应该类似于:
// Expected result of pruning the full schema down to the required leaf columns:
// only the required leaves remain, together with the struct / array parents that
// contain them. GoalHistory is a sibling of CoachDetails inside PlayerHistory,
// mirroring its position in the source schema.
val outputSchema: Seq[StructField] =
  Seq(
    StructField("playerId", StringType, true),
    StructField("playerName", StringType, true),
    StructField(
      "PlayerHistory",
      ArrayType(
        StructType(
          Seq(
            StructField("Rating", StringType, true),
            StructField(
              "CoachDetails",
              StructType(
                Seq(
                  StructField("CoachName", StringType, true),
                  StructField(
                    "Address",
                    StructType(Seq(StructField("CoachCity", StringType, true))),
                    true
                  )
                )
              ),
              true
            ),
            StructField(
              "GoalHistory",
              ArrayType(
                StructType(
                  Seq(
                    StructField("MatchDate", StringType, true),
                    StructField("NumberofGoals", StringType, true)
                  )
                ),
                true
              ),
              true
            )
          )
        ),
        true
      ),
      true
    )
  )
我试着用下面的代码以递归的方式处理这个问题。
// Prune every top-level field, then discard the empty-named placeholders that
// filterSchema returns for fields that are not required.
schema.fields.map(filterSchema(_, requiredFields)).filterNot(_.name == "")
/**
 * Prunes `field` so that only the leaf columns named in `requiredColumns` remain,
 * together with the struct / array-of-struct parents needed to reach them.
 *
 * A struct or array-of-struct field is kept when at least one of its descendants is
 * kept, whether or not the parent's own name is listed. Its nullability and metadata,
 * and an array's `containsNull` flag, are carried over unchanged. Any other field
 * (including arrays of non-struct elements) is kept only when its own name is listed.
 *
 * @param field           the field to prune
 * @param requiredColumns simple names of the leaf columns to keep, matched at any depth
 * @return the pruned field, or the placeholder `StructField("", StringType, true)` when
 *         nothing under `field` is required (callers drop it by filtering on the empty name)
 */
def filterSchema(field: StructField, requiredColumns: Seq[String]): StructField = {

  // Returns the pruned field, or None when neither it nor any descendant is required.
  def prune(current: StructField): Option[StructField] = {

    // Prunes the children of a struct; an empty result means the whole struct is dropped.
    def pruneStruct(struct: StructType): Option[StructType] = {
      val kept = struct.fields.toSeq.flatMap(child => prune(child))
      if (kept.isEmpty) None else Some(StructType(kept))
    }

    current.dataType match {
      case struct: StructType =>
        pruneStruct(struct).map(pruned => current.copy(dataType = pruned))
      case ArrayType(struct: StructType, containsNull) =>
        // Decide on the element's children, not on the array's own name, so a parent
        // such as an unlisted array column survives when it holds required leaves.
        pruneStruct(struct).map(pruned => current.copy(dataType = ArrayType(pruned, containsNull)))
      case _ =>
        if (requiredColumns.contains(current.name)) Some(current) else None
    }
  }

  // Keep the original contract: unwanted fields come back as an empty-named placeholder.
  prune(field).getOrElse(StructField("", StringType, true))
}
但是,我无法正确过滤嵌套在内部的 struct 字段。
我觉得需要对递归函数的基准情形(base case)做一些修改。如有任何帮助,我将不胜感激。提前谢谢。
1条答案
按热度按时间50few1ms1#
我是这样做的,
这将遍历所有 StructField,每次遇到嵌套的 StructType 时,都会递归调用该函数,以得到过滤后的新 StructType。