我试图阅读一个elasticsearch索引,它有数百万个文档,每个文档都有不同数量的字段。我有一个模式,它有1000个字段,每个字段都有自己的名称和类型。
现在,当我通过es hadoop连接器创建rdd并稍后通过指定模式转换为Dataframe时,它没有说-
输入行没有架构所需的预期值数
我有几个问题。1有可能有一个rdd/df,其中的行包含可变数量的字段吗?如果没有,除了为每列中缺少的字段添加空值之外,还有什么其他选择?
我看到,默认情况下,spark会将所有内容转换为 StringType
就像我用的那样 sc.newAPIHadoopRDD()
打电话。如何根据模式中的字段名将它们类型转换为正确的类型?什么Map?
我想写这个Parquet格式的模式添加到文件中。与包含1000个字段的模式相比,丢失的字段会发生什么变化。
1条答案
按热度按时间ndh0cuux1#
列数不能可变,但可以使用集合类型(如数组或Map)中的一列,在python中,这与字典相对应。这允许您在列中存储可变长度的数据。否则,需要为模式中的每一列都有一个值。通常用空值填充缺少的值。
如果你已经有了一个Dataframe,还有一个函数
get_column_type
如果从列名中获取类型名,则可以如下所示重铸整个Dataframe:parquet文件将包含Dataframe中的任何列。如果需要文件中的1000个字段,它们必须在Dataframe中,因此必须用null或其他值填充缺少的值。
现在,如果你把所有这些点放在一起,你可能想这样做:
把每个弹性文件读成一行,用
id
现场和doc
maptype类型的字段。explode
doc字段,因此现在有3列:id
,key
以及value
,每个文档中的每个键对应一行。此时,您可以写入parquet文件并完成该过程。如果希望Dataframe具有完整架构,则必须执行以下额外步骤:
透视结果,为每个id只生成一行,并为文档中的每个键生成一列及其对应的值:
pivoted_df = df.groupBy('id').pivot('key').agg(F.first('value')
此Dataframe包含数据中存在的所有字段。如果知道完整架构,可以为缺少的列添加虚拟列:df = df.withColumn('new_column', lit(None).cast(StringType())
最后用第2点中的代码重铸列,并删除列id
. 您可以将其写入parquet,它将包含大模式中的所有列。