尝试从嵌套架构中分解给定列。我试图通过在Dataframe上向左折叠来实现这一点。
这里我只处理了两种情况
如果列类型是struct,那么我将尝试通过select子句获取
如果列类型是array,那么我将尝试使用withcolumn then select子句分解数据
以下是我的模式:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("RootData", StructType(Seq(
StructField("Rates",ArrayType(StructType(Array(
StructField("Code",StringType,true),
StructField("Rate",StringType,true),
StructField("Type",StringType,true),
StructField("TargetValue",StringType,true)))), true),
StructField("RecordCount",LongType,true))),true),
StructField("CreationDate",StringType,true),
StructField("SysID",StringType,true),
StructField("ImportID",StringType,true)))
|-- RootData: struct (nullable = true)
| |-- Rates: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- Code: string (nullable = true)
| | | |-- Rate: string (nullable = true)
| | | |-- Type: string (nullable = true)
| | | |-- TargetValue: string (nullable = true)
| |-- RecordCount: long (nullable = true)
|-- CreationDate: string (nullable = true)
|-- SysID: string (nullable = true)
|-- ImportID: string (nullable = true)
下面是代码片段:
// Here sourceDF has nested schema dataframe
// List of nested columns
def execute(sourceDf: DataFrame, exp_Cols : Array[String]) = {
var list = Array[String]()
val df = exp_Cols.foldLeft(sourceDf){(df, colName) =>
if ( df.columns.contains(colName) ) {
val typeName = df.schema( colName ).dataType.typeName
println("typeName " + typeName)
if ( typeName == "struct" || typeName == "array") list = list :+ colName
if (typeName == "struct") df.selectExpr("*", colName + ".*")
else if (typeName == "array") df.withColumn(colName, explode(col(colName))).selectExpr("*", colName + ".*")
else df
}
df
}
println(list.toList)
df.drop(list:_*)
}
但当我试图用下面的语句它的工作预期。和我写的一样。
nestedDf.selectExpr("*", "RootData.*").withColumn("Rates",explode($"Rates")).selectExpr("*","Rates.*").drop("RootData", "Rates")
我在上述方法中是否犯了错误,或者我们是否可以用更好的方法来实现这一点。
我使用的是spark2.30版本和scala 2.11版本
编辑:
请查看以下示例数据:
val jsonStr = """{"RootData":{"Rates":[{"Code":"USD","Rate":"2.007500000","Type":"Common","TargetValue":"BYR"},
{"Code":"USD","Rate":"357.300000000","Type":"Common","TargetValue":"MRO"},
{"Code":"USD","Rate":"21005.000000000","Type":"Common","TargetValue":"STD"},
{"Code":"USD","Rate":"248520.960000000","Type":"Common","TargetValue":"VEF"},
{"Code":"USD","Rate":"77.850000000","Type":"Common","TargetValue":"AFN"},
{"Code":"USD","Rate":"475.150000000","Type":"Common","TargetValue":"AMD"},
{"Code":"USD","Rate":"250.000000000","Type":"Common","TargetValue":"YER"},
{"Code":"USD","Rate":"15.063500000","Type":"Common","TargetValue":"ZAR"},
{"Code":"USD","Rate":"13.291500000","Type":"Common","TargetValue":"ZMW"},
{"Code":"USD","Rate":"1.000000000","Type":"Common","TargetValue":"USD"}
],"RecordCount":10}, "CreationDate":"2020-01-01","SysID":"987654321","ImportID":"123456789"}"""
val nestedDf = spark.read.json(Seq(jsonStr).toDS)
val exp_cols = Array("RootData", "Rates")
execute(nestedDf, exp_cols)
我使用的临时解决方案如下:
def forStructTypeCol(df : DataFrame, colName: String) = df.selectExpr("*", colName +".*")
def forArrayTypeCol(df : DataFrame, colName: String) = df.withColumn(colName, explode(col(colName))).selectExpr("*", colName +".*")
var t_nestedDf = nestedDf
exp_cols.foreach(colName=> { t_nestedDf = if ( t_nestedDf.columns.contains(colName) ) { val typeName = t_nestedDf.schema( colName ).dataType.typeName ; if ( typeName == "struct") forStructTypeCol(t_nestedDf, colName) else if (typeName == "array") forArrayTypeCol(t_nestedDf, colName) else t_nestedDf } else t_nestedDf })
val finaldf = t_nestedDf.drop(exp_cols:_*)
1条答案
按热度按时间ovfsdjhp1#
我认为您的代码是错误的,因为您总是返回df,而不是包含附加列的df(可能您缺少else子句?):