为什么重命名schema alter查询pyspark的结果?

zsbz8rwp  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(96)

我有一个大型 parquet 框架,字段名称中包含无效字符(点和括号),如果我按原样读取并打印一些聚合,

>>> df1 = spark.read.parquet(source)
>>> df1.select('`A.B.C`').fillna(0).agg({'`A.B.C`' : 'max'}).show()
+----------+
|max(A.B.C)|
+----------+
|  5989.625|
+----------+

为了方便进一步的工作,我想重命名这些列。如果我用schema

schema = df1.schema
new_fields = [
    StructField(f.name.replace('.', '___').replace('(', '-').replace(')', '-'), f.dataType, f.nullable) for f in schema.fields
]
new_schema = StructType(new_fields)

df2 = spark.read.schema(new_schema).parquet(source)

然后聚合得到另一个结果

>>> df2.select('A__B__C').fillna(0).agg({'A__B__C' : 'max'}).show()
+------------+
|max(A__B__C)|
+------------+
|         0.0|
+------------+

如果我通过Map重命名

mapping = {c : c.replace('.', '___').replace('(', '-').replace(')', '-') for c in df1.columns}
df3 = df1.select([F.col(f'`{c}`').alias(mapping[c] for c in df1.columns])

它与聚合一起工作

>>> df3.select('A__B__C').fillna(0).agg({'A__B__C' : 'max'}).show()
+------------+
|max(A__B__C)|
+------------+
|    5989.625|
+------------+

然而,当我试图保存它进行测试时,

df3.sample(fraction=0.0001).write.mode('overwrite').saveAsTable('test_table')

它给出错误
Attribute name "Some.Col(1)" contains invalid character(s) among " ,;{}()\n\t="
尽管"Some.Col(1)" in df3.columns给出False"Some__Col-1-" in df3.columns给出True,直接索引也是如此。
同时,对每个列使用withColumnRenamed需要更长的时间,所以我没有测试它。
那么,为什么使用新的模式会改变聚合的输出呢?为什么重命名可以在聚合中工作,但在写入时失败,就像某些列没有重命名一样?如何正确重命名?

wljmcqd8

wljmcqd81#

用途:

df2 = df1.selectExpr('`A.B.C` A__B__C')

在选择时重命名列,并且:

from pyspark.sql import functions as sf
df2.agg(sf.max(df1.A__B__C).alias('max_A__B__C'))

重命名max(或另一个具有别名的selectExpr/select),以便没有“()”停止保存

相关问题