如何转换Scalar Pyspark UDF于Pandas UDF?

1l5u6lss  于 2023-10-14  发布在  Scala
关注(0)|答案(2)|浏览(126)

我有一个UDF,如下所示,这是一个普通的标量Pyspark UDF:

@udf()
def redact(colVal: column, offset: int = 0):
    if not colVal or not offset:
        return 'X'*8
    else:
        charList=list(colVal)
        charList[:-offset]='X'*(len(colVal)-offset)
        return "".join(charList)

当我试图将其转换为pandas_udf时,当我阅读时,使用矢量化的UDF代替标量UDF时,性能有了很大的提高,我遇到了很多与pandas相关的问题,而这些问题我经验不足。
请帮助我将此UDF转换为矢量化Pandas UDF

p1tboqfb

p1tboqfb1#

redact函数可以 Package 在一个函数中,该函数将redact应用于pd.Series的每一项。
需要应用Currying,因为要传递标量offset值。

from pyspark.sql import functions as F
import pandas as pd

def pandas_wrapper(values: pd.Series, offset: int) -> pd.Series:
    def redact(colVal: str, offset: int = 0):
        if not colVal or not offset:
            return 'X'*8
        else:
            charList=list(colVal)
            charList[:-offset]='X'*(len(colVal)-offset)
            return "".join(charList)
    return values.apply(lambda value: redact(value, offset))

def curried_wrapper(offset: int):
    return F.pandas_udf(lambda x: pandas_wrapper(x, offset), "string")

df = spark.createDataFrame([("abcdef", ), ("12yz", ), (None,)], ("data_col", ))

df.withColumn("redacted", curried_wrapper(2)(F.col("data_col"))).show()

输出

+--------+--------+
|data_col|redacted|
+--------+--------+
|  abcdef|  XXXXef|
|    12yz|    XXyz|
|    null|XXXXXXXX|
+--------+--------+
zzwlnbp8

zzwlnbp82#

Nithish回复的重构版本:

from pyspark.sql import functions as F
import pandas as pd
from typing import Callable

def my_func(column_value: str) -> str:
    return column_value[:2] if column_value else None

def create_udf(f: Callable, *args, **kwargs):
    def pandas_wrapper(values: pd.Series) -> pd.Series:
        return values.apply(f)

    return F.pandas_udf(pandas_wrapper, *args, **kwargs)

df = spark.createDataFrame([("abcdef", ), ("12yz", ), (None, )], ("data_col", ))

df.withColumn("my_func", F.udf(my_func, "string")(F.col("data_col"))) \
  .withColumn("my_func_pandas", create_udf(my_func, "string")(F.col("data_col"))).show()

这个函数可以是任何东西,只要它只占用一个列。因此,在这种情况下,create_udfF.udf的直接替代品。
正如Nithish已经指出的那样,与Spark 3.5.0中引入的参数useArrow=True相比,序列化开销减少了。所以在Spark 3.5.0中不再需要这种代码,可以使用useArrow。

相关问题