我知道有一种方法可以根据集群资源(“执行器内存”和“执行器数量”和“执行器内核”)配置Spark应用程序,我想知道是否有一种方法可以考虑数据输入大小?
如果数据输入大小不适合所有分区,会发生什么情况?
示例:
- 数据输入大小= 200GB
- 集群中的分区数= 100
- 分区大小= 128MB
- 分区可以处理的总大小= 100 * 128MB = 128GB
其余的数据(72GB)呢?
我猜Spark会等待释放资源,因为它是为了处理批量数据而设计的。这是一个正确的假设吗?
提前感谢
我知道有一种方法可以根据集群资源(“执行器内存”和“执行器数量”和“执行器内核”)配置Spark应用程序,我想知道是否有一种方法可以考虑数据输入大小?
如果数据输入大小不适合所有分区,会发生什么情况?
示例:
其余的数据(72GB)呢?
我猜Spark会等待释放资源,因为它是为了处理批量数据而设计的。这是一个正确的假设吗?
提前感谢
2条答案
按热度按时间nvbavucw1#
为了获得最佳性能,我建议不要设置
spark.executor.cores
。您希望每个工作进程有一个执行器。另外,在spark.executor.memory
中使用约70%的执行器内存。最后-如果您希望实时应用统计信息影响分区数量,请使用Spark 3,因为它将附带Adaptive Query Execution(AQE)。使用AQE,Spark将动态合并shuffle分区。因此,您可以将其设置为任意大的分区数,例如:spark.sql.shuffle.partitions=<number of cores * 50>
那就让AQE去做吧。你可以在这里阅读更多关于它的信息:https://www.databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
dhxwm5r42#
您的问题有两个方面。第一个是关于此数据的存储,第二个是关于数据执行。
关于存储,当你说
Size of partitions = 128MB
时,我假设你使用HDFS来存储这些数据,128 M是你的默认块大小。HDFS本身内部决定如何分割这个200 GB的文件,并将其存储在不超过128 M的块中。你的HDFS集群应该有超过200GB * replication factor
的组合存储来持久存储这些数据。接下来是问题的Spark执行部分,一旦定义了
spark.default.parallelism=100
,这意味着Spark在执行某些操作时将使用该值作为默认的并行级别请注意,每个执行器处理的数据量不受块大小的影响这意味着每个执行器任务将处理200 G/100 = 2G的数据(前提是执行器内存足够执行所需操作)。如果spark集群中没有足够的容量来并行运行100个执行器,那么它将在资源可用时分批启动尽可能多的执行器。