val rdd = sc.textFile(foo)
// Do some preprocessing, such as parsing lines
val preprocessed = rdd.map(preprocessFunc)
// Tell Spark to cache preprocessed data (by default in memory)
preprocessed.cache()
// Perform some mapping and save output
preprocessed.map(mapFunc1).saveTextFile(outFile1)
// Perform a different mapping and save somewhere else
preprocessed.map(mapFunc2).saveTextFile(outFile2)
2条答案
按热度按时间x3naxklr1#
正如thilo提到的,spark不需要加载内存中的所有内容就可以处理它。这是因为spark会将数据划分成更小的块,并分别对这些块进行操作。分区的数量和大小取决于以下几点:
文件的存储位置。spark最常用的选项已经将文件存储在一堆块中,而不是作为一个大数据块。例如,如果存储在hdfs中,那么默认情况下,这些块是64mb,并且这些块分布(和复制)在您的节点上。使用存储在s3中的文件,您将获得32mb的块。这是由hadoop定义的
FileSystem
它用于读取文件,并应用于spark使用hadoop读取的其他文件系统中的文件。你所做的任何重新划分。你可以打电话
repartition(N)
或者coalesce(N)
在rdd或Dataframe上更改分区的数量,从而更改分区的大小。coalesce
最好是减少数量而不在节点间乱序数据,而repartition
允许您指定拆分数据的新方法,即更好地控制在同一节点上处理数据的哪些部分。你可以做一些更高级的转换。例如,配置设置
spark.sql.shuffle.partitions
,默认设置为200,确定一个分区的数量DataFrame
由连接和聚合产生的缓存
前面的一段只涉及spark中数据的标准处理,但是我觉得你可能会有错误的想法,因为spark被宣传为“内存中”,所以我想说一点。默认情况下,spark中没有什么比任何其他数据处理工具更“内存”的了:下面是一个简单的示例:
sc.textFile(foo).map(mapFunc).saveTextFile(bar)
读取文件(逐块并分布在节点上),在内存中进行Map(就像任何计算机程序一样),然后再次将其保存到存储器中。spark对内存的使用在以下方面变得更有趣(在scala中,我对它更熟悉,但在python中,概念和方法名称完全相同):这里的想法是
cache()
所以预处理不需要做两次(可能);默认情况下,spark不保存任何中间结果,而是为每个单独的操作计算完整的链,这里的“操作”是saveTextFile
电话。我说“可能”,是因为实际缓存数据的能力受到节点内存的限制。spark为缓存存储保留了一定数量的内存,与工作内存分离(请参阅http://spark.apache.org/docs/latest/configuration.html#memory-管理如何管理这些内存部分的大小),并且只能缓存该容量所能容纳的最大容量。
但根据分区的不同,它可能会更少。假设您的3个节点上都有2gb的存储内存,其中的数据
preprocessed
是6gb。如果这个数据有3个分区,它将非常适合所有的输入数据mapFunc2
将从内存中加载。但是如果你有4个分区,每个1.5gb,每个节点只能缓存1个分区;第四个分区不适合每台机器上仍保留的0.5gb,因此必须重新计算该分区以进行第二次Map,并且只有3/4的预处理数据将从内存中读取。因此,从这个意义上说,最好有许多小分区,使缓存尽可能高效,但这可能有其他缺点:更大的开销,如果您碰巧使用细粒度模式的mesos,会有巨大的延迟,以及大量的小输出文件(如果您在保存前没有合并),因为spark会将每个分区保存为单独的文件。
正如durga提到的,还有一种可能是内存中不适合的数据溢出到磁盘,您可以按照他的链接:)
lb3vh1jj2#
默认情况下,存储级别为memory\ only,它将尝试将数据放入内存中。如果无法将数据放入内存,则会出现内存不足的问题。
它支持其他存储级别,如内存和磁盘、仅磁盘等。您可以通过spark文档了解不同的存储级别。您可以在rdd上调用persist函数来使用不同的存储级别。