pyspark 在Spark中,“重新分区”与并行性有什么关系?在什么情况下它会加速操作的执行?

thtygnil  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(127)

我开始学习Spark,我在这里读了大量的答案,但我仍然不明白。
问题如下:我从表中读取数据并将其加载到日期框中

spark.read.format("jdbc").....load()

那我就得写信给帕奎特

jdbcDF.write.mode("overwrite").format("parquet").option("compression", "snappy").save("test.parquet")

如果我的表包含2000万行和700列,或者如果它很小,并行化方法会改变吗?我怎么知道我是否需要并行性,或者Spark是否自己在引擎盖下完成了它?在parquet中写入数据时如何实现并行性?
请告诉我在哪里可以找到资料
我用这个来代替Pandas,我需要它快速工作。我不明白如何把这些文字平行排列。我以为“重新分区”是这样做的,但在阅读了文章后,我意识到事实并非如此。

xmakbtuz

xmakbtuz1#

欢迎来到Stackoverflow!
如果您真的只想通过JDBC读取一个表,然后立即将其写入parquet,那么您不应该使用.repartition()。这是一个昂贵的操作,因为它需要在Spark执行器中来回移动数据。
看起来您只想以并行方式读取表,然后将其作为分区的parquet文件写入。如果是这种情况,每个Spark执行器可以简单地查询原始表的一部分,然后将该部分作为parquet分区写入。
你可以通过如下方式实现这一点:

numPartitions = X
jdbcDF = spark.read
  .format("jdbc")
  .option(...) #a bunch of options like url/user/password
  .option("numPartitions", numPartitions)
  .load()

如你所见,我们指定了一个名为numPartitions的变量,并在read命令的"numPartitions"选项中使用它。这将把你的表分成X个分区,并并行处理它们。
一些注意事项:

  • 增大此数值将使此操作并行化,直到达到numPartitions,即大于集群中的核心数。如果您有2个执行程序,每个执行程序有5个核心,则最多可以同时处理10个分区。如果您指定的numPartitions更高,比如20,那么您的执行程序将首先处理10个分区,并在完成第一个分区时处理其他分区。
  • 如果您有一个非常大的集群(许多CPU)可用,请小心指定过大的numPartitions:这将并行启动一堆请求,并可能使您通过JDBC查询的数据库过载。
  • 如果你想限制并行请求的数量,但有一个非常大的表,所以你仍然想要一个高的numPartitions值,只需将Spark集群中的核心数量限制在你的数据库可以处理的数量。

像这样读入数据后,您可以简单地将其写入指定的parquet文件:

jdbcDF.write.mode("overwrite").format("parquet").option("compression", "snappy").save("test.parquet")

相关问题