I'm using pandas-on-Spark and regex to resolve some abbreviations in a DataFrame column. In pandas this all works fine, but I've been tasked with migrating the code to a production workload on our Spark cluster, so I decided to use pandas-on-Spark. However, I'm running into a strange error. I use the function below to clean up the abbreviations.
import pyspark.pandas as pspd

def resolve_abbreviations(job_list: pspd.Series) -> pspd.Series:
    """
    The job titles contain a lot of abbreviations for common terms.
    We write them out to create a more standardized job title list.

    :param job_list: df.SchoneFunctie during processing steps
    :return: SchoneFunctie where abbreviations are written out in words
    """
    abbreviations_dict = {
        "1e": "eerste",
        "1ste": "eerste",
        "2e": "tweede",
        "2de": "tweede",
        "3e": "derde",
        "3de": "derde",
        "ceo": "chief executive officer",
        "cfo": "chief financial officer",
        "coo": "chief operating officer",
        "cto": "chief technology officer",
        "sr": "senior",
        "tech": "technisch",
        "zw": "zelfstandig werkend"
    }

    # Create a list of abbreviations
    abbreviations_pob = list(abbreviations_dict.keys())

    # For each abbreviation in this list
    for abb in abbreviations_pob:
        # Patterns to look for: the abbreviation delimited by start/end of
        # string, spaces, backslashes or parentheses, plus the abbreviation
        # followed by a period
        patterns = [
            fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
            fr'{abb}\.',
        ]
        # Actual recoding of abbreviations to their written-out form
        value_to_replace = abbreviations_dict[abb]
        for patt in patterns:
            job_list = job_list.str.replace(pat=patt, repl=f'{value_to_replace} ', regex=True)
    return job_list
Then, when I call the function on a pspd.Series and perform an action so the query plan gets executed:
df['SchoneFunctie'] = resolve_abbreviations(df['SchoneFunctie'])
print(df.head(100))
it throws a java.lang.StackOverflowError. The stack trace is too long to paste here in full, so I've pasted a subset of it, since it's just the same frames repeating.
java.lang.StackOverflowError
at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:3099)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1641)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
... (the same cycle of frames repeats) ...
This goes on for quite a while, until eventually I get:
23/05/03 14:19:11 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
Traceback (most recent call last):
File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
coro = func()
File "<input>", line 194, in <module>
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
return self._internal.to_pandas_frame
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
setattr(self, attr_name, fn(self))
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
pdf = sdf.toPandas()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
sock_info = self._jdf.collectToPython()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
return f(*a, **kw)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError
----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 54483)
Traceback (most recent call last):
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 316, in _handle_request_noblock
self.process_request(request, client_address)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 347, in process_request
self.finish_request(request, client_address)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 360, in finish_request
self.RequestHandlerClass(request, client_address, self)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 747, in __init__
self.handle()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 281, in handle
poll(accum_updates)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 253, in poll
if func():
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 257, in accum_updates
num_updates = read_int(self.rfile)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\serializers.py", line 593, in read_int
length = stream.read(4)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
----------------------------------------
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
coro = func()
File "<input>", line 194, in <module>
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
return self._internal.to_pandas_frame
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
setattr(self, attr_name, fn(self))
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
pdf = sdf.toPandas()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
sock_info = self._jdf.collectToPython()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
return f(*a, **kw)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 511, in send_command
answer = smart_decode(self.stream.readline()[:-1])
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 539, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
: <exception str() failed>
Some things I've tried / facts that may be relevant:
- Currently, I'm trying to run this locally, on a subset of 5,000 rows of data, so data volume shouldn't be the problem. Perhaps raising some default configuration would still help, though.
- I think this has to do with Spark's lazy evaluation: because of the for loop in the function, Spark's DAG grows too large, but I don't know how to fix that. Following the pandas-on-Spark best-practices documentation I tried to implement checkpointing, but that isn't available on a pspd.Series, and converting my Series to a pspd.DataFrame makes the .apply(lambda ...) call inside resolve_abbreviations fail (see the sketch after this list).
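A minimal sketch of the checkpointing workaround I was going for, assuming it's acceptable to round-trip the Series through a single-column DataFrame (the column name "value" is just a placeholder):

import pyspark.pandas as pspd

def checkpoint_series(s: pspd.Series) -> pspd.Series:
    """Cut the Series' lineage by checkpointing it as a one-column DataFrame.

    pandas-on-Spark only exposes checkpointing on DataFrames (through the
    .spark accessor), so wrap the Series, checkpoint, and unwrap again.
    """
    frame = s.to_frame(name="value")         # pspd.Series -> pspd.DataFrame
    frame = frame.spark.local_checkpoint()   # materialize, truncate query plan
    return frame["value"]

Calling something like this every few iterations of the replacement loop should keep the query plan from growing without bound, at the cost of materializing intermediate results.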
Any help would be greatly appreciated. Maybe I'm better off avoiding the pandas-on-Spark API and converting the code to regular PySpark (a rough sketch of what that might look like is below), since the pandas-on-Spark API apparently isn't mature enough to run pandas scripts "as is"? Or maybe our code design is inherently flawed and there's another efficient way to achieve a similar result?
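For reference, a sketch of the equivalent logic in plain PySpark, assuming sdf is the underlying Spark DataFrame and using \b word boundaries in place of the lookaround groups:

from pyspark.sql import functions as F

# Sketch: abbreviations_dict as defined above; sdf is a plain Spark
# DataFrame with a string column "SchoneFunctie".
for abb, full in abbreviations_dict.items():
    sdf = sdf.withColumn(
        "SchoneFunctie",
        # \b approximates the delimiter lookarounds; \.? also covers "sr." etc.
        F.regexp_replace("SchoneFunctie", rf"\b{abb}\b\.?", full),
    )

This still chains one projection per abbreviation, so the plan-size concern remains, but it takes the pandas-on-Spark layer out of the equation.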
1 Answer
Is there any chance your input data is deeply nested? That could contribute to the recursive stack calls you're seeing here.
The first thing I would try is running with a larger stack size than you have now. I'm not sure what OS/Java version you're running, so I don't know what the default stack size on your machine is. Typically, though, it's on the order of 100 KB-1024 KB.
Try running it with a 4 MB stack size. Inside the JVM, this is set with the -Xss parameter. You need to do this on the driver, via the spark.driver.extraJavaOptions configuration parameter. Like this:
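A sketch, assuming the session is created fresh (extraJavaOptions for the driver must be in place before the driver JVM starts, so set it when building the session or pass it with spark-submit --conf):

from pyspark.sql import SparkSession

# -Xss4m gives each JVM thread a 4 MB stack instead of the platform default.
spark = (
    SparkSession.builder
    .config("spark.driver.extraJavaOptions", "-Xss4m")
    .getOrCreate()
)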