假设我有一个PySpark DataFrame,列为date、item_id、item_type和item_vol。下面是示例输入:
我尝试得到如下输出:
这里,每个item_id可以具有多个item_types和item_vol。我想将此DataFrame转换为宽格式,其中每行表示date和item_id的唯一组合,列item_type_X和item_vol_X分别表示该项目ID的第X个项目类型和项目数量。如果项目ID的项目类型或项目数量少于X个,则对应的列应填充为null。项目item_type_X和item_vol_X的限制为5。将变为item_type_5和item_vol_5
我使用了下面的代码。
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import when
from pyspark.sql.functions import col, collect_list
# create a SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()
# create the schema for the DataFrame
schema = StructType([
StructField("date", StringType(), True),
StructField("item_id", StringType(), True),
StructField("item_type", StringType(), True),
StructField("item_vol", IntegerType(), True)
])
# create the DataFrame
data = [
('2019-01-01', 'item3', 'aa', 1),
('2019-01-01', 'item3', 'bb', 2),
('2019-01-01', 'item67', 'cc', 4),
('2019-01-01', 'item67', 'dd', None),
('2019-01-01', 'item68', 'gas', 9),
]
new_df = spark.createDataFrame(data, schema).orderBy("date", "item_id", "item_type", "item_vol")
# show the DataFrame
new_df.show()
# group by date and item_id and collect lists of item_type and item_vol
ordered_df = new_df\
.groupBy("date", "item_id") \
.agg(collect_list("item_type").alias("item_type_list"),
collect_list(when(col("item_vol").isNull(), "null").otherwise(col("item_vol"))).alias("item_vol_list"))
ordered_df.show(5, False)
from pyspark.sql.functions import split, col
num_cols = 5
pivot_cols = []
for i in range(num_cols):
item_type_expr = col("item_type_list").getItem(i).alias(f"item_type_{i+1}")
item_vol_expr = col("item_vol_list").getItem(i).alias(f"item_vol_{i+1}")
pivot_cols.extend([item_type_expr, item_vol_expr])
# split the "item_type_list" and "item_vol_list" columns and store each item in different columns
split_df = ordered_df.select("date", "item_id",
*pivot_cols
)
# show the resulting DataFrame
split_df.show(10, False)
但是,使用collect_list并不保留行的顺序。我想把item_type保持升序。但是在groupBy中使用collect_list有时会打乱列的顺序。我的意思是像下面:
如果有人能帮忙的话,我将不胜感激。谢谢。
1条答案
按热度按时间4uqofj5v1#
代码
结果