# Create RDD from list of files
pathRdd = sc.parallelize([file1,file2,file3,.......,file100])
# Create a function which reads the data of file
def s3_path_to_data(path):
# Get data from s3
# return the data in whichever format you like i.e. String, array of String etc.
# Call flatMap on the pathRdd
dataRdd = pathRdd.flatMap(s3_path_to_data)
2条答案
按热度按时间wtlkbnrh1#
你可以通过设置下面的配置来尝试
其中x =群集中的节点总数 (节点中的核心总数-1) 5
wnavrhmk2#
这是spark(和其他大数据工具)的常见问题,因为它只使用驱动程序来列出源(S3)中的所有文件及其路径。
我发现这个X11E11F1X对解决这个问题非常有帮助,我们可以使用PureTools创建文件的并行RDD,并将其传递给Spark进行处理,而不是使用Spark列出和获取文件的元数据。
如果您不想像上面的指南那样安装和设置工具,您也可以使用S3清单文件列出一个桶中存在的所有文件,并使用rdds并行迭代这些文件。
Spark将创建一个默认分区数的pathRdd。然后在每个分区的行上并行调用s3_path_to_data函数。分区在Spark并行性中扮演着重要的角色。例如,如果你有4个执行器和2个分区,那么只有2个执行器会做这项工作。你可以根据你的用例来调整分区数和执行器数,以达到最佳性能。
下面是一些有用的属性,你可以用它来了解你的df或rdd规格,以微调Spark参数。