python 如何在pandas-on-spark API中运行迭代应用正则表达式的函数?

enyaitl3  于 2023-05-21  发布在  Python
关注(0)|答案(2)|浏览(219)

bounty还有3天到期。此问题的答案有资格获得+200声望奖励。Psychotechnopath正在寻找规范答案:由于许多人可能会使用应用正则表达式的for循环(我在这里使用的模式)来迁移pandas代码,因此我正在寻找这个问题的规范答案:这是一个常规的spark反模式,还是这个问题是由与pandas-on-spark相关的东西引起的?

我使用pandas-on-spark和regex来删除数据框中列的一些缩写。在pandas中,这一切都很好,但我的任务是将这些代码迁移到我们的spark集群上的生产工作负载,因此决定使用pandas-on-spark。我在使用下面的功能时遇到问题。我使用它来清理一些缩写(为了可读性,这里进行了一些简化,实际上abbreviations_dict有61个缩写,patterns是一个包含三个正则表达式模式的列表-因此对于循环迭代61 x3 = 183次迭代)。df["SchoneFunctie"]是大约420 k行的pyspark.pandas.Series。我在Azure Synapse中的Apache Spark池上运行此代码,Spark版本为3.3。(再具体一点:3.3.1.5.2-90111858)

import pyspark.pandas as pspd

def resolve_abbreviations(job_list: pspd.Series) -> pspd.Series:
    """
    The job titles contain a lot of abbreviations for common terms.
    We write them out to create a more standardized job title list.

    :param job_list: df.SchoneFunctie during processing steps
    :return: SchoneFunctie where abbreviations are written out in words
    """
    abbreviations_dict = {
        "1e": "eerste",
        "1ste": "eerste",
        "2e": "tweede",
        "2de": "tweede",
        "3e": "derde",
        "3de": "derde",
        "ceo": "chief executive officer",
        "cfo": "chief financial officer",
        "coo": "chief operating officer",
        "cto": "chief technology officer",
        "sr": "senior",
        "tech": "technisch",
        "zw": "zelfstandig werkend"
    }

    #Create a list of abbreviations
    abbreviations_pob = list(abbreviations_dict.keys())

    #For each abbreviation in this list
    for abb in abbreviations_pob:
        # define patterns to look for
        patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                    fr'{abb}\.']
        # actual recoding of abbreviations to written out form
        value_to_replace = abbreviations_dict[abb]
        for patt in patterns:
            job_list = job_list.str.replace(pat=fr'{patt}', repl=f'{value_to_replace} ', regex=True)

    return job_list

问题和我尝试过的事情

作为per pandas-on-spark best practices docs,我试图在这个函数之后检查我的 Dataframe ,因为它是一个有一堆迭代的函数,所以血统可以很快变得很大。df.spark.explain()给出了373行的查询计划。请在下面找到它的一个片段:

*(186) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6074 AS SchoneFunctie#5881]
+- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6073)#5878], [pythonUDF0#6074], 200
   +- *(185) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6073]
      +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6072)#5873], [pythonUDF0#6073], 200
         +- *(184) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6072]
            +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6071)#5868], [pythonUDF0#6072], 200
               +- *(183) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6071]
                  +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6070)#5863], [pythonUDF0#6071], 200
                     +- *(182) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6070]
                        +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6069)#5858], [pythonUDF0#6070], 200
                           +- *(181) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6069]
                              +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6068)#5853], [pythonUDF0#6069], 200
                                 +- *(180) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6068]
                                    +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6067)#5848], [pythonUDF0#6068], 200

然而,无论我尝试什么,我都不能成功地运行这个函数而不遇到错误。
只需调用resolve_abbreviations并尝试检查点

df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
df = df.spark.checkpoint()

导致以下错误:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In [17], line 2
      1 df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
----> 2 df = df.spark.checkpoint()

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/spark/accessors.py:1073, in SparkFrameMethods.checkpoint(self, eager)
   1070 from pyspark.pandas.frame import DataFrame
   1072 internal = self._psdf._internal.resolved_copy
-> 1073 checkpointed_sdf = internal.spark_frame.checkpoint(eager)
   1074 return DataFrame(internal.with_new_sdf(checkpointed_sdf))

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:682, in DataFrame.checkpoint(self, eager)
    665 def checkpoint(self, eager: bool = True) -> "DataFrame":
    666     """Returns a checkpointed version of this :class:`DataFrame`. Checkpointing can be used to
    667     truncate the logical plan of this :class:`DataFrame`, which is especially useful in
    668     iterative algorithms where the plan may grow exponentially. It will be saved to files
   (...)
    680     This API is experimental.
    681     """
