我有一个大型 parquet 框架,字段名称中包含无效字符(点和括号),如果我按原样读取并打印一些聚合,
>>> df1 = spark.read.parquet(source)
>>> df1.select('`A.B.C`').fillna(0).agg({'`A.B.C`' : 'max'}).show()
+----------+
|max(A.B.C)|
+----------+
| 5989.625|
+----------+
为了方便进一步的工作,我想重命名这些列。如果我用schema
schema = df1.schema
new_fields = [
StructField(f.name.replace('.', '___').replace('(', '-').replace(')', '-'), f.dataType, f.nullable) for f in schema.fields
]
new_schema = StructType(new_fields)
df2 = spark.read.schema(new_schema).parquet(source)
然后聚合得到另一个结果
>>> df2.select('A__B__C').fillna(0).agg({'A__B__C' : 'max'}).show()
+------------+
|max(A__B__C)|
+------------+
| 0.0|
+------------+
如果我通过Map重命名
mapping = {c : c.replace('.', '___').replace('(', '-').replace(')', '-') for c in df1.columns}
df3 = df1.select([F.col(f'`{c}`').alias(mapping[c] for c in df1.columns])
它与聚合一起工作
>>> df3.select('A__B__C').fillna(0).agg({'A__B__C' : 'max'}).show()
+------------+
|max(A__B__C)|
+------------+
| 5989.625|
+------------+
然而,当我试图保存它进行测试时,
df3.sample(fraction=0.0001).write.mode('overwrite').saveAsTable('test_table')
它给出错误Attribute name "Some.Col(1)" contains invalid character(s) among " ,;{}()\n\t="
尽管"Some.Col(1)" in df3.columns
给出False
,"Some__Col-1-" in df3.columns
给出True
,直接索引也是如此。
同时,对每个列使用withColumnRenamed
需要更长的时间,所以我没有测试它。
那么,为什么使用新的模式会改变聚合的输出呢?为什么重命名可以在聚合中工作,但在写入时失败,就像某些列没有重命名一样?如何正确重命名?
1条答案
按热度按时间wljmcqd81#
用途:
在选择时重命名列,并且:
重命名max(或另一个具有别名的selectExpr/select),以便没有“()”停止保存