PySpark - Replace null values with the mean of the corresponding row

mqkwyuun posted on 2023-04-29 in Spark

I have the following PySpark DataFrame:

df
col1 col2 col3
1      2    3
4    None   6
7      8   None

I want to replace the None (or null) values with the mean of the row they are in. The output should look like this:

df_result
col1 col2 col3
1      2    3
4      5    6
7      8   7.5

Everything I have tried fails with either "Column is not iterable" or "Invalid argument, not a string or column". Any help is much appreciated!
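Reading the expected output, the mean is taken over the non-null values of the row only. As a formula (the symbols x_ij, S_i and x̄_i are introduced here for illustration): for row i, let S_i be the set of columns whose value x_ij is not null; every null in row i is replaced by

```latex
S_i = \{\, j : x_{ij} \text{ is not null} \,\}, \qquad
\bar{x}_i = \frac{1}{|S_i|} \sum_{j \in S_i} x_{ij}

\bar{x}_2 = \frac{4 + 6}{2} = 5, \qquad
\bar{x}_3 = \frac{7 + 8}{2} = 7.5
```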

Answer #1 (oaxa6hgo)

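This is easy to solve with mapInPandas (Spark 3.0+, requires PyArrow):
# Each element of the iterator is a pandas DataFrame holding one batch of rows;
# Spark nulls arrive as NaN. Replace every NaN with the mean of its row
# (DataFrame.mean skips NaN by default, so only non-null values are averaged).
def fillna(iterator):
    for pdf in iterator:
        yield pdf.mask(pdf.isna(), pdf.mean(axis=1), axis=0)

# Output schema as a DDL string, e.g. "col1 double,col2 double,col3 double"
# (double matches the float64 columns pandas produces)
result = df.mapInPandas(fillna, ','.join(map('{} double'.format, df.columns)))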

Result:

result.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1.0| 2.0| 3.0|
| 4.0| 5.0| 6.0|
| 7.0| 8.0| 7.5|
+----+----+----+
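The pandas part of this answer can be checked without a Spark session. A minimal sketch, assuming one batch handed to the function looks like the hand-built `batch` below (nulls as NaN in float columns); `batch` and `filled` are illustrative names, and numpy and pandas are assumed to be installed:

```python
import numpy as np
import pandas as pd


def fillna(iterator):
    # Same body as the answer: replace each NaN with the mean of its row.
    # mean(axis=1) skips NaN; axis=0 in mask aligns the Series with the row index.
    for pdf in iterator:
        yield pdf.mask(pdf.isna(), pdf.mean(axis=1), axis=0)


# Stand-in for one batch of the question's data (null -> NaN)
batch = pd.DataFrame(
    {"col1": [1, 4, 7], "col2": [2, np.nan, 8], "col3": [3, 6, np.nan]}
)

filled = next(fillna(iter([batch])))
print(filled)
```

The second row gets col2 = 5.0 and the third row gets col3 = 7.5, the same values as in the Spark output above.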

Answer #2 (col17t5w)

Using only Spark built-in functions:

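Try the higher-order array functions (filter and aggregate): count the non-null elements of each row (using lambda functions), sum the elements, divide the sum by the number of non-null elements to get the mean, and finally replace the nulls with that mean using a when/otherwise (CASE WHEN) expression.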
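The same steps can be traced in plain Python on the question's rows, with None standing in for null. This is an illustrative sketch, not Spark code: `fill_row_with_mean` is a hypothetical helper, the Spark version below additionally rounds the mean to one decimal, and returning None for an all-null row is an assumption of the sketch.

```python
from functools import reduce


def fill_row_with_mean(row):
    """Hypothetical plain-Python mirror of the Spark steps for one row."""
    # nulls_count: size(filter(array(isnull(c), ...), x -> x))
    nulls_count = len([v for v in row if v is None])
    # arr_vals: array(coalesce(c, 0), ...)
    arr_vals = [0 if v is None else v for v in row]
    # sum_elems: aggregate(arr_vals, 0, (acc, x) -> acc + x)
    sum_elems = reduce(lambda acc, x: acc + x, arr_vals, 0.0)
    # mean_val: divide by the number of non-null elements (None if there are none)
    non_null = len(arr_vals) - nulls_count
    mean_val = sum_elems / non_null if non_null else None
    # when(isNull, mean_val).otherwise(value)
    return [mean_val if v is None else v for v in row]


rows = [[1, 2, 3], [4, None, 6], [7, 8, None]]
print([fill_row_with_mean(r) for r in rows])
```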

Example:

df.show(10,False)
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|1   |2   |3   |
#|4   |null|6   |
#|7   |8   |null|
#+----+----+----+

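# filter/aggregate with Python lambdas need Spark 3.1+;
# this `filter` shadows Python's built-in filter
from pyspark.sql.functions import array, coalesce, col, expr, filter, isnull, lit, size, when

# nulls_count: how many of the columns are null in this row
# arr_vals: all columns packed into an array, with nulls replaced by 0
# sum_elems: sum of the array elements via the aggregate higher-order function
#   (starting from a double 0 so non-integer columns also work)
# mean_val: sum divided by the number of non-null values, rounded to 1 decimal
df1 = (
    df.withColumn("nulls_count", size(filter(array(*[isnull(col(c)) for c in df.columns]), lambda x: x)))
    .withColumn("arr_vals", array(*[coalesce(col(c), lit(0)) for c in df.columns]))
    .withColumn("sum_elems", expr("aggregate(arr_vals, cast(0 as double), (acc, x) -> acc + x)"))
    .withColumn("mean_val", expr("round(sum_elems / (size(arr_vals) - nulls_count), 1)"))
)

df1.select([when(col(c).isNull(), col("mean_val")).otherwise(col(c)).alias(c) for c in df.columns]).show(10, False)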
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|1.0 |2.0 |3.0 |
#|4.0 |5.0 |6.0 |
#|7.0 |8.0 |7.5 |
#+----+----+----+

Answer #3 (k10s72fa)

Try this:

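from functools import reduce
from operator import add

from pyspark.sql.functions import coalesce, col, lit, when

# Compute the mean of each row: sum of the non-null values / number of non-null values
row_sum = reduce(add, [coalesce(col(c), lit(0)) for c in df.columns])
non_null_count = reduce(add, [col(c).isNotNull().cast("int") for c in df.columns])
row_mean = row_sum / non_null_count

# Replace null values with the row mean
df_result = df.select([
    when(col(c).isNull(), row_mean)
    .otherwise(col(c))
    .alias(c) for c in df.columns
])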
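A common pitfall with this problem is reaching for `pyspark.sql.functions.mean`: it is an aggregate that averages one column down its rows, so on its own it cannot give a per-row mean. The difference on the question's data can be sketched in pandas, used here only as an illustrative stand-in for the Spark DataFrame (pandas' `mean` also skips nulls):

```python
import numpy as np
import pandas as pd

pdf = pd.DataFrame({"col1": [1, 4, 7], "col2": [2, np.nan, 8], "col3": [3, 6, np.nan]})

# Down each column: what an aggregate such as mean(col(c)) computes
column_means = pdf.mean(axis=0)
# Across each row: what the question asks for
row_means = pdf.mean(axis=1)

print(column_means.tolist())  # [4.0, 5.0, 4.5]
print(row_means.tolist())     # [2.0, 5.0, 7.5]
```

Filling with column means happens to give 5.0 for col2, but puts 4.5 instead of 7.5 into col3 on the third row.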
