覆盖sparkDataframe架构

w3nuxt5m  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(370)

后期编辑:基于本文,spark似乎无法编辑rdd或列。必须创建一个新类型并删除旧类型。下面建议的for循环和.withcolumn方法似乎是完成工作的最简单方法。
原始问题:有没有一种简单的方法(对于人和机器)将多个列转换为不同的数据类型?
我尝试手动定义模式,然后使用此模式从Parquet文件加载数据并将其保存到另一个文件,但每次都会出现“作业中止”…“写入行时任务失败”的情况。对我来说有点容易,对spark来说很辛苦。。。但它不起作用。
另一种选择是使用:

df = df.withColumn("new_col", df("old_col").cast(type)).drop("old_col").withColumnRenamed("new_col", "old_col")

我需要做更多的工作,因为有将近100列,如果spark必须在内存中复制每一列,那么这听起来也不是最优的。有没有更简单的方法?

bfnvny8b

bfnvny8b1#

根据强制转换规则的复杂程度,您可以通过以下循环来完成所要求的任务:

scala> var df = Seq((1,2),(3,4)).toDF("a", "b")
df: org.apache.spark.sql.DataFrame = [a: int, b: int]

scala> df.show
+---+---+
|  a|  b|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> > df.columns.foreach{c => df = df.withColumn(c, df(c).cast(DoubleType))}

scala> df.show
+---+---+
|  a|  b|
+---+---+
|1.0|2.0|
|3.0|4.0|
+---+---+

这应该和任何其他列操作一样有效。

相关问题