PySpark RDD mapPartitions call fails when the code is asynchronous

Posted by vuktfyat on 2021-07-09 in Spark

I have some PySpark code that looks like this:

my_dataframe.rdd.mapPartitions(
  build_data_from_partition
)

where the build_data_from_partition function looks something like this:

def build_data_from_partition(partition_entries):
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(get_data_asynchronously(partition_entries))
    yield from result

Here, get_data_asynchronously is an async function, for example one built on asyncio:

async def get_data_asynchronously(entries):
    # make & await web requests,
    # parse & return the results
    ...  # body elided in the original question
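The asyncio part of this pattern can be exercised without Spark by passing a plain iterator to the partition function. The following is a minimal sketch under stated assumptions: `fake_fetch` is a hypothetical stand-in for the real web requests (which the question does not show), and `asyncio.run` is used in place of `asyncio.get_event_loop()` so that each call gets a fresh event loop.

```python
import asyncio


async def fake_fetch(entry):
    # Hypothetical stand-in for a real web request (not from the question).
    await asyncio.sleep(0)
    return {"entry": entry, "status": "ok"}


async def get_data_asynchronously(entries):
    # Run one coroutine per entry concurrently; gather preserves input order.
    return await asyncio.gather(*(fake_fetch(e) for e in entries))


def build_data_from_partition(partition_entries):
    # Spark hands each partition to the function as an iterator; materialize it.
    entries = list(partition_entries)
    # asyncio.run creates a new event loop and closes it when done.
    result = asyncio.run(get_data_asynchronously(entries))
    yield from result
```

Calling `list(build_data_from_partition(iter([1, 2, 3])))` locally runs the coroutine logic in isolation, which helps separate asyncio problems from Spark serialization problems.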

However, the mapPartitions step seems to cause some fairly serious pickling errors via the Java gateway. I get a stack trace that looks like this:

Driver stacktrace:                                                                                                                                                                                                                            
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)                                                                                              
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)                                                                                                                                       
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)                                                                                                                                       
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)                                                                                                                                                     
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)                                          
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)                                                                                                                                                        
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)                                                                                                                               
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)                                                                                                                               
        at scala.Option.foreach(Option.scala:257)                                                                      
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)                                                                                                                                                
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)                                                                                                                                       
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)                                                                                                                                         
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)                                                                                                                                         
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)                                             
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)                                                                                                                                                             
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)                                               
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)                                               
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)                                               
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)                                                                                                                                                          
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)                                                                                                                                                     
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)                                                                                                                                     
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)                                                                                                                                                             
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)                                                                                                                                                             
        at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)                                          
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)                                                                                                                             
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)                                                                                                                                         
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)                                                                                                                                             
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)                                                 
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)                                                       
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)                                                       
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)                                                     
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)                                                  
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)                                                 
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)                                                                                                                                                      
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)                                                                                                                                              
        at java.lang.reflect.Method.invoke(Method.java:498)                                                            
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)                                                
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)                                          
        at py4j.Gateway.invoke(Gateway.java:282)                                                                       
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)                                                                                                                                                               
        at py4j.commands.CallCommand.execute(CallCommand.java:79)                                                      
        at py4j.GatewayConnection.run(GatewayConnection.java:238)                                                      
        at java.lang.Thread.run(Thread.java:748)                                                                       
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):                                                                                                                                                    
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main                                                                                                                                                               
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)                                                                                                                                                                
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command                                                                                                                                                        
    command = serializer._read_with_length(file)                                                                       
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length                                                                                                                                             
    return self.loads(obj)                                 
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 580, in loads                                                                                                                                                         
    return pickle.loads(obj, encoding=encoding)                                                                        
ModuleNotFoundError: No module named 'function'

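I am really at a loss. I have tried everything I can think of, including moving the definition of the lower-level function (get_data_asynchronously) into the higher-level function (build_data_from_partition) so that I could avoid pickling that function altogether, but this did not work.
I also tried removing all logic from the async function (while still defining it as async) and simply returning an empty array, but I still get the same error.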
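For reference, the nested variant described above might look like the following sketch. It illustrates the attempt, not a confirmed fix: the names mirror those in the question, the empty-list body is a placeholder, and the explicit `asyncio.new_event_loop()` is an assumption rather than something the question uses. Note that the traceback shows the failure inside `read_command`, at `pickle.loads`, which means the worker fails while deserializing the command, before the body of the partition function ever runs.

```python
import asyncio


def build_data_from_partition(partition_entries):
    # The coroutine is defined inside the partition function, as in the
    # attempt described in the question.
    async def get_data_asynchronously(entries):
        # Placeholder body: all request/parse logic removed, returns an empty list.
        await asyncio.sleep(0)
        return []

    # Assumption: create and close a dedicated event loop for this partition.
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(
            get_data_asynchronously(list(partition_entries))
        )
    finally:
        loop.close()
    yield from result
```

Because the error occurs at deserialization time, stripping the coroutine's logic would not be expected to change the outcome, which matches the observation that the same error persists.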
