如何在parquet分区中使用不同的模式

k2arahey  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(411)

我将json文件读入Dataframe。json可以有一个特定于名称的struct字段消息,如下所示。

Json1
{
   "ts":"2020-05-17T00:00:03Z",
   "name":"foo",
   "messages":[
      {
         "a":1810,
         "b":"hello",
         "c":390
      }
   ]
}

Json2
{
   "ts":"2020-05-17T00:00:03Z",
   "name":"bar",
   "messages":[
      {
         "b":"my",
         "d":"world"
      }
   ]
}

当我将json中的数据读入一个Dataframe时,我得到如下的模式。

root
 |-- ts: string (nullable = true)
 |-- name: string (nullable = true)
 |-- messages: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: long (nullable = true)
 |    |    |-- d: string (nullable = true)

这很好。现在,当我保存到按名称分区的parquet文件时,如何在foo和bar分区中有不同的模式?

path/name=foo
root
 |-- ts: string (nullable = true)
 |-- name: string (nullable = true)
 |-- messages: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: long (nullable = true)

path/name=bar
root
 |-- ts: string (nullable = true)
 |-- name: string (nullable = true)
 |-- messages: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- d: string (nullable = true)

当我从根路径读取数据时,如果我得到所有foo和bar字段的schema,我就没事了。但是当我从path/name=foo读取数据时,我只期望foo模式。

5sxhfpxr

5sxhfpxr1#

1. Partitioning & Storing as Parquet file: 如果保存为Parquet格式,则在阅读时
path/name=foo specify the schema 包括所有必需的字段(a、b、c),然后spark只加载这些字段。
如果我们 won't 指定schema,那么所有字段(a、b、c、d)都将包含在Dataframe中 EX:schema=define structtype...schema spark.read.schema(schema).parquet(path/name=foo).printSchema()2.Partitioning & Storing as JSON/CSV file: 那么spark就不会在 path/name=foo 文件,所以当我们只读取name=foo目录时,我们不会得到 b,d 数据中包含的列。 EX: ```
spark.read.json(path/name=foo).printSchema()
spark.read.csv(path/name=foo).printSchema()

ffx8fchx

ffx8fchx2#

您可以在将Dataframe保存到分区之前更改模式,为此您必须过滤分区记录,然后将它们保存到相应的文件夹中


# this will select only not null columns which will drop col d from foo and a,c from bar

df = df.filter(f.col('name')='foo').select(*[c for c in df.columns if df.filter(f.col(c).isNotNull()).count() > 0])

# then save the df

df.write.json('path/name=foo')

现在每个分区将有不同的模式。

相关问题