如何在不进行数据扫描的情况下覆盖pyspark DataFrame模式?

guykilcj  于 2022-10-07  发布在  Spark
关注(0)|答案(2)|浏览(119)

此问题与https://stackoverflow.com/a/37090151/1661491相关。假设我有一个具有特定模式的pyspark DataFrame,我想用我知道兼容的新模式覆盖该模式,我可以这样做:

df: DataFrame
new_schema = ...

df.rdd.toDF(schema=new_schema)

不幸的是,这会触发上述链接中描述的计算。有没有一种方法可以在元数据级别(或懒惰)做到这一点,而不急于触发计算或转换?

编辑,请注意:

  • 模式可以任意复杂(嵌套等)
  • 新架构包括对描述、可为空性和附加元数据的更新(更新类型可获得加分)
  • 我想避免编写自定义查询表达式生成器,除非Spark中已经内置了一个可以基于架构/StructType生成查询的生成器
piv4azn7

piv4azn71#

最后,我自己也深入探讨了这个问题,我很好奇您对我的变通方法/POC的看法。参见https://github.com/ravwojdyla/spark-schema-utils。它转换表达式,并更新属性。

假设我有两个模式,第一个没有任何元数据,让我们调用schema_wo_metadata

{
  "fields": [
    {
      "metadata": {},
      "name": "oa",
      "nullable": false,
      "type": {
        "containsNull": true,
        "elementType": {
          "fields": [
            {
              "metadata": {},
              "name": "ia",
              "nullable": false,
              "type": "long"
            },
            {
              "metadata": {},
              "name": "ib",
              "nullable": false,
              "type": "string"
            }
          ],
          "type": "struct"
        },
        "type": "array"
      }
    },
    {
      "metadata": {},
      "name": "ob",
      "nullable": false,
      "type": "double"
    }
  ],
  "type": "struct"
}

第二个在内部(ia)字段和外部(ob)字段上具有额外的元数据,我们将其命名为schema_wi_metadata

{
  "fields": [
    {
      "metadata": {},
      "name": "oa",
      "nullable": false,
      "type": {
        "containsNull": true,
        "elementType": {
          "fields": [
            {
              "metadata": {
                "description": "this is ia desc"
              },
              "name": "ia",
              "nullable": false,
              "type": "long"
            },
            {
              "metadata": {},
              "name": "ib",
              "nullable": false,
              "type": "string"
            }
          ],
          "type": "struct"
        },
        "type": "array"
      }
    },
    {
      "metadata": {
        "description": "this is ob desc"
      },
      "name": "ob",
      "nullable": false,
      "type": "double"
    }
  ],
  "type": "struct"
}

现在假设我有一个具有schema_wo_metadata架构的数据集,并且希望将该架构与schema_wi_metadata交换:

from pyspark.sql import SparkSession
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StructType

# I assume these get generate/specified somewhere

schema_wo_metadata: StructType = ...
schema_wi_metadata: StructType = ...

# You need my extra package

spark = SparkSession.builder 
    .config("spark.jars.packages", "io.github.ravwojdyla:spark-schema-utils_2.12:0.1.0") 
    .getOrCreate()

# Dummy data with `schema_wo_metadata` schema:

df = spark.createDataFrame(data=[Row(oa=[Row(ia=0, ib=1)], ob=3.14),
                                 Row(oa=[Row(ia=2, ib=3)], ob=42.0)],
                           schema=schema_wo_metadata)

_jdf = spark._sc._jvm.io.github.ravwojdyla.SchemaUtils.update(df._jdf, schema.json())
new_df = DataFrame(_jdf, df.sql_ctx)

现在,new_df具有schema_wi_metadata,例如:

new_df.schema["oa"].dataType.elementType["ia"].metadata

# -> {'description': 'this is ia desc'}

有什么意见吗?

hfsqlsce

hfsqlsce2#

仅供快速更新,此功能已通过https://github.com/apache/spark/pull/37011添加到Spark中,并将在版本3.4.0中发布。

相关问题