我是spark的新手,并开始用python编写一些脚本。我的理解是spark并行执行转换(map)。
def some_function(name, content):
print(name, datetime.now())
time.sleep(30)
return content
config = SparkConf().setAppName("sample2").setMaster("local[*]")
filesRDD = SparkContext(conf=config).binaryFiles("F:\\usr\\temp\\*.zip")
inputfileRDD = filesRDD.map(lambda job_bundle: (job_bundle[0], some_function(job_bundle[0], job_bundle[1])))
print(inputfileRDD.collect())
上面的代码收集 .zip
文件夹中的文件并对其进行处理。当我执行它时,我看到这是按顺序发生的。
输出
file:/F:/usr/temp/sample.zip 2020-10-22 10:42:37.089085
file:/F:/usr/temp/sample1.zip 2020-10-22 10:43:07.103317
您可以看到它在30秒后开始处理第二个文件。意思是在完成第一个文件之后。我的代码出了什么问题?为什么不并行执行rdd?你能帮帮我吗?
1条答案
按热度按时间oalqel3c1#
我不知道这个方法
binaryFiles
将文件按spark分区进行分区。似乎恰恰相反textFiles
,它倾向于只创建一个分区。让我们通过一个名为dir
包含5个文件。如果我使用
textFile
,事情是并行的。我不提供输出,因为它不是很漂亮,但你可以检查自己。我们可以证实事情是并行运行的getNumPartitions
.与
binaryFiles
事情是不同的,出于某种原因,所有的东西都被放在同一个分区。我甚至试过处理10k文件,但所有的文件仍然放在同一个分区中。我相信这背后的原因是在斯卡拉,
binaryFiles
返回带有文件名的rdd和允许读取文件(但不执行读取)的对象。因此它是快速的,并且产生的rdd很小。因此,将它放在一个分区上是可以的。因此,在scala中,我们可以在使用binaryFiles
一切都会很顺利的。python的问题是
binaryFiles
实际上是将文件读入一个分区。另外,这对我来说是非常神秘的,但是pyspark2.4中的以下代码行产生了您注意到的相同行为,这是没有意义的。然而,自从
binaryFiles
实际读取文件时,可以使用wholeTextFile
它将文件读取为文本文件并按预期运行: