pyspark net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype) when writing a DataFrame

fhity93d asked on 2023-08-03 in Spark

I have defined the following function in PySpark code running on Databricks. It uses a .shp file to look up the district for each pair of coordinates in the dataset (outlets_geolocation).

import geopandas as gpd
from shapely.geometry import Point
from pyspark.sql import DataFrame
from pyspark.sql import functions as f
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def get_outlet_location(outlets_geolocation: DataFrame):
    # Load the administrative-boundary shapefile into a GeoDataFrame.
    gdf = gpd.read_file(
        '/dbfs/mnt/path/to/file.shp'
    )

    def get_districts(lat, lon):
        # Return the district (ADM2_EN) whose polygon contains the point.
        point = Point(lon, lat)
        return gdf.loc[gdf.contains(point), "ADM2_EN"].squeeze()

    # Keep only outlets with a usable GPS fix, one row per outlet.
    outlet_locations = (
        outlets_geolocation
        .filter(
            (f.col('latitude').isNotNull()) &
            (f.col('longitude').isNotNull()) &
            (f.col('longitude') != 0) &
            (f.col('latitude') != 0) &
            (f.col('is_gps_on') == True)
        )
        .dropDuplicates(['outlet_code'])
    )

    outlet_locations = outlet_locations.select(f.col('outlet_code'), f.col('latitude'), f.col('longitude'))

    get_districts_udf = udf(get_districts, StringType())

    master_outlets_geolocation = outlet_locations.withColumn('district', get_districts_udf('latitude', 'longitude'))

    return master_outlets_geolocation

This appears to work fine: when I call .head() on the returned DataFrame, I see the records I am looking for.
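
For context, here is roughly how I call the function and then try to persist the result; the spot-check call and the output path below are illustrative placeholders rather than my exact notebook code:

# Build the district-enriched DataFrame and spot-check it -- this part works.
master_outlets_geolocation = get_outlet_location(outlets_geolocation)
master_outlets_geolocation.head(5)

# Writing the same DataFrame out as Parquet is what fails; the ADLS mount
# path below is a placeholder.
(
    master_outlets_geolocation
    .write
    .mode('overwrite')
    .parquet('/mnt/path/to/output/outlet_locations')
)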
However, when I try to write this dataset to my data lake (ADLS) as a Parquet file, as in the sketch above, I get the following error:

Py4JJavaError: An error occurred while calling o13792.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:606)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:360)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:198)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:126)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:124)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:138)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:160)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:156)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:590)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:168)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:590)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:566)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:156)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:156)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:141)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:186)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:959)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:427)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:396)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:250)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 892.0 failed 4 times, most recent failure: Lost task 0.3 in stage 892.0 (TID 12512) (10.168.5.135 executor 1): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:610)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:457)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$20(FileFormatWriter.scala:336)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:773)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:213)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:123)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:136)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:110)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:240)
    at org.apache.spark.sql.execution.SortExec$$anon$2.sortedIterator(SortExec.scala:133)
    at org.apache.spark.sql.execution.SortExec$$anon$2.hasNext(SortExec.scala:147)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:90)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$2(FileFormatWriter.scala:437)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1715)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:444)
    ... 19 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3088)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3035)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3029)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3029)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1391)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1391)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1391)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3297)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3238)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3226)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1157)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2657)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2640)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:325)
    ... 43 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:610)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:457)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$20(FileFormatWriter.scala:336)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:773)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:213)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:123)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:136)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:110)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:240)
    at org.apache.spark.sql.execution.SortExec$$anon$2.sortedIterator(SortExec.scala:133)
    at org.apache.spark.sql.execution.SortExec$$anon$2.hasNext(SortExec.scala:147)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:90)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$2(FileFormatWriter.scala:437)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1715)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:444)
    ... 19 more


What exactly is the problem here? Does it have something to do with my data types?
I have also seen a few similar questions, e.g. "PySpark UDF for converting UTM error expected zero arguments for construction of ClassDict (for numpy.dtype)".
However, I still cannot pin down what exactly is wrong in my code.


kuhbmx9i1#

I got this working by adding some explicit type casts to the get_districts() function, like this:

def get_districts(lat, lon):
    point = Point(float(lon), float(lat))
    return str(gdf.loc[gdf.contains(point), "ADM2_EN"].squeeze())

I made this change based on the answer to the question linked above. Frankly, I am not entirely sure why this fixes the problem, but I suspect it has to do with my data types.
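
A plausible explanation: a Python UDF registered with udf(..., StringType()) must return built-in Python objects, and anything from the numpy/pandas world (a numpy scalar, or a pandas Series, which .squeeze() can return) fails on the JVM side with exactly this ClassDict / numpy.dtype pickle error. A quick standalone check, with made-up sample values and no Spark required:

import pandas as pd

# A single-element numeric Series squeezes down to a numpy scalar ...
num = pd.Series([6.9271]).squeeze()
print(type(num))           # <class 'numpy.float64'> -- not safe to return from a UDF

# ... whereas wrapping it in a built-in type is what the UDF should hand back.
print(type(float(num)))    # <class 'float'>

# Object-dtype (string) columns usually squeeze to a plain Python str already,
# but str(...) makes the return type explicit either way.
name = pd.Series(["Colombo"]).squeeze()
print(type(name))          # <class 'str'>

It may also matter that gdf.contains(point) can match no rows, in which case .squeeze() returns an empty pandas Series rather than a scalar; returning that object from the UDF hits the same pickling path, and the str(...) wrapper covers that case as well.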
