在同一Dataframe上触发迭代

yxyvkwin  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(266)

我现在要处理的是用sparksql替换低频值。这意味着对于每个列,它将计算每个值的计数,如果它的计数低于我们给定的阈值(比如说6),我们将用我们指定的另一个值替换它。
通过sparksql和dataframe很难找到一种有效的方法来实现这一点,因为它涉及大量的洗牌和迭代。
下面显示的代码是我为测试其性能而实现的一种方法。然而,pyspark代码无法执行,因为转换链很长(其中有许多连接)。即使我没有启动一个操作,它也不能转换为rdd血统。它只是从来没有停止在一个互动jupyter笔记本细胞(我没有开始行动)。
有人能帮我分析一下原因吗?我能做些什么?

for col in DC_columns:
    NAN_VALUE = CAT_NAN_VALUE if 'C' in col else INT_NAN_VALUE
    value_counts = df.select(col).groupby(col).count().filter('count < 6').select(col)
    df_low = df.join(value_counts, col,'left_semi')
    df_high = df.join(value_counts, col,'left_anti')    
    df_low = df_low.withColumn(col, lit(NAN_VALUE))
    df = df_low.union(df_high)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题