如何将python函数转换为python pyspark

8tntrjer  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(517)

我目前在将python函数转换为python pyspark时遇到了一个问题,因为这两个函数都是不同的库。我想做的是拥有一个查询函数,然后将其应用回同一列。
这就是我为python pandas所做的工作(age是我试图从数据集中检索的列):

Age = [1, 3, -100, -99999,  39, 60, 87, 20,  21,  77777]

def clean_age(Age):
    if Age>=0 and Age<=95:
        return Age
    else:
        return np.nan

df['Age'] = df['Age'].apply(clean_age)

对于python pandas,它工作得很好,但现在我对python pyspark所做的就是这样,它不工作:

from pyspark.sql.types import IntegerType, IntegerType
from pyspark.sql.functions import udf

def clean_age(Age):
    if Age>=0 and Age<=95:
        return Age
    else:
        return NaN

spark.udf.register("clean_age", clean_age)
udf_myFunction = udf(clean_age, IntegerType())
new_df2 = new_df.withColumn('Age_Clean',udf_myFunction('Age'))
new_df2.show()

请建议我如何实现我从Pandas到Pypark的成果。提前谢谢!

gfttwv5a

gfttwv5a1#

创建自定义项:

from pyspark.sql.types import IntegerType
age_check_udf = udf(lambda age: age if (age >= 0 and age <= 95) else np.nan, IntegerType())

来自Dataframe的调用:

new_df2 = new_df.withColumn('Age_Clean', age_check_udf(new_df.Age))
new_df2.show()
ev7lccsx

ev7lccsx2#

你应该考虑使用 pandas_udf . 这是给你的 Spark >= 2.3.0 (尽管功能复杂,但这可能有点过分):

import pandas as pd
import pyspark.sql.functions as f
from pyspark.sql.types import LongType

# your function, a and b are assumed to be type pd.Series

def my_func(a, b):
    return a * b

pandas_func = f.pandas_udf(my_func, returnType=LongType())

# create test dataframe

x = pd.Series([1, 2, 3])
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# apply pandas_func

df.select(pandas_func(f.col("x"), f.col("x"))).show()
+-----------------+
|pandas_func(x, x)|
+-----------------+
|                1|
|                4|
|                9|
+-----------------+

如果你想避免 pandas_udf ,任何版本的 Spark >= 2.0.0 可以利用 pyspark.sql.functions.when 以及 otherwise .

import pyspark.sql.functions as f

x = pd.Series([10, 777, -3, 22])
df = spark.createDataFrame(pd.DataFrame(x, columns=["Age"]))

df.withColumn(
  "Age",
  f.when(
    (f.col("Age") >= 0) & 
    (f.col("Age") <= 95), f.col("Age")).otherwise(f.lit(None))
)

请随意 Package df.withColumn 然后调用带参数的函数 df 又回来了 df.withColumn . 希望这有帮助。

相关问题