PySpark map transformation fails when passed a function: java.io.EOFException

tcomlyy6 · posted 2022-11-01 in Spark

I ran into trouble when passing a function to the map method of a Spark RDD. The problem seems to be in the function itself, but I'm not sure.
My function looks like this:

import h3
from pyspark.sql import SparkSession, DataFrame

def add_h3_hash_column(row):
    # `resolution` is expected to be defined at module level
    rowDict = row.asDict()
    h3_hash = h3.geo_to_h3(
        rowDict["latitude"], rowDict["longitude"], resolution
    )
    rowDict[f"h3_hash_{resolution}"] = str(h3_hash)
    return rowDict

def h3_hash_generator(spark: SparkSession, resolution, sdf: DataFrame) -> DataFrame:
    """Creates a new column in a DataFrame with the Hexagon hashes of the given resolution
    that map to a geographic point (latitude, longitude).

    :param resolution: the h3 resolution for the hexagons.
    :param df: DataFrame containing two columns named "latitude" and "longitude".
    :return: Returns the DataFrame with a new column with h3 hashes of desire resolution.
    """
    sdf_w_hash = sdf.rdd.map(add_h3_hash_column)
    sdf = spark.createDataFrame(sdf_w_hash)
    return sdf

I have also tried other approaches, such as returning a Row() object from add_h3_hash_column, or simplifying the function to just return "Hello", but I still get the same error.
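For reference, the Row-returning variant looked roughly like this (a sketch reconstructed from the description above, not the exact original code):

from pyspark.sql import Row

def add_h3_hash_column(row):
    # Same as above, but returning a Row instead of a dict;
    # `resolution` is again assumed to be defined at module level
    rowDict = row.asDict()
    rowDict[f"h3_hash_{resolution}"] = str(
        h3.geo_to_h3(rowDict["latitude"], rowDict["longitude"], resolution)
    )
    return Row(**rowDict)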
When the code executes, I receive the following error:

objc[54297]: +[__NSCFConstantString initialize] may have been in progress in another thread when fork() was called.
objc[54297]: +[__NSCFConstantString initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
22/10/28 13:47:48 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 102)
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:550)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:539)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
        at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:657)
        at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.EOFException
        at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
        at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:642)
        ... 29 more
22/10/28 13:47:48 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 102) (ip-192-168-1-152.eu-west-1.compute.internal executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
        [... stack trace identical to the one above ...]
Caused by: java.io.EOFException
        at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
        at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:642)
        ... 29 more

22/10/28 13:47:48 ERROR TaskSetManager: Task 0 in stage 5.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "partner_stores/runners/concept_runners/main_concepts.py", line 33, in <module>
    main(sys.argv[1:])
  File "partner_stores/runners/concept_runners/main_concepts.py", line 23, in main
    build_stg_h3_store_addresses(spark, args)
  File "/Users/danielteixeira/repositories/partner-data-mesh/data_products/partner_stores/partner_stores/runners/concept_runners/transformations/build_stg_h3_store_addresses.py", line 45, in build_stg_h3_store_addresses
    stg_h3_store_addresses = h3_hash_generator(
  File "/Users/danielteixeira/repositories/partner-data-mesh/data_products/partner_stores/partner_stores/utils/common.py", line 98, in h3_hash_generator
    sdf = spark.createDataFrame(sdf_w_hash)
  File "/Users/danielteixeira/Library/Caches/pypoetry/virtualenvs/partner-stores-hpriuLoD-py3.8/lib/python3.8/site-packages/pyspark/sql/session.py", line 675, in createDataFrame
    return self._create_dataframe(data, schema, samplingRatio, verifySchema)
  File "/Users/danielteixeira/Library/Caches/pypoetry/virtualenvs/partner-stores-hpriuLoD-py3.8/lib/python3.8/site-packages/pyspark/sql/session.py", line 698, in _create_dataframe
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/Users/danielteixeira/Library/Caches/pypoetry/virtualenvs/partner-stores-hpriuLoD-py3.8/lib/python3.8/site-packages/pyspark/sql/session.py", line 486, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio, names=schema)
  File "/Users/danielteixeira/Library/Caches/pypoetry/virtualenvs/partner-stores-hpriuLoD-py3.8/lib/python3.8/site-packages/pyspark/sql/session.py", line 460, in _inferSchema
    first = rdd.first()
  File "/Users/danielteixeira/Library/Caches/pypoetry/virtualenvs/partner-stores-hpriuLoD-py3.8/lib/python3.8/site-packages/pyspark/rdd.py", line 1586, in first
    rs = self.take(1)
  File "/Users/danielteixeira/Library/Caches/pypoetry/virtualenvs/partner-stores-hpriuLoD-py3.8/lib/python3.8/site-packages/pyspark/rdd.py", line 1566, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/Users/danielteixeira/Library/Caches/pypoetry/virtualenvs/partner-stores-hpriuLoD-py3.8/lib/python3.8/site-packages/pyspark/context.py", line 1233, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/Users/danielteixeira/Library/Caches/pypoetry/virtualenvs/partner-stores-hpriuLoD-py3.8/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/Users/danielteixeira/Library/Caches/pypoetry/virtualenvs/partner-stores-hpriuLoD-py3.8/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a,**kw)
  File "/Users/danielteixeira/Library/Caches/pypoetry/virtualenvs/partner-stores-hpriuLoD-py3.8/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 102) (ip-192-168-1-152.eu-west-1.compute.internal executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
        [... stack trace identical to the one above ...]
Caused by: java.io.EOFException
        at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
        at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:642)
        ... 29 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2303)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2252)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2251)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2251)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2490)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2432)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2421)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
        at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
        [... stack trace identical to the one above ...]
Caused by: java.io.EOFException
        at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
        at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:642)
        ... 29 more

However, it works if I inline the logic in a lambda instead of passing the named function:

def h3_hash_generator(spark: SparkSession, resolution, sdf: DataFrame) -> DataFrame:
    sdf_w_hash = sdf.rdd.map(lambda x:
                             (x.id,
                              x.store_id,
                              h3.geo_to_h3(x.latitude, x.longitude, resolution)
                              ))
    sdf = spark.createDataFrame(sdf_w_hash)
    return sdf
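This version works, though the resulting columns get the inferred names _1, _2 and _3. If explicit names are needed, they can be attached with toDF (a sketch, reusing the column names from the code above):

def h3_hash_generator(spark: SparkSession, resolution, sdf: DataFrame) -> DataFrame:
    rdd_w_hash = sdf.rdd.map(lambda x: (
        x.id,
        x.store_id,
        h3.geo_to_h3(x.latitude, x.longitude, resolution),
    ))
    # toDF attaches explicit column names instead of the inferred _1/_2/_3
    return rdd_w_hash.toDF(["id", "store_id", "h3_hash"])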

sr4lhrrt1#

The function add_h3_hash_column is just a plain Python function; by itself it has nothing to do with PySpark.
When you execute sdf.rdd.map(add_h3_hash_column), you are calling the map function of the PySpark RDD object. The problem lies in how the expression is written: map executes the function once per record, but in the expression above it is not clear what arguments the function is supposed to receive.
Your second use of map is the correct way to call it on an RDD.
PySpark's map() is an RDD transformation that applies a transformation function (a lambda) to every element of the RDD/DataFrame and returns a new RDD.
The lambda expression you wrote means that for every record x you produce whatever follows the colon: in this case, a tuple of three elements, namely id, store_id, and the geo_to_h3 hash value.
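If you also want named columns rather than the generic tuple names, one option (a sketch, assuming the id, store_id, latitude and longitude columns from your question) is to return a pyspark.sql.Row from the lambda, so that createDataFrame can infer the schema:

from pyspark.sql import Row, SparkSession, DataFrame
import h3

def h3_hash_generator(spark: SparkSession, resolution, sdf: DataFrame) -> DataFrame:
    rdd_w_hash = sdf.rdd.map(lambda x: Row(
        id=x.id,
        store_id=x.store_id,
        h3_hash=str(h3.geo_to_h3(x.latitude, x.longitude, resolution)),
    ))
    # The schema (column names and types) is inferred from the Row fields
    return spark.createDataFrame(rdd_w_hash)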
You can refer to this link. Hope it helps.
