使用‘struct_name.*’进行选择时,为所有列添加前缀

whlutmcx  于 2022-10-07  发布在  Spark
关注(0)|答案(5)|浏览(170)

下面的 Dataframe 是一个名为‘TABLE_NAME’的临时表。
您将如何使用spak.sql()为所有列添加前缀?

root
 |-- MAIN_COL: struct (nullable = true)
 |    |-- a: string (nullable = true)
 |    |-- b: string (nullable = true)
 |    |-- c: string (nullable = true)
 |    |-- d: string (nullable = true)
 |    |-- f: long (nullable = true)
 |    |-- g: long (nullable = true)
 |    |-- h: long (nullable = true)
 |    |-- j: long (nullable = true)

下面的查询

spark.sql("select MAIN_COL.* from table_name")

返回名为a、b、c的列,但如何使它们看起来都像prea、preb、prec?

希望避免逐一选择和给他们指定别名。如果我有30列呢?

我希望一个自定义的UDF可以解决这个问题,在SQL中使用,但真的不确定如何处理这个问题。


# Generate a pandas DataFrame

import pandas as pd
a_dict={
    'a':[1,2,3,4,5],
    'b':[1,2,3,4,5],
    'c':[1,2,3,4,5],
    'e':list('abcde'),
    'f':list('abcde'),
    'g':list('abcde')
}
pandas_df=pd.DataFrame(a_dict)

# Create a Spark DataFrame from a pandas DataFrame using Arrow

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = spark.createDataFrame(pandas_df)

# struct

from pyspark.sql.functions import struct
main=df.select(struct(df.columns).alias("MAIN_COL"))
dwbf0jvd

dwbf0jvd1#

这里有一种方法可以遍历这些字段并动态修改它们的名称。首先使用main.schema.fields[0].dataType.fields访问目标字段。接下来,使用pythonmappre_添加到每个字段:

from pyspark.sql.types import *
from pyspark.sql.functions import col

inner_fields = main.schema.fields[0].dataType.fields

# [StructField(a,LongType,true),

# StructField(b,LongType,true),

# StructField(c,LongType,true),

# StructField(e,StringType,true),

# StructField(f,StringType,true),

# StructField(g,StringType,true)]

pre_cols = list(map(lambda sf: StructField(f"pre_{sf.name}", sf.dataType, sf.nullable), inner_fields))

new_schema = StructType(pre_cols)

main.select(col("MAIN_COL").cast(new_schema)).printSchema()

# root

# |-- MAIN_COL: struct (nullable = false)

# |    |-- pre_a: long (nullable = true)

# |    |-- pre_b: long (nullable = true)

# |    |-- pre_c: long (nullable = true)

# |    |-- pre_e: string (nullable = true)

# |    |-- pre_f: string (nullable = true)

# |    |-- pre_g: string (nullable = true)

最后,您可以将cast与@Mahesh已经提到的新模式一起使用。

x6yk4ghg

x6yk4ghg2#

Spark的美丽,您可以通过编程操作元数据

这是一个继续原始代码片段的示例:

main.createOrReplaceTempView("table_name")

new_cols_select = ", ".join(["MAIN_COL." + col + " as pre_" + col for col in spark.sql("select MAIN_COL.* from table_name").columns])

new_df = spark.sql(f"select {new_cols_select} from table_name")

由于Spark的懒惰,并且因为所有操作都只是元数据,所以该代码几乎没有任何性能成本,并且对于10列或500列也同样有效(我们实际上正在对1k的列执行类似的操作)。

还可以使用df.schema对象以更优雅方式获取原始列名

fwzugrvs

fwzugrvs3#

您可以尝试这样做:根据需要将所有列添加到方案2

val schema2 = new StructType()
    .add("pre_a",StringType)
    .add("pre_b",StringType)
    .add("pre_c",StringType)

现在使用LIKE选择列:

df.select(col("MAIN_COL").cast(schema2)).show()

它将为您提供所有更新的列名。

jjhzyzn0

jjhzyzn04#

下面的展开所有结构列添加父列名称作为前缀。

struct_cols = [c for c, t in df.dtypes if t.startswith('struct')]
for c in struct_cols:
    schema = T.StructType([T.StructField(f"{c}_{f.name}", f.dataType, f.nullable) for f in df.schema[c].dataType.fields])
    df = df.withColumn(c, F.col(c).cast(schema))
df = df.select([f"{c}.*" if c in struct_cols else c for c in df.columns])

测试输入:

from pyspark.sql import functions as F
from pyspark.sql import types as T

df = spark.createDataFrame([((1, 2), 5)], 'c1:struct<f1:int,f2:int>, c2:int')
print(df.dtypes)

# [('c1', 'struct<f1:int,f2:int>'), ('c2', 'int')]

结果:

struct_cols = [c for c, t in df.dtypes if t.startswith('struct')]
for c in struct_cols:
    schema = T.StructType([T.StructField(f"{c}_{f.name}", f.dataType, f.nullable) for f in df.schema[c].dataType.fields])
    df = df.withColumn(c, F.col(c).cast(schema))
df = df.select([f"{c}.*" if c in struct_cols else c for c in df.columns])

print(df.dtypes)

# [('c1_f1', 'int'), ('c1_f2', 'int'), ('c2', 'int')]
zpgglvta

zpgglvta5#

您也可以使用PySpark执行此操作:

df.select([col(col_name).alias('prefix' + col_name) for col_name in df])

相关问题