--> 682     jdf = self._jdf.checkpoint(eager)
    683     return DataFrame(jdf, self.sparkSession)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o8801.checkpoint.
: org.apache.spark.SparkException: Job 32 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1196)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1194)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1194)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2897)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2794)
    at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2217)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1484)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:2217)
    at org.apache.spark.SparkContext$$anon$3.run(SparkContext.scala:2154)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:958)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2350)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2371)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2403)
    at org.apache.spark.rdd.ReliableCheckpointRDD$.writeRDDToCheckpointDirectory(ReliableCheckpointRDD.scala:166)
    at org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:60)
    at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:75)
    at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1913)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1903)
    at org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:700)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3871)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3869)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869)
    at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:691)
    at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:654)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

尝试使用local_checkpoint()而不是检查点

df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
df = df.spark.local_checkpoint()

导致类似的错误:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In [21], line 2
      1 df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
----> 2 df = df.spark.local_checkpoint()

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/spark/accessors.py:1111, in SparkFrameMethods.local_checkpoint(self, eager)
   1108 from pyspark.pandas.frame import DataFrame
   1110 internal = self._psdf._internal.resolved_copy
-> 1111 checkpointed_sdf = internal.spark_frame.localCheckpoint(eager)
   1112 return DataFrame(internal.with_new_sdf(checkpointed_sdf))

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:702, in DataFrame.localCheckpoint(self, eager)
    685 def localCheckpoint(self, eager: bool = True) -> "DataFrame":
    686     """Returns a locally checkpointed version of this :class:`DataFrame`. Checkpointing can be
    687     used to truncate the logical plan of this :class:`DataFrame`, which is especially useful in
    688     iterative algorithms where the plan may grow exponentially. Local checkpoints are
   (...)
    700     This API is experimental.
    701     """
--> 702     jdf = self._jdf.localCheckpoint(eager)
    703     return DataFrame(jdf, self.sparkSession)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o12529.localCheckpoint.
: org.apache.spark.SparkException: Job 32 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1196)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1194)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1194)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2897)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2794)
    at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2217)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1484)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:2217)
    at org.apache.spark.SparkContext$$anon$3.run(SparkContext.scala:2154)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:958)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2350)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2371)
    at org.apache.spark.rdd.LocalRDDCheckpointData.doCheckpoint(LocalRDDCheckpointData.scala:54)
    at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:75)
    at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1913)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1903)
    at org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:700)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3871)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3869)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869)
    at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:691)
    at org.apache.spark.sql.Dataset.localCheckpoint(Dataset.scala:678)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

甚至当我试图通过调用一个动作来打破血统时

df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
print(df.head(10))

我得到一个错误:

/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:201: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in the middle of computation.
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In [23], line 2
      1 df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
----> 2 print(df.head(10))

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/frame.py:12255, in DataFrame.__repr__(self)
  12252 if max_display_count is None:
  12253     return self._to_internal_pandas().to_string()
> 12255 pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
  12256 pdf_length = len(pdf)
  12257 pdf = cast("DataFrame", pdf.iloc[:max_display_count])

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/frame.py:12246, in DataFrame._get_or_create_repr_pandas_cache(self, n)
  12243 def _get_or_create_repr_pandas_cache(self, n: int) -> Union[pd.DataFrame, pd.Series]:
  12244     if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
  12245         object.__setattr__(
> 12246             self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  12247         )
  12248     return self._repr_pandas_cache[n]

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/frame.py:12241, in DataFrame._to_internal_pandas(self)
  12235 def _to_internal_pandas(self) -> pd.DataFrame:
  12236     """
  12237     Return a pandas DataFrame directly from _internal to avoid overhead of copy.
  12238 
  12239     This method is for internal use only.
  12240     """
> 12241     return self._internal.to_pandas_frame

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/utils.py:588, in lazy_property.<locals>.wrapped_lazy_property(self)
    584 @property
    585 @functools.wraps(fn)
    586 def wrapped_lazy_property(self):
    587     if not hasattr(self, attr_name):
