Apache Spark 启动SQL JDBC numberOfPartitions计算(从大数据负载到小数据负载)

cnjp1d6j  于 2022-12-04  发布在  Apache
关注(0)|答案(2)|浏览(123)

我有一个用例,根据传入的参数,我可能必须从数据库中获取和处理1)数百万条记录(使用JDBC读取RDBMS、解码、转换成XML、转换成CSV等,非常耗时的过程),和/或2)只处理几百条甚至几条记录。请注意,我不知道这个多记录中的数据量。tenant spark应用程序,直到我的应用程序运行时,我计算需要处理的记录总数。因此,我有两个问题:
1.在启动运行时不知道数据量的情况下,如何知道需要为该spark作业请求多少个执行器或内核?
1.因为我在DB表上进行jdbc调用,所以我使用numOfPartitions的下界(0),上限(记录总数),和分区列(ROW_NUM)对SparkSQL进行分区。现在如何计算numOfPartition?在一个例子中,当我获取数百万个分区时,我希望有更多的分区,而对于少数分区,我希望有更少的分区。如何确定这个数字?逻辑是什么?对于100个分区,numOfPartition是否为10-20?200?我们不希望占用数据库资源而影响事务应用程序。人们通常如何决定分区数?感谢您的帮助,谢谢
需要帮助确定数据库numOfPartitions

fkaflof6

fkaflof61#

It can be challenging to determine the optimal number of executors and cores for a Spark job without knowing the volume of data that needs to be processed. In general, you will want to use as many executors and cores as possible to maximize the parallelism of the job and reduce the overall processing time.
However, it's important to consider the following factors when determining the number of executors and cores to use:
The size and complexity of the data: If the data is large and complex, you may need more executors and cores to process it effectively. The available resources: The number of executors and cores you can use will depend on the resources available on the cluster. If the cluster is already heavily utilized, you may need to use fewer executors and cores to avoid overloading the system. The overall performance of the job: You can use Spark's built-in performance metrics to monitor the performance of the job and adjust the number of executors and cores as needed to optimize the processing time. One approach you could take is to start with a small number of executors and cores and gradually increase them as needed based on the performance of the job and the available resources. You can also use Spark's dynamic allocation feature to automatically adjust the number of executors and cores based on the workload and available resources. This can help ensure that your Spark job is able to effectively process the data without overloading the system.
Spark's dynamic allocation feature allows the Spark application to automatically request additional executors or release unused executors based on the workload and available resources in the cluster. This can help improve the overall performance and efficiency of the Spark application by ensuring that the right amount of resources are available to process the data.
Dynamic allocation is enabled by default in Spark, but it can be configured using the spark.dynamicAllocation.enabled property in the Spark configuration.
You can also adjust the default behavior of dynamic allocation using the following properties:
spark.dynamicAllocation.minExecutors: The minimum number of executors to use for the application. spark.dynamicAllocation.maxExecutors: The maximum number of executors to use for the application. spark.dynamicAllocation.initialExecutors: The initial number of executors to use for the application.
By default, dynamic allocation will try to maintain a constant number of executors based on the workload and available resources. However, you can also configure it to scale up or down based on the workload using the spark.dynamicAllocation.scalingUpFactor and spark.dynamicAllocation.scalingDownFactor properties.
Overall, using Spark's dynamic allocation feature can help improve the performance and efficiency of your Spark application by automatically allocating the right amount of resources for the data being processed.

7z5jn7bk

7z5jn7bk2#

The number of partitions to use when reading data from a database using the JDBC connector can have a significant impact on the performance and efficiency of the Spark job. In general, a larger number of partitions will allow the data to be processed in parallel across multiple nodes in the cluster, which can improve the overall processing time. However, using too many partitions can also cause performance issues, such as overwhelming the database with too many concurrent connections.
When you use the numPartitions parameter in a JDBC query in Spark, it will create one database connection for each partition, which can potentially overwhelm the source database if the number of partitions is too large. To avoid this issue, it's important to carefully consider the number of partitions you use in your query.
One approach you could take is to use a smaller number of partitions, such as 10-20 partitions, and ensure that each partition processes a reasonable amount of data. For example, you could use the partitionColumn and lowerBound and upperBound parameters to specify a range of values for the partition column, and then set the numPartitions parameter to a value that will create partitions of approximately 128 MB in size. This can help ensure that the number of database connections used by the query is manageable and will not overwhelm the source database.
After the query, you can repartition the DataFrame with "repartition" such as:

val repartitionedDF = df.repartition(idealNumPartitions)

To calculate the optimal number of partitions for repartition we need to evaluate the size of the DataFrame, it can be done with:

val sizeInBytes =
      usersDFDistinct.queryExecution.optimizedPlan.stats.sizeInBytes

Then we can calculate the optimal number of partitions with:

val sizeInMB: Double = (sizeInBytes).toDouble / 1024.0 / 1024.0
    println(f"🚀 Size in MB of the result: $sizeInMB")

 val idealNumPartitions = Math.max(1, Math.ceil(sizeInMB / 128).toInt)
    println(f"🚀 Ideal number of partitions: $idealNumPartitions")

相关问题