spark没有与binaryfile并行运行rdd

5uzkadbs  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(490)

我是spark的新手,并开始用python编写一些脚本。我的理解是spark并行执行转换(map)。

def some_function(name, content):
    print(name, datetime.now())
    time.sleep(30)
    return content

config = SparkConf().setAppName("sample2").setMaster("local[*]")
filesRDD = SparkContext(conf=config).binaryFiles("F:\\usr\\temp\\*.zip")
inputfileRDD = filesRDD.map(lambda job_bundle: (job_bundle[0], some_function(job_bundle[0], job_bundle[1])))
print(inputfileRDD.collect())

上面的代码收集 .zip 文件夹中的文件并对其进行处理。当我执行它时,我看到这是按顺序发生的。
输出

file:/F:/usr/temp/sample.zip 2020-10-22 10:42:37.089085
file:/F:/usr/temp/sample1.zip 2020-10-22 10:43:07.103317

您可以看到它在30秒后开始处理第二个文件。意思是在完成第一个文件之后。我的代码出了什么问题?为什么不并行执行rdd?你能帮帮我吗?

oalqel3c

oalqel3c1#

我不知道这个方法 binaryFiles 将文件按spark分区进行分区。似乎恰恰相反 textFiles ,它倾向于只创建一个分区。让我们通过一个名为 dir 包含5个文件。

> ls dir
test1  test2  test3  test4  test5

如果我使用 textFile ,事情是并行的。我不提供输出,因为它不是很漂亮,但你可以检查自己。我们可以证实事情是并行运行的 getNumPartitions .

>>> sc.textFile("dir").foreach(lambda x: some_function(x, None))

# ugly output, but everything starts at the same time,

# except maybe the last one since you have 4 cores.

>>> sc.textFile("dir").getNumPartitions()
5

binaryFiles 事情是不同的,出于某种原因,所有的东西都被放在同一个分区。

>>> sc.binaryFiles("dir").getNumPartitions()
1

我甚至试过处理10k文件,但所有的文件仍然放在同一个分区中。我相信这背后的原因是在斯卡拉, binaryFiles 返回带有文件名的rdd和允许读取文件(但不执行读取)的对象。因此它是快速的,并且产生的rdd很小。因此,将它放在一个分区上是可以的。因此,在scala中,我们可以在使用 binaryFiles 一切都会很顺利的。

scala> sc.binaryFiles("dir").getNumPartitions
1
scala> sc.binaryFiles("dir").repartition(4).getNumPartitions
4
scala> sc.binaryFiles("dir").repartition(4)
    .foreach{ case (name, ds) => { 
        println(System.currentTimeMillis+": "+name)
        Thread.sleep(2000)
        // do some reading on the DataStream ds
    }}
1603352918396: file:/home/oanicol/sandbox/dir/test1
1603352918396: file:/home/oanicol/sandbox/dir/test3
1603352918396: file:/home/oanicol/sandbox/dir/test4
1603352918396: file:/home/oanicol/sandbox/dir/test5
1603352920397: file:/home/oanicol/sandbox/dir/test2

python的问题是 binaryFiles 实际上是将文件读入一个分区。另外,这对我来说是非常神秘的,但是pyspark2.4中的以下代码行产生了您注意到的相同行为,这是没有意义的。


# this should work but does not

sc.binaryFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))

# this does not work either, which is strange but it would not be advised anyway

# since all the data would be read on one partition

sc.binaryFiles("dir").repartition(4).foreach(lambda x: some_function(x, ''))

然而,自从 binaryFiles 实际读取文件时,可以使用 wholeTextFile 它将文件读取为文本文件并按预期运行:


# this works

sc.wholeTextFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))

相关问题