如何在PYSPARK中进行并行处理

dl5txlt9  于 2022-10-07  发布在  Spark
关注(0)|答案(1)|浏览(407)

我想使用pysppark在for循环中进行并行处理。

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('yarn').appName('myAppName').getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

data = [a,b,c]

for i in data:
    try:
        df = spark.read.parquet('gs://'+i+'-data')
        df.createOrReplaceTempView("people")
        df2=spark.sql("""select * from people """)
        df.show()
    except Exception as e:
        print(e)
        continue

上面提到的脚本运行得很好,但我想在pysppark中进行并行处理,这在Scala中是可能的

moiiocjp

moiiocjp1#

Spark本身并行运行作业,但如果您仍然希望在代码中并行执行,您可以使用简单的Python代码进行并行处理(这只在Databricks上测试过link)。

data = ["a","b","c"]

from multiprocessing.pool import ThreadPool
pool = ThreadPool(10)

def fun(x):
    try:
        df = sqlContext.createDataFrame([(1,2, x), (2,5, "b"), (5,6, "c"), (8,19, "d")], ("st","end", "ani"))
        df.show()
    except Exception as e:
        print(e)

pool.map( fun,data)

我对您的代码做了一些修改,但这基本上是您运行并行任务的方式,如果您有一些想要并行运行的平面文件,只需列出它们的名称并将其传递到pool.map(Fun,Data)

根据需要更改函数FUN

有关多处理模块的更多详细信息,请查看documentation

同样,如果您想要在Scala中完成这项工作,则需要以下模块

import scala.concurrent.{Future, Await}

有关更详细的理解,请查看this。代码是为Databricks编写的,但稍作更改,它就可以在您的环境中运行。

相关问题