PySpark: calling a UDF on a Spark DataFrame doesn't work

vyu0f0g1 · posted 2023-03-17 · in Spark
Follow (0) | Answers (1) | Views (176)

I defined a dictionary and a function, and registered a UDF as a SQL function:

%%spark
from pyspark.sql.types import StringType

d = {'I': 'Ice', 'U': 'UN', 'T': 'Tick'}

def key_to_val(k):
    if k in d:
        return d[k]
    else:
        return "Null"

spark.udf.register('key_to_val', key_to_val, StringType())
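(A UDF registered this way is invoked by name from SQL. As a quick smoke test, which is not in the original post, the registration can be checked directly:)

%%spark
# Sanity check: call the registered SQL function on a literal key
spark.sql("SELECT key_to_val('I') AS mapped").show()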

I have a Spark DataFrame:

sdf = 
+----+------------+--------------+
|id  |date        |Num           |
+----+------------+--------------+
|I   |2012-01-03  |1             |
|C   |2013-01-11  |2             |
+----+------------+--------------+

I want to apply the function I registered to sdf, replacing each value in "id" with its dictionary value (when the key exists), but I keep getting an error.

An error was encountered:
'list' object is not callable
Traceback (most recent call last):
TypeError: 'list' object is not callable

The code I tried is:

%%spark
sdf.withColumn('id', key_to_val(sdf.id))

The expected output is:

+----+------------+--------------+
|id  |date        |Num           |
+----+------------+--------------+
|Ice |2012-01-03  |1             |
|Null|2013-01-11  |2             |
+----+------------+--------------+
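(For context: in the attempt above, key_to_val is the plain Python function, so it receives a pyspark Column object rather than a string. Note that spark.udf.register also returns a wrapped UDF that can be used in the DataFrame API; a minimal sketch of capturing it, where key_to_val_wrapped is a hypothetical name, assuming the registration cell above:)

# spark.udf.register returns the wrapped UDF, usable in the DataFrame API too
key_to_val_wrapped = spark.udf.register('key_to_val', key_to_val, StringType())
sdf.withColumn('id', key_to_val_wrapped(sdf.id)).show()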

After trying the following code:

from pyspark.sql.functions import udf
key_to_val_udf = udf(key_to_val)
sdf.withColumn("org", key_to_val_udf(sdf.id)).show()
An error was encountered:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 485, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/serializers.py", line 211, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/serializers.py", line 200, in _batched
    for item in iterator:
  File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/mnt1/yarn/usercache/livy/appcache/application_1678821388138_0001/container_1678821388138_0001_01_000008/pyspark.zip/pyspark/util.py", line 87, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 2, in ticker_to_name
AttributeError: 'str' object has no attribute 'apply'
jhdbpxl9 1#

You're not calling your UDF the right way: either register the UDF and then call it by name inside a spark.sql("...") query, or wrap the function with udf() and call the result inside .withColumn(). I fixed your code using the second option:

from pyspark.sql.functions import udf

d = {'I': 'Ice', 'U': 'UN', 'T': 'Tick'}

def key_to_val(k):
    if k in d:
        return d[k]
    else:
        return "Null"

# udf() without an explicit returnType defaults to StringType
key_to_val_udf = udf(key_to_val)

sdf = spark.createDataFrame([['I', '2012-01-03', 1], ['C', '2013-01-11', 2]], schema=['id', 'date', 'Num'])
sdf.withColumn('id', key_to_val_udf(sdf.id)).show()

+----+----------+---+
|  id|      date|Num|
+----+----------+---+
| Ice|2012-01-03|  1|
|Null|2013-01-11|  2|
+----+----------+---+
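One more point, since the title says the UDF call "has no effect": Spark DataFrames are immutable, so .withColumn() returns a new DataFrame instead of modifying sdf in place. A minimal sketch of keeping the converted column:

# .withColumn() returns a new DataFrame; rebind the name to keep the result
sdf = sdf.withColumn('id', key_to_val_udf(sdf.id))
sdf.show()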
