TypeError in a PySpark UDF

Posted by krugob8w on 2023-06-06 in Apache

I have this function:

def ead(lista):
    ind_mmff, isdebala, isfubala, k1, k2, ead = lista
    try:
        isdebala = float(isdebala)
        isfubala = float(isfubala)
        k1 = float(k1)
        k2 = float(k2)
        ead = float(ead)
    except ValueError:
        return 'Error: invalid input'        
    min_deb = min(0, isdebala)
    min_fub = min(0, isfubala)
    
    if ind_mmff == '0':
        ead_dai = abs(min_deb * k1 / 100 + min_fub * k2 / 100)
    else:
        ead_dai = ead
    return ead_dai

Then I define a UDF like this:

ead_udf = udf(lambda z: ead(z), FloatType())

The goal is to create an ead_calc column in my DataFrame df, like this:

df = df.withColumn('ead_calc', ead_udf (array(df.ind_mmff, df.isdebala, df.isfubala, df.k1, df.k2, df.ead_final_motor)))

After running df.select('ead_calc').show(), I get the following error:

Py4JJavaError: An error occurred while calling o3026.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 813.0 failed 4 times, most recent failure: Lost task 3.3 in stage 813.0 (TID 12054, csslncclowp0006.unix.aacc.corp, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-93-25e605cffdae>", line 1, in <lambda>
  File "<ipython-input-92-a1937fe32209>", line 12, in ead
TypeError: _() takes 1 positional argument but 2 were given

The error is raised at min_deb = min(0, isdebala). I don't know how to fix this, since the min function obviously takes 2 arguments.

k97glaaz  #1

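I think you imported the wrong min function. My guess is that you used from pyspark.sql.functions import *, which replaces Python's built-in min with the PySpark one. The PySpark min function takes a single argument (a column), whereas Python's built-in min is the one that accepts two values.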
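The shadowing can be reproduced without Spark. In the sketch below, a hypothetical one-argument stand-in named _ is bound to the name min, the way a wildcard import would rebind it. It is not the PySpark implementation, only an illustration of why a two-argument call fails and why builtins.min still works.

import builtins

def _(col):
    # Hypothetical stand-in for a one-argument column function.
    return "min(" + str(col) + ")"

# Rebinding the name at module level mimics what a wildcard import does.
min = _

try:
    min(0, -5.0)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# Python's own min is still reachable through the builtins module.
safe_result = builtins.min(0, -5.0)

print(error_message)
print(safe_result)

The TypeError message produced here has the same shape as the last line of the traceback in the question, which is consistent with min no longer referring to the built-in.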
Try importing only the functions you need. That seems to work (I just added some made-up input):

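# Import only the names that are used. A wildcard import would rebind
# Python built-ins such as min, max and abs to the PySpark column functions.
from pyspark.sql.functions import udf, array
from pyspark.sql.types import StructField, StructType, FloatType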

def ead(lista):
    ind_mmff, isdebala, isfubala, k1, k2, ead = lista
    try:
        isdebala = float(isdebala)
        isfubala = float(isfubala)
        k1 = float(k1)
        k2 = float(k2)
        ead = float(ead)
    except ValueError:
        return 'Error: invalid input'        
    min_deb = min(0, isdebala)
    min_fub = min(0, isfubala)
    
    if ind_mmff == '0':
        ead_dai = abs(min_deb * k1 / 100 + min_fub * k2 / 100)
    else:
        ead_dai = ead
    return ead_dai

ead_udf = udf(lambda z: ead(z), FloatType())

schema = StructType([
  StructField('ind_mmff', FloatType(), True),
  StructField('isdebala', FloatType(), True),
  StructField('isfubala', FloatType(), True),
  StructField('k1', FloatType(), True),
  StructField('k2', FloatType(), True),
  StructField('ead_final_motor', FloatType(), True)
  ])

df = spark.createDataFrame(data=[(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)],schema=schema)

df = df.withColumn('ead_calc', ead_udf (array(df.ind_mmff, df.isdebala, df.isfubala, df.k1, df.k2, df.ead_final_motor)))

df.show()
+--------+--------+--------+---+---+---------------+--------+
|ind_mmff|isdebala|isfubala| k1| k2|ead_final_motor|ead_calc|
+--------+--------+--------+---+---+---------------+--------+
|     1.0|     2.0|     3.0|4.0|5.0|            6.0|     6.0|
+--------+--------+--------+---+---+---------------+--------+
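
If the wildcard import has to stay, one option is to call the built-ins explicitly. The following is only a sketch, not the code from the question or the answer: ead_safe is a hypothetical name. It uses builtins.min and builtins.abs, also catches TypeError (float(None) raises TypeError, which is what a null value would cause), and returns None instead of a string because the UDF is declared with FloatType. Converting ind_mmff to float before comparing is an assumption about the intended check: with float columns inside array(...), a comparison against the string '0' would never be true.

import builtins

def ead_safe(lista):
    """Hypothetical variant of ead that does not rely on the global name min."""
    ind_mmff, isdebala, isfubala, k1, k2, ead = lista
    try:
        ind_mmff = float(ind_mmff)
        isdebala = float(isdebala)
        isfubala = float(isfubala)
        k1 = float(k1)
        k2 = float(k2)
        ead = float(ead)
    except (TypeError, ValueError):
        # None becomes null in a FloatType column; a string would not match that type.
        return None

    # Use the built-ins explicitly so a wildcard import cannot shadow them.
    min_deb = builtins.min(0.0, isdebala)
    min_fub = builtins.min(0.0, isfubala)

    if ind_mmff == 0.0:
        # Assumption: '0', 0 and 0.0 should all select this branch.
        return builtins.abs(min_deb * k1 / 100 + min_fub * k2 / 100)
    return ead

print(ead_safe(['0', -200.0, -100.0, 50.0, 20.0, 6.0]))
print(ead_safe([1.0, 2.0, 3.0, 4.0, 5.0, 6.0]))
print(ead_safe(['0', None, 3.0, 4.0, 5.0, 6.0]))

It would be registered the same way as in the answer, for example with udf(ead_safe, FloatType()).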
