pyspark:创建rdd->df->parquet的模式有1000个字段,但行的列数可变

h5qlskok  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(377)

我试图阅读一个elasticsearch索引,它有数百万个文档,每个文档都有不同数量的字段。我有一个模式,它有1000个字段,每个字段都有自己的名称和类型。
现在,当我通过es hadoop连接器创建rdd并稍后通过指定模式转换为Dataframe时,它没有说-
输入行没有架构所需的预期值数
我有几个问题。1有可能有一个rdd/df,其中的行包含可变数量的字段吗?如果没有,除了为每列中缺少的字段添加空值之外,还有什么其他选择?
我看到,默认情况下,spark会将所有内容转换为 StringType 就像我用的那样 sc.newAPIHadoopRDD() 打电话。如何根据模式中的字段名将它们类型转换为正确的类型?什么Map?
我想写这个Parquet格式的模式添加到文件中。与包含1000个字段的模式相比,丢失的字段会发生什么变化。

ndh0cuux

ndh0cuux1#

列数不能可变,但可以使用集合类型(如数组或Map)中的一列,在python中,这与字典相对应。这允许您在列中存储可变长度的数据。否则,需要为模式中的每一列都有一个值。通常用空值填充缺少的值。
如果你已经有了一个Dataframe,还有一个函数 get_column_type 如果从列名中获取类型名,则可以如下所示重铸整个Dataframe:

import pyspark.sql.functions as F
select_expressions = [ F.col(column_name).cast(get_column_type(column_name)) for column_name in column_list]
recasted_df = df.select(*select_expressions)

parquet文件将包含Dataframe中的任何列。如果需要文件中的1000个字段,它们必须在Dataframe中,因此必须用null或其他值填充缺少的值。
现在,如果你把所有这些点放在一起,你可能想这样做:
把每个弹性文件读成一行,用 id 现场和 doc maptype类型的字段。 explode doc字段,因此现在有3列: id , key 以及 value ,每个文档中的每个键对应一行。此时,您可以写入parquet文件并完成该过程。
如果希望Dataframe具有完整架构,则必须执行以下额外步骤:
透视结果,为每个id只生成一行,并为文档中的每个键生成一列及其对应的值: pivoted_df = df.groupBy('id').pivot('key').agg(F.first('value') 此Dataframe包含数据中存在的所有字段。如果知道完整架构,可以为缺少的列添加虚拟列: df = df.withColumn('new_column', lit(None).cast(StringType()) 最后用第2点中的代码重铸列,并删除列 id . 您可以将其写入parquet,它将包含大模式中的所有列。

相关问题