Pandas-on-Spark throws java.lang.StackOverflowError

gopyfrb3 · posted 2023-05-05 in Spark

I am using pandas-on-Spark with regular expressions to expand some abbreviations in a column of my DataFrame. In plain pandas this works fine, but my task is to migrate the code to a production workload on our Spark cluster, so I decided to use pandas-on-Spark. However, I am running into a strange error. I use the function below to clean up the abbreviations.

import pyspark.pandas as pspd

def resolve_abbreviations(job_list: pspd.Series) -> pspd.Series:
    """
    The job titles contain a lot of abbreviations for common terms.
    We write them out to create a more standardized job title list.

    :param job_list: df.SchoneFunctie during processing steps
    :return: SchoneFunctie where abbreviations are written out in words
    """
    abbreviations_dict = {
        "1e": "eerste",
        "1ste": "eerste",
        "2e": "tweede",
        "2de": "tweede",
        "3e": "derde",
        "3de": "derde",
        "ceo": "chief executive officer",
        "cfo": "chief financial officer",
        "coo": "chief operating officer",
        "cto": "chief technology officer",
        "sr": "senior",
        "tech": "technisch",
        "zw": "zelfstandig werkend"
    }

    #Create a list of abbreviations
    abbreviations_pob = list(abbreviations_dict.keys())

    #For each abbreviation in this list
    for abb in abbreviations_pob:
        # define patterns to look for
        patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                    fr'{abb}\.']
        # actual recoding of abbreviations to written out form
        value_to_replace = abbreviations_dict[abb]
        for patt in patterns:
            job_list = job_list.str.replace(pat=fr'{patt}', repl=f'{value_to_replace} ', regex=True)

    return job_list

Then, when I call the function on a pspd.Series and trigger an action, so that the query plan actually gets executed:

df['SchoneFunctie'] = resolve_abbreviations(df['SchoneFunctie'])
print(df.head(100))

it throws a java.lang.StackOverflowError. The stack trace is too long to paste here in full, so I have pasted a subset; it is essentially the same frames repeating.

java.lang.StackOverflowError
    at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:3099)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1641)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)

This goes on for quite a while, until I eventually get:

23/05/03 14:19:11 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
    coro = func()
  File "<input>", line 194, in <module>
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
    pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
    self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
    return self._internal.to_pandas_frame
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
    setattr(self, attr_name, fn(self))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
    pdf = sdf.toPandas()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 54483)
Traceback (most recent call last):
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 747, in __init__
    self.handle()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 281, in handle
    poll(accum_updates)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 253, in poll
    if func():
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.rfile)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\serializers.py", line 593, in read_int
    length = stream.read(4)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
----------------------------------------
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
    coro = func()
  File "<input>", line 194, in <module>
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
    pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
    self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
    return self._internal.to_pandas_frame
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
    setattr(self, attr_name, fn(self))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
    pdf = sdf.toPandas()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
: <exception str() failed>

Some things I have tried / facts I think may be relevant:

  • At the moment I am running this locally, on a subset of 5,000 rows of data, so data volume should not be the problem. Perhaps raising some default configuration would still help.
  • I think this is related to Spark's lazy evaluation: because of the for-loop in the function, Spark's DAG grows too large. But I don't know how to fix it. Following the pandas-on-Spark best-practices documentation I tried to add checkpointing, but that is not available on a pspd.Series, and converting my Series to a pspd.DataFrame makes the .apply(lambda ...) call in the resolve_abbreviations function fail (a checkpointing sketch follows this list).
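
For what it's worth, checkpointing is exposed on the pandas-on-Spark DataFrame (not on the Series) through the .spark accessor. So one possible workaround, sketched below under the assumption that the column lives in a DataFrame, is to assign the replaced column back into the DataFrame and truncate the plan every few patterns. The function name, the shortened dictionary, the word-boundary pattern, and checkpoint_every are illustrative, not part of the original code:

import pyspark.pandas as pspd

def resolve_abbreviations_checkpointed(df: pspd.DataFrame, col: str = "SchoneFunctie",
                                       checkpoint_every: int = 5) -> pspd.DataFrame:
    """Sketch: same idea as resolve_abbreviations, but operating on the DataFrame
    so the logical plan can be truncated with a local checkpoint now and then."""
    abbreviations_dict = {"1e": "eerste", "sr": "senior", "tech": "technisch"}  # shortened
    for i, (abb, repl) in enumerate(abbreviations_dict.items(), start=1):
        # simplified word-boundary pattern instead of the full lookaround pattern above
        df[col] = df[col].str.replace(pat=fr'\b{abb}\b', repl=f'{repl} ', regex=True)
        if i % checkpoint_every == 0:
            # local_checkpoint materializes the current result and cuts the lineage,
            # so the plan does not keep growing with every replacement
            df = df.spark.local_checkpoint()
    return df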

Any help would be greatly appreciated. Should I perhaps avoid the pandas-on-Spark API altogether and rewrite the code in regular PySpark, because the pandas-on-Spark API is apparently not mature enough to run pandas scripts "as is"? Or is our code design inherently flawed, and is there another, efficient way to achieve a similar result?
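
For comparison, a rough plain-PySpark sketch of the same replacements, built as a single chain of regexp_replace expressions instead of repeated pandas-style assignments, could look like the following. The simplified word-boundary pattern and the shortened dictionary are assumptions for brevity, not the original logic:

from functools import reduce
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

def resolve_abbreviations_sql(df: DataFrame, col: str = "SchoneFunctie") -> DataFrame:
    """Sketch: fold all replacements into one column expression in a single projection."""
    abbreviations_dict = {"1e": "eerste", "sr": "senior", "tech": "technisch"}  # shortened
    new_col = reduce(
        lambda acc, kv: F.regexp_replace(acc, fr'\b{kv[0]}\b', kv[1] + ' '),
        abbreviations_dict.items(),
        F.col(col),
    )
    return df.withColumn(col, new_col)

Whether this actually avoids the stack overflow would need to be verified: it replaces many separate plan nodes with one nested column expression, which is usually cheaper to analyze, but it is only a sketch, not a drop-in replacement for the function above.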


r1zhe5dt 1#

Is there any chance your input data is deeply nested? That could explain the recursive stack calls you are seeing here.
The first thing I would try is running with a larger stack size than you have now. I don't know which OS/Java version you are running, so I can't tell what the default stack size on your machine is; typically, though, it is on the order of 100 KB to 1024 KB.
Try running with a 4 MB stack size. On the JVM this is set with the -Xss option; on the driver you pass it through the spark.driver.extraJavaOptions configuration property, like this:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("whateverMasterYouHave")
        .setAppName("MyApp")
        .set("spark.driver.extraJavaOptions", "-Xss4M"))
sc = SparkContext.getOrCreate(conf=conf)
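
Since pandas-on-Spark code usually goes through a SparkSession rather than a raw SparkContext, the same option can also be set when the session is built. Whether the executors need a larger stack as well depends on where the deep deserialization happens, so the executor line in this sketch is an optional assumption:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("MyApp")
         .config("spark.driver.extraJavaOptions", "-Xss4M")
         # optionally raise the executor stack too, if tasks overflow on the workers
         .config("spark.executor.extraJavaOptions", "-Xss4M")
         .getOrCreate())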
