我有一个Dataframe,有100列和列名,比如col1,col2,col3。。。。我想基于条件匹配对列的值应用某些转换。我可以将列名存储在字符串数组中。并将数组的每个元素的值传递到withcolumn中,并基于何时条件可以垂直变换列的值。但问题是,由于dataframe是不可变的,所以每个更新的版本都需要存储在一个新的变量中,而且新的dataframe需要传入withcolumn,以便为下一次迭代进行转换。是否有任何方法可以创建dataframe的数组,以便新的dataframe可以存储为array的元素,并且可以基于iterator的值进行迭代。或者有没有其他方法来处理同样的问题。
var arr_df : Array[DataFrame] = new Array[DataFrame](60)
-->这会引发错误“找不到Dataframe类型”
val df(0) = df1.union(df2)
for(i <- 1 to 99){
val df(i) = df(i-1).withColumn(col(i), when(col(i)> 0, col(i) +
1).otherwise(col(i)))
这里col(i)是一个字符串数组,用于存储原始datframe的列的名称。
例如:
scala> val original_df = Seq((1,2,3,4),(2,3,4,5),(3,4,5,6),(4,5,6,7),(5,6,7,8),(6,7,8,9)).toDF("col1","col2","col3","col4")
original_df: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 2 more fields]
scala> original_df.show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| 2| 3| 4|
| 2| 3| 4| 5|
| 3| 4| 5| 6|
| 4| 5| 6| 7|
| 5| 6| 7| 8|
| 6| 7| 8| 9|
+----+----+----+----+
我想迭代3列:col1,col2,col3如果该列的值大于3,那么它将被+1更新
4条答案
按热度按时间nzrxty8p1#
如果我没听错的话,你是在尝试Dataframe操作。你不需要为此迭代。我可以告诉你怎么在Pypark里做。或许可以在斯卡拉接管。
结果:
这会给你想要的结果
rur96b6h2#
检查以下代码。
pw9qyyiw3#
您可以迭代所有列并在单行中应用条件,如下所示,
ivqmmu1c4#
你可以用
foldLeft
无论何时您想对多个columns
如下所示输出: