How to find the most frequent element in a list column in PySpark?

6ju8rftf · asked 2022-11-21 · Spark

I have a PySpark DataFrame with two columns, ID and Elements. The 'Elements' column contains lists of elements. It looks like this,

ID | Elements
_______________________________________
X  |[Element5, Element1, Element5]
Y  |[Element Unknown, Element Unknown, Element_Z]

I want to create a column holding the most frequent element of the 'Elements' column. The output should look like this,

ID | Elements                                           | Output_column 
__________________________________________________________________________
X  |[Element5, Element1, Element5]                      | Element5
Y  |[Element Unknown, Element Unknown, Element_Z]       | Element Unknown

How can I achieve this with PySpark?
Thanks in advance.

j2datikz · Answer 1

We can use higher-order functions here (available from Spark 2.4+).
1. First use transform and aggregate to get the count of each distinct value in the array.
2. Then sort the resulting array of structs in descending order of count and take the first element.
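For reference, a minimal sketch of the input DataFrame assumed below (the question's two rows plus an extra row Z with an empty array, added only for testing):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("X", ["Element5", "Element1", "Element5"]),
     ("Y", ["Element Unknown", "Element Unknown", "Element_Z"]),
     ("Z", [])],
    "ID string, Elements array<string>",
)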

from pyspark.sql import functions as F

# Dist: distinct values of Elements; Counts: occurrences of each distinct value;
# Map: the two arrays zipped into an array of {Dist, Counts} structs.
temp = (df.withColumn("Dist", F.array_distinct("Elements"))
          .withColumn("Counts", F.expr("""transform(Dist, x ->
                       aggregate(Elements, 0, (acc, y) -> IF(y = x, acc + 1, acc))
                                  )"""))
          .withColumn("Map", F.arrays_zip("Dist", "Counts"))
        ).drop("Dist", "Counts")

# Sort the structs by Counts descending and take the Dist value of the first struct.
out = temp.withColumn("Output_column",
                      F.expr("""element_at(array_sort(Map, (first, second) ->
           CASE WHEN first['Counts'] > second['Counts'] THEN -1 ELSE 1 END), 1)['Dist']"""))

Output:

Note that I have added an empty array for ID Z for testing. You can also remove the Map column by adding .drop("Map") to the output.
out.show(truncate=False)

+---+---------------------------------------------+--------------------------------------+---------------+
|ID |Elements                                     |Map                                   |Output_column  |
+---+---------------------------------------------+--------------------------------------+---------------+
|X  |[Element5, Element1, Element5]               |[{Element5, 2}, {Element1, 1}]        |Element5       |
|Y  |[Element Unknown, Element Unknown, Element_Z]|[{Element Unknown, 2}, {Element_Z, 1}]|Element Unknown|
|Z  |[]                                           |[]                                    |null           |
+---+---------------------------------------------+--------------------------------------+---------------+

For lower Spark versions, you can use a udf with statistics.mode:

from pyspark.sql import functions as F, types as T
from statistics import mode

# mode() returns the most common element; return None for empty arrays.
u = F.udf(lambda x: mode(x) if len(x) > 0 else None, T.StringType())

df.withColumn("Output", u("Elements")).show(truncate=False)
+---+---------------------------------------------+---------------+
|ID |Elements                                     |Output         |
+---+---------------------------------------------+---------------+
|X  |[Element5, Element1, Element5]               |Element5       |
|Y  |[Element Unknown, Element Unknown, Element_Z]|Element Unknown|
|Z  |[]                                           |null           |
+---+---------------------------------------------+---------------+
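One caveat not in the original answer: statistics.mode raises StatisticsError on Python versions before 3.8 when there is no unique mode (a tie), while Python 3.8+ returns the first mode encountered. A hedged variant of the udf that also returns None in that case:

from pyspark.sql import functions as F, types as T
from statistics import mode, StatisticsError

def safe_mode(x):
    # Most common element, or None for empty arrays and (on Python < 3.8) ties.
    try:
        return mode(x) if x else None
    except StatisticsError:
        return None

u = F.udf(safe_mode, T.StringType())
df.withColumn("Output", u("Elements")).show(truncate=False)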
czq61nw1 · Answer 2

You can do this with PySpark SQL functions (Spark 2.4+ for the underlying higher-order SQL functions; the Python lambda wrappers used below, such as sf.transform and sf.aggregate, were added in Spark 3.1). Below is a generic function that adds a new column containing the most common element of another array column.

import pyspark.sql.functions as sf

def add_most_common_val_in_array(df, arraycol, drop=False):
    """Takes a spark df column of ArrayType() and returns the most common element
    in the array in a new column of the df called f"MostCommon_{arraycol}"

    Args:
        df (spark.DataFrame): dataframe
        arraycol (ArrayType()): array column in which you want to find the most common element
        drop (bool, optional): Drop the arraycol after finding most common element. Defaults to False.

    Returns:
        spark.DataFrame: df with additional column containing most common element in arraycol
    """
    dvals = f"distinct_{arraycol}"
    dvalscount = f"distinct_{arraycol}_count"
    startcols = df.columns
    df = df.withColumn(dvals, sf.array_distinct(arraycol))
    df = df.withColumn(
        dvalscount,
        sf.transform(
            dvals,
            lambda uval: sf.aggregate(
                arraycol,
                sf.lit(0),
                lambda acc, entry: sf.when(entry == uval, acc + 1).otherwise(acc),
            ),
        ),
    )
    countercol = f"ReverseCounter{arraycol}"
    df = df.withColumn(countercol, sf.map_from_arrays(dvalscount, dvals))
    mccol = f"MostCommon_{arraycol}"
    df = df.withColumn(mccol, sf.element_at(countercol, sf.array_max(dvalscount)))
    df = df.select(*startcols, mccol)
    if drop:
        df = df.drop(arraycol)
    return df
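
A hedged usage sketch, assuming a DataFrame df with the ID and Elements columns from the question:

result = add_most_common_val_in_array(df, "Elements")
result.show(truncate=False)
# For row X the new MostCommon_Elements column holds Element5,
# for row Y it holds Element Unknown.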

