zip function

mjqavswn posted on 2021-05-27 in Hadoop

I want to transpose multiple columns in a Spark SQL table.

I found this solution, but it only works for two columns. I would like to know how to use the zip function with three columns, varA, varB and varC:

```
import org.apache.spark.sql.functions.{udf, explode}

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
  $"userId", $"someString",
  $"vars._1".alias("varA"), $"vars._2".alias("varB")).show
```

This is my DataFrame schema:

```
root
 |-- owningcustomerid: string (nullable = true)
 |-- event_stoptime: string (nullable = true)
 |-- balancename: string (nullable = false)
 |-- chargedvalue: string (nullable = false)
 |-- newbalance: string (nullable = false)
```

I tried this code:

```
val zip = udf((xs: Seq[String], ys: Seq[String], zs: Seq[String]) => (xs, ys, zs).zipped.toSeq)

df.printSchema

val df4 = df.withColumn("vars", explode(zip($"balancename", $"chargedvalue", $"newbalance"))).select(
  $"owningcustomerid", $"event_stoptime",
  $"vars._1".alias("balancename"), $"vars._2".alias("chargedvalue"), $"vars._3".alias("newbalance"))
```

I got this error:

```
cannot resolve 'UDF(balancename, chargedvalue, newbalance)' due to data type mismatch: argument 1 requires array type, however, 'balancename' is of string type. argument 2 requires array type, however, 'chargedvalue' is of string type. argument 3 requires array type, however, 'newbalance' is of string type.;;
'Project [owningcustomerid#1085, event_stoptime#1086, balancename#1159, chargedvalue#1160, newbalance#1161, explode(UDF(balancename#1159, chargedvalue#1160, newbalance#1161)) AS vars#1167]
```

eyh26e7m 1#

In Scala in general, you can use `Tuple3.zipped`:

```
val zip = udf((xs: Seq[Long], ys: Seq[Long], zs: Seq[Long]) =>
  (xs, ys, zs).zipped.toSeq)

zip($"varA", $"varB", $"varC")
```

In Spark SQL specifically (>= 2.4), you can use the `arrays_zip` function:

```
import org.apache.spark.sql.functions.arrays_zip

arrays_zip($"varA", $"varB", $"varC")
```
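
As a rough sketch under the same assumption as above (the columns are already arrays), `arrays_zip` plus `explode` avoids the UDF entirely; the resulting structs should expose fields named after the input columns:

```
import org.apache.spark.sql.functions.{arrays_zip, explode}
import spark.implicits._  // assumes an active SparkSession named `spark`

df.withColumn("vars", explode(arrays_zip($"varA", $"varB", $"varC")))
  .select(
    $"userId", $"someString",
    $"vars.varA", $"vars.varB", $"vars.varC")
  .show()
```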

Note, however, that your data does not contain `array<string>` columns but plain `strings`, so neither `arrays_zip` nor `explode` can be applied to it directly; you have to parse your data first.
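
For illustration only: if the string columns happened to hold delimited lists (a pure assumption here; the real format might call for `from_json` or custom parsing instead), they could be turned into arrays with `split` before zipping, roughly like this:

```
import org.apache.spark.sql.functions.{arrays_zip, explode, split}
import spark.implicits._  // assumes an active SparkSession named `spark`

// Assumption: each column holds comma-separated values such as "10,20,30".
// Adjust the delimiter (or the parsing approach) to match the real data.
val parsed = df
  .withColumn("balancename_arr", split($"balancename", ","))
  .withColumn("chargedvalue_arr", split($"chargedvalue", ","))
  .withColumn("newbalance_arr", split($"newbalance", ","))

parsed
  .withColumn("vars", explode(arrays_zip($"balancename_arr", $"chargedvalue_arr", $"newbalance_arr")))
  .select(
    $"owningcustomerid", $"event_stoptime",
    $"vars.balancename_arr".alias("balancename"),
    $"vars.chargedvalue_arr".alias("chargedvalue"),
    $"vars.newbalance_arr".alias("newbalance"))
```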
