我想使用pysppark在for循环中进行并行处理。
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('yarn').appName('myAppName').getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
data = [a,b,c]
for i in data:
try:
df = spark.read.parquet('gs://'+i+'-data')
df.createOrReplaceTempView("people")
df2=spark.sql("""select * from people """)
df.show()
except Exception as e:
print(e)
continue
上面提到的脚本运行得很好,但我想在pysppark中进行并行处理,这在Scala中是可能的
1条答案
按热度按时间moiiocjp1#
Spark本身并行运行作业,但如果您仍然希望在代码中并行执行,您可以使用简单的Python代码进行并行处理(这只在Databricks上测试过link)。
我对您的代码做了一些修改,但这基本上是您运行并行任务的方式,如果您有一些想要并行运行的平面文件,只需列出它们的名称并将其传递到pool.map(Fun,Data)。
根据需要更改函数FUN。
有关多处理模块的更多详细信息,请查看documentation。
同样,如果您想要在Scala中完成这项工作,则需要以下模块
有关更详细的理解,请查看this。代码是为Databricks编写的,但稍作更改,它就可以在您的环境中运行。