I have this function:

```python
def ead(lista):
    ind_mmff, isdebala, isfubala, k1, k2, ead = lista
    try:
        isdebala = float(isdebala)
        isfubala = float(isfubala)
        k1 = float(k1)
        k2 = float(k2)
        ead = float(ead)
    except ValueError:
        return 'Error: invalid input'
    min_deb = min(0, isdebala)
    min_fub = min(0, isfubala)
    if ind_mmff == '0':
        ead_dai = abs(min_deb * k1 / 100 + min_fub * k2 / 100)
    else:
        ead_dai = ead
    return ead_dai
```
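Called directly in plain Python (no Spark involved), the function behaves as expected — a quick sanity check with made-up inputs:

```python
def ead(lista):
    # Same function as above, unchanged.
    ind_mmff, isdebala, isfubala, k1, k2, ead = lista
    try:
        isdebala = float(isdebala)
        isfubala = float(isfubala)
        k1 = float(k1)
        k2 = float(k2)
        ead = float(ead)
    except ValueError:
        return 'Error: invalid input'
    min_deb = min(0, isdebala)
    min_fub = min(0, isfubala)
    if ind_mmff == '0':
        ead_dai = abs(min_deb * k1 / 100 + min_fub * k2 / 100)
    else:
        ead_dai = ead
    return ead_dai

print(ead(['0', '-10', '5', '50', '25', '999']))   # 5.0
print(ead(['1', '-10', '5', '50', '25', '999']))   # 999.0
print(ead(['0', 'abc', '5', '50', '25', '999']))   # Error: invalid input
```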
Then I define a UDF from it, like so:

```python
ead_udf = udf(lambda z: ead(z), FloatType())
```

The goal is to create an `ead_calc` column in my DataFrame `df`, for example:

```python
df = df.withColumn('ead_calc', ead_udf(array(df.ind_mmff, df.isdebala, df.isfubala, df.k1, df.k2, df.ead_final_motor)))
```
After running `df.select('ead_calc').show()`, I get the following error:
```
Py4JJavaError: An error occurred while calling o3026.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 813.0 failed 4 times, most recent failure: Lost task 3.3 in stage 813.0 (TID 12054, csslncclowp0006.unix.aacc.corp, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-93-25e605cffdae>", line 1, in <lambda>
  File "<ipython-input-92-a1937fe32209>", line 12, in ead
TypeError: _() takes 1 positional argument but 2 were given
```
The error points to the line `min_deb = min(0, isdebala)`. I don't know how to fix this, since `min` obviously does take two arguments.
1 Answer
I think you imported the wrong `min` function. My guess is you did `from pyspark.sql.functions import *`, which shadows Python's built-in `min` with the pyspark one. The pyspark `min` is an aggregate function that takes a single argument (a column), while your code calls `min` with two arguments, hence the `TypeError`. Import only the functions you actually need and it works (I just tried it with some random input).
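The shadowing can be reproduced without a Spark cluster. The sketch below uses a hypothetical one-argument stand-in for `pyspark.sql.functions.min` (the real one accepts a single column), so the exact error message from the traceback appears:

```python
import builtins

# Hypothetical stand-in: pyspark.sql.functions.min is a generated wrapper
# (internally named `_`) that accepts exactly one argument, a column.
def _(col):
    return col

min = _  # this is what `from pyspark.sql.functions import *` effectively does

try:
    min(0, -10.0)            # the call inside ead() now hits the wrong min
except TypeError as err:
    print(err)               # _() takes 1 positional argument but 2 were given

# Two possible fixes: import only the functions you need from
# pyspark.sql.functions, or call the built-in explicitly.
print(builtins.min(0, -10.0))   # -10.0
```

Inside the UDF itself, `builtins.min(0, isdebala)` is the defensive option: it keeps working even if a later wildcard import shadows `min` again.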