--> 588         setattr(self, attr_name, fn(self))
    589     return getattr(self, attr_name)

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/internal.py:1056, in InternalFrame.to_pandas_frame(self)
   1054 """Return as pandas DataFrame."""
   1055 sdf = self.to_internal_spark_frame
-> 1056 pdf = sdf.toPandas()
   1057 if len(pdf) == 0 and len(sdf.schema) > 0:
   1058     pdf = pdf.astype(
   1059         {field.name: spark_type_to_pandas_dtype(field.dataType) for field in sdf.schema}
   1060     )

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:140, in PandasConversionMixin.toPandas(self)
    138 tmp_column_names = ["col_{}".format(i) for i in range(len(self.columns))]
    139 self_destruct = jconf.arrowPySparkSelfDestructEnabled()
--> 140 batches = self.toDF(*tmp_column_names)._collect_as_arrow(
    141     split_batches=self_destruct
    142 )
    143 if len(batches) > 0:
    144     table = pyarrow.Table.from_batches(batches)

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:358, in PandasConversionMixin._collect_as_arrow(self, split_batches)
    355         results = list(batch_stream)
    356 finally:
    357     # Join serving thread and raise any exceptions from collectAsArrowToPython
--> 358     jsocket_auth_server.getResult()
    360 # Separate RecordBatches from batch order indices in results
    361 batches = results[:-1]

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o16336.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
    at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:97)
    at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:93)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: The "collectAsArrowToPython" action failed. Please, fill a bug report in, and provide the full stack trace.
    at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:552)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:564)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3869)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1(Dataset.scala:3792)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1$adapted(Dataset.scala:3791)
    at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:139)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:141)
    at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:136)
    at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:113)
    at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:107)
    at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$4(SocketAuthServer.scala:68)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:68)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.needToCopyObjectsBeforeShuffle(ShuffleExchangeExec.scala:222)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:384)
    at org.apache.spark.sql.execution.CollectLimitExec.doExecute(limit.scala:70)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:230)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:265)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:226)
    at org.apache.spark.sql.Dataset.toArrowBatchRdd(Dataset.scala:3923)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$5(Dataset.scala:3810)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3815)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3792)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3871)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562)
    ... 19 more

我的问题

这里出了什么问题?我知道我的血统因为嵌套的for循环而变得很大。似乎我的任何操作都会使我的应用程序崩溃,但我不知道如何避免它。这也可能是一个pandas-on-spark的问题,我最好使用常规的pyspark。在这一点上,我相当卡住,所以任何有关解决这个问题的建议将不胜感激。

dojqjjoe

dojqjjoe1#

看起来你可能会遇到一种情况,在Spark上使用Pandas API不再合理。
事实上,对于查询计划中应该是单个Project阶段的东西,查询计划是如此庞大,这可能是有问题的。
你可以从这里走两条路:

  • 把你的函数分割成多个部分,在函数调用之间checkpoint你的 Dataframe 。这将使查询的性能更差,但它会在每个检查点操作之间切断您的血统。
  • 使用plain Pyspark即可。下面你可以找到一个例子:
from pyspark.sql.types import StringType
import pyspark.sql.functions as F

def resolve_abbreviations(df, colName):
    """
    The job titles contain a lot of abbreviations for common terms.
    We write them out to create a more standardized job title list.

    :param job_list: df.SchoneFunctie during processing steps
    :return: SchoneFunctie where abbreviations are written out in words
    """
    abbreviations_dict = {
        "1e": "eerste",
        "1ste": "eerste",
        "2e": "tweede",
        "2de": "tweede",
        "3e": "derde",
        "3de": "derde",
        "ceo": "chief executive officer",
        "cfo": "chief financial officer",
        "coo": "chief operating officer",
        "cto": "chief technology officer",
        "sr": "senior",
        "tech": "technisch",
        "zw": "zelfstandig werkend"
    }

    #Create a list of abbreviations
    abbreviations_pob = list(abbreviations_dict.keys())

    #For each abbreviation in this list
    for abb in abbreviations_pob:
        # define patterns to look for
        patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                    fr'{abb}\.']
        # actual recoding of abbreviations to written out form
        value_to_replace = abbreviations_dict[abb]
        for patt in patterns:
            df = df.withColumn(colName, F.regexp_replace(colName, patt, value_to_replace))

    return df

df = spark.createDataFrame(
    [
        "Ik ben de 2e",
        "Jij bent de 1e geworden",
        "Ik wil de 3de zijn, maar ik ben de ceo",
        "Jij bent tech gezien de sr",
        "Later wil ik zw zijn"
    ],
    StringType()
)

