I defined a dictionary and a function, and registered a UDF as a SQL function:
%%spark
from pyspark.sql.types import StringType

d = {'I': 'Ice', 'U': 'UN', 'T': 'Tick'}

def key_to_val(k):
    if k in d:
        return d[k]
    else:
        return "Null"

spark.udf.register('key_to_val', key_to_val, StringType())
I have a Spark DataFrame:
sdf =
+----+------------+--------------+
|id |date |Num |
+----+------------+--------------+
|I |2012-01-03 |1 |
|C |2013-01-11 |2 |
+----+------------+--------------+
I want to apply the function I registered to sdf, replacing each value in the "id" column with its dictionary value (when the key exists in the dictionary), but I keep getting an error:
An error was encountered:
'list' object is not callable
Traceback (most recent call last):
TypeError: 'list' object is not callable
The code I tried is
%%spark
sdf.withColumn('id', key_to_val(sdf.id))
The expected output is
+----+------------+--------------+
|id |date |Num |
+----+------------+--------------+
|Ice |2012-01-03 |1 |
|Null|2013-01-11 |2 |
+----+------------+--------------+
After trying the code
from pyspark.sql.functions import col, udf
key_to_val_udf = udf(key_to_val)
stocks_sdf.withColumn("org", key_to_val_udf(sdf.id)).show()
An error was encountered:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/worker.py", line 604, in main
process()
File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/worker.py", line 596, in process
serializer.dump_stream(out_iter, outfile)
File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/serializers.py", line 211, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
for obj in iterator:
File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/serializers.py", line 200, in _batched
for item in iterator:
File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/worker.py", line 450, in mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
return lambda *a: f(*a)
File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/util.py", line 87, in wrapper
return f(*args, **kwargs)
File "<stdin>", line 2, in ticker_to_name
AttributeError: 'str' object has no attribute 'apply'
Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 485, in show
print(self._jdf.showString(n, 20, vertical))
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
raise converted from None
pyspark.sql.utils.PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
(worker traceback repeated, identical to the one above)
1 Answer
jhdbpxl91
You are not calling your UDF the right way. Either register the UDF and call it inside a spark.sql("...") query, or create a UDF from the function with udf() and call that in .withColumn(). Also note that .withColumn() returns a new DataFrame, so assign the result. I fixed your code: