The bounty expires in 3 days. Answers to this question are eligible for a +200 reputation bounty. Psychotechnopath is looking for a canonical answer: Since many people will probably migrate pandas code that uses a for loop applying regexes (the pattern I use here), I'm looking for a canonical answer to this question: is this a general Spark anti-pattern, or is the problem caused by something specific to pandas-on-spark?
I'm using pandas-on-spark and regex to remove some abbreviations from a column in a DataFrame. In pandas this all works fine, but I have been tasked with migrating this code to a production workload on our Spark cluster, and therefore decided to use pandas-on-spark. I'm running into problems with the function below. I use it to clean up a number of abbreviations (somewhat simplified here for readability; in reality abbreviations_dict has 61 abbreviations and patterns is a list of three regex patterns, so the for loop runs 61 x 3 = 183 iterations). df["SchoneFunctie"] is a pyspark.pandas.Series of roughly 420k rows. I'm running this code on an Apache Spark pool in Azure Synapse, Spark version 3.3 (to be more specific: 3.3.1.5.2-90111858).
import pyspark.pandas as pspd

def resolve_abbreviations(job_list: pspd.Series) -> pspd.Series:
    """
    The job titles contain a lot of abbreviations for common terms.
    We write them out to create a more standardized job title list.

    :param job_list: df.SchoneFunctie during processing steps
    :return: SchoneFunctie where abbreviations are written out in words
    """
    abbreviations_dict = {
        "1e": "eerste",
        "1ste": "eerste",
        "2e": "tweede",
        "2de": "tweede",
        "3e": "derde",
        "3de": "derde",
        "ceo": "chief executive officer",
        "cfo": "chief financial officer",
        "coo": "chief operating officer",
        "cto": "chief technology officer",
        "sr": "senior",
        "tech": "technisch",
        "zw": "zelfstandig werkend"
    }
    # Create a list of abbreviations
    abbreviations_pob = list(abbreviations_dict.keys())
    # For each abbreviation in this list
    for abb in abbreviations_pob:
        # define patterns to look for
        patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                    fr'{abb}\.']
        # actual recoding of abbreviations to written out form
        value_to_replace = abbreviations_dict[abb]
        for patt in patterns:
            job_list = job_list.str.replace(pat=fr'{patt}', repl=f'{value_to_replace} ', regex=True)
    return job_list
The problem & things I've tried:

As per the pandas-on-spark best practices docs, I'm trying to checkpoint my DataFrame after this function, since it's a function with a bunch of iterations and the lineage can get big quickly. df.spark.explain() gives a query plan of 373 lines. Please find a snippet of it below:
*(186) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6074 AS SchoneFunctie#5881]
+- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6073)#5878], [pythonUDF0#6074], 200
+- *(185) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6073]
+- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6072)#5873], [pythonUDF0#6073], 200
+- *(184) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6072]
+- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6071)#5868], [pythonUDF0#6072], 200
+- *(183) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6071]
+- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6070)#5863], [pythonUDF0#6071], 200
+- *(182) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6070]
+- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6069)#5858], [pythonUDF0#6070], 200
+- *(181) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6069]
+- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6068)#5853], [pythonUDF0#6069], 200
+- *(180) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6068]
+- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6067)#5848], [pythonUDF0#6068], 200
However, whatever I try, I can't run this function successfully without running into errors.

Simply calling resolve_abbreviations and then trying to checkpoint
df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
df = df.spark.checkpoint()
results in the following error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In [17], line 2
1 df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
----> 2 df = df.spark.checkpoint()
File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/spark/accessors.py:1073, in SparkFrameMethods.checkpoint(self, eager)
1070 from pyspark.pandas.frame import DataFrame
1072 internal = self._psdf._internal.resolved_copy
-> 1073 checkpointed_sdf = internal.spark_frame.checkpoint(eager)
1074 return DataFrame(internal.with_new_sdf(checkpointed_sdf))
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:682, in DataFrame.checkpoint(self, eager)
665 def checkpoint(self, eager: bool = True) -> "DataFrame":
666 """Returns a checkpointed version of this :class:`DataFrame`. Checkpointing can be used to
667 truncate the logical plan of this :class:`DataFrame`, which is especially useful in
668 iterative algorithms where the plan may grow exponentially. It will be saved to files
(...)
680 This API is experimental.
681 """
--> 682 jdf = self._jdf.checkpoint(eager)
683 return DataFrame(jdf, self.sparkSession)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
188 def deco(*a: Any, **kw: Any) -> Any:
189 try:
--> 190 return f(*a, **kw)
191 except Py4JJavaError as e:
192 converted = convert_exception(e.java_exception)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o8801.checkpoint.
: org.apache.spark.SparkException: Job 32 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1196)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1194)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1194)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2897)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2794)
at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2217)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1484)
at org.apache.spark.SparkContext.stop(SparkContext.scala:2217)
at org.apache.spark.SparkContext$$anon$3.run(SparkContext.scala:2154)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:958)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2350)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2371)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2403)
at org.apache.spark.rdd.ReliableCheckpointRDD$.writeRDDToCheckpointDirectory(ReliableCheckpointRDD.scala:166)
at org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:60)
at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:75)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1913)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1903)
at org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:700)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3871)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3869)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869)
at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:691)
at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:654)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Trying to use local_checkpoint() instead of checkpoint()
df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
df = df.spark.local_checkpoint()
results in a similar error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In [21], line 2
1 df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
----> 2 df = df.spark.local_checkpoint()
File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/spark/accessors.py:1111, in SparkFrameMethods.local_checkpoint(self, eager)
1108 from pyspark.pandas.frame import DataFrame
1110 internal = self._psdf._internal.resolved_copy
-> 1111 checkpointed_sdf = internal.spark_frame.localCheckpoint(eager)
1112 return DataFrame(internal.with_new_sdf(checkpointed_sdf))
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:702, in DataFrame.localCheckpoint(self, eager)
685 def localCheckpoint(self, eager: bool = True) -> "DataFrame":
686 """Returns a locally checkpointed version of this :class:`DataFrame`. Checkpointing can be
687 used to truncate the logical plan of this :class:`DataFrame`, which is especially useful in
688 iterative algorithms where the plan may grow exponentially. Local checkpoints are
(...)
700 This API is experimental.
701 """
--> 702 jdf = self._jdf.localCheckpoint(eager)
703 return DataFrame(jdf, self.sparkSession)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
188 def deco(*a: Any, **kw: Any) -> Any:
189 try:
--> 190 return f(*a, **kw)
191 except Py4JJavaError as e:
192 converted = convert_exception(e.java_exception)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o12529.localCheckpoint.
: org.apache.spark.SparkException: Job 32 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1196)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1194)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1194)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2897)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2794)
at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2217)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1484)
at org.apache.spark.SparkContext.stop(SparkContext.scala:2217)
at org.apache.spark.SparkContext$$anon$3.run(SparkContext.scala:2154)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:958)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2350)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2371)
at org.apache.spark.rdd.LocalRDDCheckpointData.doCheckpoint(LocalRDDCheckpointData.scala:54)
at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:75)
at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1913)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1903)
at org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:700)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3871)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3869)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869)
at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:691)
at org.apache.spark.sql.Dataset.localCheckpoint(Dataset.scala:678)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Even when I try to break the lineage by calling an action
df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
print(df.head(10))
I get an error:
/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:201: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in the middle of computation.
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In [23], line 2
1 df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
----> 2 print(df.head(10))
File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/frame.py:12255, in DataFrame.__repr__(self)
12252 if max_display_count is None:
12253 return self._to_internal_pandas().to_string()
> 12255 pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
12256 pdf_length = len(pdf)
12257 pdf = cast("DataFrame", pdf.iloc[:max_display_count])
File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/frame.py:12246, in DataFrame._get_or_create_repr_pandas_cache(self, n)
12243 def _get_or_create_repr_pandas_cache(self, n: int) -> Union[pd.DataFrame, pd.Series]:
12244 if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
12245 object.__setattr__(
> 12246 self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
12247 )
12248 return self._repr_pandas_cache[n]
File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/frame.py:12241, in DataFrame._to_internal_pandas(self)
12235 def _to_internal_pandas(self) -> pd.DataFrame:
12236 """
12237 Return a pandas DataFrame directly from _internal to avoid overhead of copy.
12238
12239 This method is for internal use only.
12240 """
> 12241 return self._internal.to_pandas_frame
File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/utils.py:588, in lazy_property.<locals>.wrapped_lazy_property(self)
584 @property
585 @functools.wraps(fn)
586 def wrapped_lazy_property(self):
587 if not hasattr(self, attr_name):
--> 588 setattr(self, attr_name, fn(self))
589 return getattr(self, attr_name)
File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/internal.py:1056, in InternalFrame.to_pandas_frame(self)
1054 """Return as pandas DataFrame."""
1055 sdf = self.to_internal_spark_frame
-> 1056 pdf = sdf.toPandas()
1057 if len(pdf) == 0 and len(sdf.schema) > 0:
1058 pdf = pdf.astype(
1059 {field.name: spark_type_to_pandas_dtype(field.dataType) for field in sdf.schema}
1060 )
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:140, in PandasConversionMixin.toPandas(self)
138 tmp_column_names = ["col_{}".format(i) for i in range(len(self.columns))]
139 self_destruct = jconf.arrowPySparkSelfDestructEnabled()
--> 140 batches = self.toDF(*tmp_column_names)._collect_as_arrow(
141 split_batches=self_destruct
142 )
143 if len(batches) > 0:
144 table = pyarrow.Table.from_batches(batches)
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:358, in PandasConversionMixin._collect_as_arrow(self, split_batches)
355 results = list(batch_stream)
356 finally:
357 # Join serving thread and raise any exceptions from collectAsArrowToPython
--> 358 jsocket_auth_server.getResult()
360 # Separate RecordBatches from batch order indices in results
361 batches = results[:-1]
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
188 def deco(*a: Any, **kw: Any) -> Any:
189 try:
--> 190 return f(*a, **kw)
191 except Py4JJavaError as e:
192 converted = convert_exception(e.java_exception)
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o16336.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:97)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:93)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: The "collectAsArrowToPython" action failed. Please, fill a bug report in, and provide the full stack trace.
at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:552)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:564)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3869)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1(Dataset.scala:3792)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1$adapted(Dataset.scala:3791)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:139)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:141)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:136)
at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:113)
at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:107)
at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$4(SocketAuthServer.scala:68)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:68)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.needToCopyObjectsBeforeShuffle(ShuffleExchangeExec.scala:222)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:384)
at org.apache.spark.sql.execution.CollectLimitExec.doExecute(limit.scala:70)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:230)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:265)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:226)
at org.apache.spark.sql.Dataset.toArrowBatchRdd(Dataset.scala:3923)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$5(Dataset.scala:3810)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3815)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3792)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3871)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562)
... 19 more
My question

What is going wrong here? I understand that my lineage gets big because of the nested for loop. It seems that any action I perform crashes my application, but I don't know how to avoid it. It could also be that this is a pandas-on-spark issue and that I'd be better off using plain PySpark for this. At this point I'm quite stuck, so any advice on how to deal with this problem would be appreciated.
2 Answers

dojqjjoe1:
It looks like you may have run into a case where using the pandas API on Spark no longer makes sense. The fact that the query plan is this huge for something that should be a single Project stage is indeed likely to be the problem. You can go two ways from here; one option is to rewrite this step in plain PySpark. Your function and mine differ only slightly:
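A minimal sketch of such a plain-PySpark rewrite, assuming the abbreviations_dict and patterns from the question are in scope; the regexp_replace calls are chained into a single column expression, so no intermediate Series is materialized per iteration (note that Spark evaluates these patterns with Java's regex engine, so they may need minor tweaks):

from pyspark.sql import Column, functions as F

def resolve_abbreviations_spark(col: Column) -> Column:
    # Chain every regexp_replace into one column expression;
    # the whole chain ends up in a single Project step of the plan.
    for abb, written_out in abbreviations_dict.items():  # dict as defined in the question
        patterns = [
            fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
            fr'{abb}\.',
        ]
        for patt in patterns:
            col = F.regexp_replace(col, patt, f'{written_out} ')
    return col

sdf = df.to_spark()  # switch from pandas-on-Spark to a plain Spark DataFrame
sdf = sdf.withColumn("SchoneFunctie", resolve_abbreviations_spark(F.col("SchoneFunctie")))
sdf.explain()        # all 183 replacements appear inside one Project step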
Now, if I call this function, I get the nicely written-out text. And if you look at the query plan of this operation, you can see that even though you are doing many regex replacements, they are all part of a single Project step in the logical plan. This will also make your operation more efficient.

hmae6n7t2:
What is going wrong here? I understand that my lineage gets big because of the nested for loop. It seems that any action I perform crashes my application, but I don't know how to avoid it. It could also be that this is a pandas-on-spark issue and that I'd be better off using plain PySpark for this. At this point I'm quite stuck, so any advice on how to deal with this problem would be appreciated.
The errors hint that the SparkContext is being shut down, and the other error happens when collecting from Arrow to Python. What can cause this? Probably out-of-memory and/or timeouts, because the number of operations is so large. Using a Python UDF, where the code explicitly runs in Python on the workers, may be a better approach here, because it greatly simplifies the execution plan.
For example, this function (resolve_abbreviations) would need to become a UDF before it can run that way (and you also shouldn't assign back to the DataFrame you are operating on, to reduce confusion). The documentation has an example of a Series UDF, using transform() or apply(); a sketch follows below.
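A rough sketch of that idea, again assuming abbreviations_dict from the question is in scope; _resolve_one is a hypothetical plain-Python helper that pandas-on-Spark applies element-wise through Series.apply():

import re

def _resolve_one(value: str) -> str:
    # Plain-Python replacement logic, applied to one string at a time.
    for abb, written_out in abbreviations_dict.items():
        for patt in (fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                     fr'{abb}\.'):
            value = re.sub(patt, f'{written_out} ', value)
    return value

# Runs as one Python UDF stage instead of 183 chained Series operations;
# assign the result to a new variable rather than back into df.
cleaned = df["SchoneFunctie"].apply(_resolve_one)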
For better performance, you may want to use the transform_batch and/or apply_batch methods to parallelize the operation, as sketched after this paragraph.
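A sketch of the batched variant, assuming the Series.pandas_on_spark.transform_batch accessor is available in this Spark version; the function receives and returns a plain pandas Series per batch, so the vectorized .str.replace from the original code can be reused:

import pandas as pd

def resolve_batch(s: pd.Series) -> pd.Series:
    # s is one batch of rows as an ordinary pandas Series.
    for abb, written_out in abbreviations_dict.items():  # dict as defined in the question
        for patt in (fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                     fr'{abb}\.'):
            s = s.str.replace(patt, f'{written_out} ', regex=True)
    return s

cleaned = df["SchoneFunctie"].pandas_on_spark.transform_batch(resolve_batch)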