scipy: Spark pandas UDF that adds two Series

xvw2m8pv asked on 2023-03-18 in Spark
Follow (0) | Answers (2) | Views (118)

My problem: I'm trying to hook an API into PySpark using a UDF.

columns = ['N','P']
data = [(1,3),
    (3,3),
    (5,3)]

df = spark.createDataFrame(data=data,schema=columns)

+---+---+
|N  |P  |
+---+---+
|1  |3  |
|3  |3  |
|5  |3  |
+---+---+

@pandas_udf("col1 int, col2 int")
def func(s1: pd.Series, s2: pd.Series) -> pd.Series:
    import scipy 
    
    s3 = s1 + s2*scipy.pi
    return s3

The desired output after applying this function is a new column containing the transformed values:

df2 = df.withColumn('transformed',func("N", "P"))

9ceoxa92 — Answer #1

>>> from pyspark.sql.functions import pandas_udf
>>> from pyspark.sql.types import FloatType
>>> import pandas as pd
>>> 
>>> @pandas_udf(FloatType())
... def p_udf(s1: pd.Series, s2: pd.Series) -> pd.Series:
...   return s1 + (s2 * 3.14)
... 
>>> 
>>> df = spark.createDataFrame(data=[(1,3), (3,3), (5,3)], schema=['N','P'])
>>> df.withColumn('transformed', p_udf("N", "P")).show()
+---+---+-----------+
|  N|  P|transformed|
+---+---+-----------+
|  1|  3|      10.42|
|  3|  3|      12.42|
|  5|  3|      14.42|
+---+---+-----------+

>>>

xriantvc — Answer #2

The argument to @pandas_udf is the function's return type, not the types of its input parameters. Change your code to

from pyspark.sql import functions as F
from pyspark.sql import types as T

@F.pandas_udf(T.FloatType())
def func(s1: pd.Series, s2: pd.Series) -> pd.Series:
  [...]

and it should work.
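Since a Series-to-Series pandas UDF body is ordinary pandas arithmetic, the corrected logic can be sanity-checked locally without a Spark session. The sketch below is not from the original answer; it also substitutes math.pi for scipy.pi, which was merely an alias for math.pi and is deprecated in recent SciPy releases:

```python
import math

import pandas as pd

def transform(s1: pd.Series, s2: pd.Series) -> pd.Series:
    # Same arithmetic the pandas UDF would perform on each batch of rows;
    # math.pi stands in for the deprecated scipy.pi alias.
    return s1 + s2 * math.pi

out = transform(pd.Series([1, 3, 5]), pd.Series([3, 3, 3]))
print(out.round(5).tolist())  # [10.42478, 12.42478, 14.42478]
```

Once this batch-level logic looks right, the same function body can be dropped into the decorated UDF and applied with `df.withColumn(...)` as shown above.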
