如何使用pyspark并行化我的文件处理程序

ioekq8ef  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(721)

我现在有一个大型python项目,其中驱动程序有一个函数,它使用for循环遍历gcp(googlecloudplatform)bucket上的每个文件。我使用cli将作业提交到gcp,并让作业在gcp上运行。
对于在这个for循环中遍历的每个文件,我将调用一个函数parse_file(…),该函数解析该文件并调用处理该文件的一系列其他函数。
整个项目运行了几分钟,速度很慢,而且驱动程序还没有使用太多pyspark。问题是该文件级for循环中的每个parse_文件(…)都是按顺序执行的。有没有可能使用pyspark来并行化文件级for循环,以便对所有这些文件并行运行parse_file(…)函数,从而减少程序执行时间并提高效率?如果是这样的话,既然程序没有使用pyspark,那么是否需要进行大量的代码修改才能使其并行化?
程序的功能是这样的


# ... some other codes

attributes_table = ....
for obj in gcp_bucket.objects(path):
    if obj.key.endswith('sys_data.txt'):
        #....some other codes
        file_data = (d for d in obj.download().decode('utf-8').split('\n'))
        parse_file(file_data, attributes_table)
        #....some other codes ....

如何使用pyspark来并行化这个部分,而不是一次使用一个for循环遍历文件?

vwhgwdsa

vwhgwdsa1#

谢谢你的提问。
我建议根据您的需求创建rdd gcp_bucket.objects(path) .
您拥有sparkcontext,因此创建rdd应该非常简单: my_rdd = sc.parallelize(gcp_bucket.objects(path) .
对于未初始化的用户,约定是将sparkcontext赋值给变量 sc . for循环的内容必须放入函数中,我们调用它 my_function . 你现在拥有了你的全部。
下一步将Map函数:

results_dag = my_rdd.map(my_function)
results = results_dag.collect()

回想一下spark执行的是惰性评估。这就是为什么我们需要执行 collect 最后的行动。
其他一些建议。第一种方法是在gcp bucket中的一小组对象上运行代码。了解时间安排。为了促进良好的编码实践,另一个建议是考虑将for循环中的操作进一步分解为额外的rdd。你可以把它们锁在一起。。。

my_rdd = sc.parallelize(gcp_bucket.objects(path)
dag1 = my_rdd.map(function1)
dag2 = dag1.map(function2)
dag3 = dag2.map(function3)
results = dag3.collect()

相关问题