如果我显式地传递模式,是否需要在spark with parquet中使用“mergeschema”选项?

wqlqzqxt  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(250)

来自spark文档:
由于模式合并是一个相对昂贵的操作,而且在大多数情况下不是必需的,因此我们从1.5.0开始默认关闭了它。您可以在读取Parquet文件时将数据源选项mergeschema设置为true(如下面的示例所示),或者将全局sql选项spark.sql.parquet.mergeschema设置为true来启用它。
(https://spark.apache.org/docs/latest/sql-data-sources-parquet.html)
我从文档中了解到,如果我有多个具有不同模式的Parquet分区,spark将能够自动合并这些模式 spark.read.option("mergeSchema", "true").parquet(path) .
如果我在查询时不知道这些分区中存在什么模式,那么这似乎是一个不错的选择。
但是,考虑这样一种情况,我有两个分区,一个使用旧模式,另一个使用新模式,新模式的不同之处在于只有一个附加字段。我们还假设我的代码知道新的模式,并且我能够显式地传递这个模式。
在这种情况下,我会这样做 spark.read.schema(my_new_schema).parquet(path) . 在这种情况下,我希望spark能够使用新模式在两个分区中读取数据,并为旧分区中的任何行提供新列的空值。这是预期的行为吗?还是我也需要用 option("mergeSchema", "true") 在这种情况下也是?
我希望尽可能避免使用mergeschema选项,以避免文档中提到的额外开销。

fsi0uk1n

fsi0uk1n1#

我尝试过扩展上面链接的spark文档中的示例代码,我的假设似乎是正确的。见下表:

// This is used to implicitly convert an RDD to a DataFrame.
scala> import spark.implicits._
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
scala> val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF: org.apache.spark.sql.DataFrame = [value: int, square: int]

scala> squaresDF.write.parquet("test_data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
scala> val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
scala> cubesDF: org.apache.spark.sql.DataFrame = [value: int, cube: int]

scala> cubesDF.write.parquet("test_data/test_table/key=2")

// Read the partitioned table

scala> val mergedDF = spark.read.option("mergeSchema", "true").parquet("test_data/test_table")
mergedDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 2 more fields]

scala> mergedDF.printSchema()
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- cube: integer (nullable = true)
 |-- key: integer (nullable = true)

// Read without mergeSchema option
scala> val naiveDF = spark.read.parquet("test_data/test_table")
naiveDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 1 more field]

// Note that cube column is missing.
scala> naiveDF.printSchema()
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- key: integer (nullable = true)

// Take the schema from the mergedDF above and use it to read the same table with an explicit schema, but without the "mergeSchema" option.
scala> val explicitSchemaDF = spark.read.schema(mergedDF.schema).parquet("test_data/test_table")
explicitSchemaDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 2 more fields]

// Spark was able to use the correct schema despite not using the "mergeSchema" option
scala> explicitSchemaDF.printSchema()
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- cube: integer (nullable = true)
 |-- key: integer (nullable = true)

// Data is as expected.
scala> explicitSchemaDF.show()
+-----+------+----+---+
|value|square|cube|key|
+-----+------+----+---+
|    3|     9|null|  1|
|    4|    16|null|  1|
|    5|    25|null|  1|
|    8|  null| 512|  2|
|    9|  null| 729|  2|
|   10|  null|1000|  2|
|    1|     1|null|  1|
|    2|     4|null|  1|
|    6|  null| 216|  2|
|    7|  null| 343|  2|
+-----+------+----+---+

如您所见,在使用显式模式读取数据时,spark似乎正确地为Parquet分区中缺少的任何列提供了null值。
这让我感到相当自信,我可以用“不,在这种情况下没有必要使用mergeschema选项”来回答我的问题,但我仍然想知道是否有什么需要注意的注意事项。如有任何其他人的帮助,我们将不胜感激。

相关问题