F.regexp_extract inside a UDF returns AttributeError: 'NoneType' object has no attribute '_jvm'

0mkxixxg · posted 2021-05-27 · in Spark

I'm completely new to Spark and PySpark. I have a huge dataset and a set of keywords that I need to check against one column and extract from it.
My code looks like this:

temp_skills = ['sales', 'it', 'c']

@F.udf
def lookhere(z) -> str:
    strng = ' '
    for skill in temp_skills:
        strng += F.regexp_extract(z, skill, 0)
    return strng

spark.udf.register("lookhere", lambda z: lookhere(z), returnType=StringType())

DF.withColumn(
    'temp',
    lookhere(DF.dept_name)
).show(truncate=False)

Original dataframe:

+------------------+-------+
|         dept_name|dept_id|
+------------------+-------+
|  finance sales it|     10|
|marketing it sales|     20|
|             sales|     30|
|                it|     40|
+------------------+-------+

Expected df:

+------------------+-------+----------+
|         dept_name|dept_id|      temp|
+------------------+-------+----------+
|  finance sales it|     10|sales it c|
|marketing it sales|     20| sales it |
|             sales|     30|   sales  |
|                it|     40|       it |
+------------------+-------+----------+

Error:

---------------------------------------------------------------------------
PythonException                           Traceback (most recent call last)
<ipython-input-80-0c11f7327f77> in <module>()
      1 DF.withColumn('temp2', 
      2             lookintothis(DF.dept_name)
----> 3             ).show(truncate = False)

/content/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    440             print(self._jdf.showString(n, 20, vertical))
    441         else:
--> 442             print(self._jdf.showString(n, int(truncate), vertical))
    443 
    444     def __repr__(self):

/content/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/content/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a,**kw)
    135                 # Hide where the exception came from that shows a non-Pythonic
    136                 # JVM exception message.
--> 137                 raise_from(converted)
    138             else:
    139                 raise

/content/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/utils.py in raise_from(e)

PythonException: 
  An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
  File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args,**kwargs)
  File "<ipython-input-75-31ef5eea3b75>", line 7, in lookintothis
  File "/content/spark-3.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1811, in regexp_extract
    jc = sc._jvm.functions.regexp_extract(_to_java_column(str), pattern, idx)
AttributeError: 'NoneType' object has no attribute '_jvm'

Environment: Google Colab, Windows 10, Spark 3.0.0, PySpark 3.0.0
Is my approach wrong, or is it my syntax? Please help me understand this!
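For context, `pyspark.sql.functions` such as `F.regexp_extract` build JVM Column expressions on the driver; inside a UDF, which runs on a Python worker with no active SparkContext, `sc._jvm` is `None`, hence the error. Plain Python `re` works inside a UDF instead. A minimal sketch of the UDF body's logic in pure Python (the function name is illustrative, and note that substring matching lets 'c' match inside 'finance'):

```python
import re

temp_skills = ['sales', 'it', 'c']

def lookhere_py(z: str) -> str:
    # Replicate the UDF body with Python's re module instead of
    # F.regexp_extract, which cannot run inside a UDF (no JVM on workers).
    strng = ' '
    for skill in temp_skills:
        m = re.search(skill, z)
        strng += m.group(0) if m else ''
    return strng

print(lookhere_py('finance sales it'))  # ' salesitc'
```

Wrapped with `@F.udf`, this body would run on the workers without touching the JVM.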


erhoui1w1#

I checked whether each skill is in dept_name by using the in operator. I don't think you need regexp_extract for this at all.

temp_skills = ['sales', 'it', 'c']

from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, StringType

# return type must be declared as an array, since the UDF returns a list
@udf(returnType=ArrayType(StringType()))
def lookhere(z):
    strings = []
    for skill in temp_skills:
        if skill in z:
            strings.append(skill)
    return strings

spark.udf.register("lookhere", lookhere)

df = spark.read.option("header","true").option("inferSchema","true").csv("test.csv")
df.withColumn('temp', lookhere('dept_name')).show(4, False)

+------------------+-------+--------------+
|dept_name         |dept_id|temp          |
+------------------+-------+--------------+
|finance sales it  |10     |[sales, it, c]|
|marketing it sales|20     |[sales, it]   |
|sales             |30     |[sales]       |
|it                |40     |[it]          |
+------------------+-------+--------------+
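If a single space-separated string is wanted (as in the asker's expected df) rather than an array, the UDF body can join the matches instead. Here is the per-row logic in plain Python (a hedged sketch; the function name is illustrative):

```python
temp_skills = ['sales', 'it', 'c']

def matched_skills(dept_name: str) -> str:
    # keep each keyword that occurs as a substring, joined by spaces
    return ' '.join(skill for skill in temp_skills if skill in dept_name)

print(matched_skills('finance sales it'))  # 'sales it c'
print(matched_skills('it'))                # 'it'
```

Declared with `@udf` (default StringType return), this produces a string column directly.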

Another approach uses only DataFrame functions, and gives a more precise comparison by splitting dept_name into whole words.

temp_skills = ['sales', 'it', 'c']

from pyspark.sql.functions import *

df = spark.read.option("header","true").option("inferSchema","true").csv("test.csv")
df.withColumn('dept_names', split('dept_name', ' ')) \
  .withColumn('skills', array(*[lit(c) for c in temp_skills])) \
  .withColumn('temp', array_intersect('dept_names', 'skills')) \
  .drop('dept_names', 'skills').show(4, False)

+------------------+-------+-----------+
|dept_name         |dept_id|temp       |
+------------------+-------+-----------+
|finance sales it  |10     |[sales, it]|
|marketing it sales|20     |[it, sales]|
|sales             |30     |[sales]    |
|it                |40     |[it]       |
+------------------+-------+-----------+
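For reference, `array_intersect` keeps the elements of the first array that also appear in the second, preserving the first array's order, which is why row 2 shows [it, sales]. The per-row equivalent in plain Python (an illustrative sketch, ignoring `array_intersect`'s deduplication of repeated tokens):

```python
temp_skills = ['sales', 'it', 'c']

def intersect_row(dept_name: str) -> list:
    # split on spaces, then keep only whole tokens that are known skills,
    # in the order they appear in dept_name
    return [tok for tok in dept_name.split(' ') if tok in temp_skills]

print(intersect_row('marketing it sales'))  # ['it', 'sales']
```

Because matching is on whole tokens, 'c' no longer matches inside 'finance', which explains the difference from the UDF output above.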