你的职责和我的职责只有很小的不同:

  • 该函数接受2个参数,一个dataframe和一个String(表示一个列)
  • 我们实际上在执行regex替换的那一行

现在,如果我调用这个函数,我会得到翻译得很好的文本:

>>> resolve_abbreviations(df, "value").show(truncate=False)
+------------------------------------------------------------+
|value                                                       |
+------------------------------------------------------------+
|Ik ben de tweede                                            |
|Jij bent de eerste geworden                                 |
|Ik wil de derde zijn, maar ik ben de chief executive officer|
|Jij bent technisch gezien de senior                         |
|Later wil ik zelfstandig werkend zijn                       |
+------------------------------------------------------------+

如果您查看此操作的查询计划:

>>> resolve_abbreviations(df, "value").explain()
== Physical Plan ==
*(1) Project [regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(reg
exp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(value#0, ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))1e((?=( ))|(?=(\\))|(?=($))|(?=(\)))), eerste, 1), 1e\., eerste, 1), ((?<=( ))|(?<=(^))|(?<=
(\\))|(?<=(\()))1ste((?=( ))|(?=(\\))|(?=($))|(?=(\)))), eerste, 1), 1ste\., eerste, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))2e((?=( ))|(?=(\\))|(?=($))|(?=(\)))), tweede, 1), 2e\., tweede, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))2de((?=( ))|(?=(\\))|(?=($))|(?=(\))
)), tweede, 1), 2de\., tweede, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))3e((?=( ))|(?=(\\))|(?=($))|(?=(\)))), derde, 1), 3e\., derde, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))3de((?=( ))|(?=(\\))|(?=($))|(?=(\)))), derde, 1), 3de\., derde, 1), ((?<=( ))|(?<=(^))|(?<=
(\\))|(?<=(\()))ceo((?=( ))|(?=(\\))|(?=($))|(?=(\)))), chief executive officer, 1), ceo\., chief executive officer, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))cfo((?=( ))|(?=(\\))|(?=($))|(?=(\)))), chief financial officer, 1), cfo\., chief financial officer, 1), ((?<=( 
))|(?<=(^))|(?<=(\\))|(?<=(\()))coo((?=( ))|(?=(\\))|(?=($))|(?=(\)))), chief operating officer, 1), coo\., chief operating officer, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))cto((?=( ))|(?=(\\))|(?=($))|(?=(\)))), chief technology officer, 1), cto\., chief technology of
ficer, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))sr((?=( ))|(?=(\\))|(?=($))|(?=(\)))), senior, 1), sr\., senior, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))tech((?=( ))|(?=(\\))|(?=($))|(?=(\)))), technisch, 1), tech\., technisch, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\
()))zw((?=( ))|(?=(\\))|(?=($))|(?=(\)))), zelfstandig werkend, 1), zw\., zelfstandig werkend, 1) AS value#275]
+- *(1) Scan ExistingRDD[value#0]

您可以看到,尽管您正在执行许多regex替换,但它们都是逻辑计划中单个Project步骤的一部分。这也将使您的操作更加高效。

hmae6n7t

hmae6n7t2#

这里出了什么问题?我知道我的血统因为嵌套的for循环而变得很大。似乎我的任何操作都会使我的应用程序崩溃,但我不知道如何避免它。这也可能是一个pandas-on-spark的问题,我最好使用常规的pyspark。在这一点上,我相当卡住,所以任何有关解决这个问题的建议将不胜感激。
错误提示:SparkContext正在关闭。另一个错误是当你从Arrow收集到Python时:

Caused by: org.apache.spark.SparkException: The "collectAsArrowToPython"

什么会导致这种情况?可能是内存不足和/或超时,因为操作数量太大。使用Python UDF,其中代码显式地在工作线程上以Python运行,这可能是一种更好的方法,因为它大大简化了执行计划。
例如,这个函数(resolve_abbreviations)在运行之前需要是一个UDF(你也不应该给你正在操作的 Dataframe 赋值,以减少混乱):

df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])

文档中有一个系列的udf示例(使用transform()apply()

>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
...     return pser + 1  # should always return the same length as input.
...
>>> psdf.transform(pandas_plus)

为了提高性能,您可能需要使用transform_batch和/或apply_batch方法来并行化操作。

相关问题