scala—在每个列传递to do转换时遍历Dataframe

wvt8vs2t  于 2021-05-27  发布在  Spark
关注(0)|答案(4)|浏览(518)

我有一个Dataframe,有100列和列名,比如col1,col2,col3。。。。我想基于条件匹配对列的值应用某些转换。我可以将列名存储在字符串数组中。并将数组的每个元素的值传递到withcolumn中,并基于何时条件可以垂直变换列的值。但问题是,由于dataframe是不可变的,所以每个更新的版本都需要存储在一个新的变量中,而且新的dataframe需要传入withcolumn,以便为下一次迭代进行转换。是否有任何方法可以创建dataframe的数组,以便新的dataframe可以存储为array的元素,并且可以基于iterator的值进行迭代。或者有没有其他方法来处理同样的问题。

var arr_df : Array[DataFrame] = new Array[DataFrame](60)

-->这会引发错误“找不到Dataframe类型”

val df(0) = df1.union(df2)

for(i <- 1 to 99){
  val df(i) = df(i-1).withColumn(col(i), when(col(i)> 0, col(i) + 
   1).otherwise(col(i)))

这里col(i)是一个字符串数组,用于存储原始datframe的列的名称。
例如:

scala> val original_df = Seq((1,2,3,4),(2,3,4,5),(3,4,5,6),(4,5,6,7),(5,6,7,8),(6,7,8,9)).toDF("col1","col2","col3","col4")
original_df: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 2 more fields]

scala> original_df.show()

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   3|   4|
|   2|   3|   4|   5|
|   3|   4|   5|   6|
|   4|   5|   6|   7|
|   5|   6|   7|   8|
|   6|   7|   8|   9|
+----+----+----+----+

我想迭代3列:col1,col2,col3如果该列的值大于3,那么它将被+1更新

nzrxty8p

nzrxty8p1#

如果我没听错的话,你是在尝试Dataframe操作。你不需要为此迭代。我可以告诉你怎么在Pypark里做。或许可以在斯卡拉接管。

from pyspark.sql import functions as F
tst= sqlContext.createDataFrame([(1,7,0),(1,8,4),(1,0,10),(5,1,90),(7,6,0),(0,3,11)],schema=['col1','col2','col3'])
expr = [F.when(F.col(coln)>3,F.col(coln)+1).otherwise(F.col(coln)).alias(coln) for coln in tst.columns if 'col3' not in coln]
tst1= tst.select(*expr)

结果:

tst1.show()
+----+----+
|col1|col2|
+----+----+
|   1|   8|
|   1|   9|
|   1|   0|
|   6|   1|
|   8|   7|
|   0|   3|
+----+----+

这会给你想要的结果

rur96b6h

rur96b6h2#

检查以下代码。

scala> df.show(false)
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|1   |2   |3   |4   |
|2   |3   |4   |5   |
|3   |4   |5   |6   |
|4   |5   |6   |7   |
|5   |6   |7   |8   |
|6   |7   |8   |9   |
+----+----+----+----+
scala>  val requiredColumns = df.columns.zipWithIndex.filter(_._2 < 3).map(_._1).toSet
requiredColumns: scala.collection.immutable.Set[String] = Set(col1, col2, col3)
scala> val allColumns = df.columns
allColumns: Array[String] = Array(col1, col2, col3, col4)
scala> val columnExpr = allColumns.filterNot(requiredColumns(_)).map(col(_)) ++ requiredColumns.map(c => when(col(c) > 3, col(c) + 1).otherwise(col(c)).as(c))
scala> df.select(columnExpr:_*).show(false)
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|1   |2   |3   |4   |
|2   |3   |5   |5   |
|3   |5   |6   |6   |
|5   |6   |7   |7   |
|6   |7   |8   |8   |
|7   |8   |9   |9   |
+----+----+----+----+
pw9qyyiw

pw9qyyiw3#

您可以迭代所有列并在单行中应用条件,如下所示,

original_df.select(original_df.columns.map(c => (when(col(c) > lit(3), col(c)+1).otherwise(col(c))).alias(c)):_*).show()

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   3|   5|
|   2|   3|   5|   6|
|   3|   5|   6|   7|
|   5|   6|   7|   8|
|   6|   7|   8|   9|
|   7|   8|   9|  10|
+----+----+----+----+
ivqmmu1c

ivqmmu1c4#

你可以用 foldLeft 无论何时您想对多个 columns 如下所示

val original_df = Seq(
  (1,2,3,4),
  (2,3,4,5),
  (3,4,5,6),
  (4,5,6,7),
  (5,6,7,8),
  (6,7,8,9)
).toDF("col1","col2","col3","col4")

//Filter the columns that yuou want to update
val columns = original_df.columns

columns.foldLeft(original_df){(acc, colName) =>
  acc.withColumn(colName, when(col(colName) > 3, col(colName) + 1).otherwise(col(colName)))
}
.show(false)

输出:

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|1   |2   |3   |5   |
|2   |3   |5   |6   |
|3   |5   |6   |7   |
|5   |6   |7   |8   |
|6   |7   |8   |9   |
|7   |8   |9   |10  |
+----+----+----+----+

相关问题