PySpark: aggregate a column based on another column

pkln4tw6 · posted 11 months ago in Spark

I have a DataFrame like the one created in the code below, and I want the id values aggregated per group. Based on the value in var_col_name, the value of the column it names needs to be checked for each row: rows should be aggregated only when those referenced values are the same, not when they differ.

The code below aggregates without considering the value of the column (e.g. next_col) named in var_col_name.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create the sample data
 data = [
('a', 1, 'text1', 'col1', 'texts1', 'scj', 'dsiul'),
('a', 11, 'text1', 'col1', 'texts1', 'ftjjjjjjj', 'jhkl'),
('b', 2, 'bigger text', 'next_col', 'gfsajh', 'xcj', 'biggest text'),
('b', 21, 'bigger text', 'next_col', 'fghm', 'hjjkl', 'ghjljkk'),
('c', 3, 'soon', 'column', 'szjcj', 'sooner', 'sjdsk')
]
# Create the DataFrame
df = spark.createDataFrame(data, ['name', 'id', 'txt', 'var_col_name', 'col1', 'column', 'next_col'])
# Group by 'name' and the column specified in 'var_col_name', and collect the 'id' values into a set
grouped_df = df.groupby('name', 'var_col_name').agg(F.collect_set('id').alias('id_all'))
# Create a window partitioned by 'name' and the column specified in 'var_col_name'
window = Window.partitionBy('name', 'var_col_name')
# Join the original DataFrame with the grouped DataFrame and select the necessary columns
df_agg = df.join(grouped_df, ['name', 'var_col_name'], 'left') \
    .select(df['*'], F.collect_set('id').over(window).alias('id_all'))
df_agg.show(truncate=False)

However, the code aggregates the ids for name b even though the value "biggest text" is not equal to "ghjljkk" (rows should be aggregated only when the referenced text is the same): the window partitions only by ('name', 'var_col_name') and never looks at the value of the column that var_col_name points to.
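A minimal sketch that isolates the issue (reusing the df defined above): partitioning only by ('name', 'var_col_name') puts both b rows into the same group even though their next_col values differ.

# Both 'b' rows reference next_col, so they share one partition even though
# next_col holds 'biggest text' in one row and 'ghjljkk' in the other.
df.filter(F.col('name') == 'b') \
    .select('name', 'id', 'var_col_name', 'next_col',
            F.collect_set('id').over(Window.partitionBy('name', 'var_col_name')).alias('id_all')) \
    .show(truncate=False)
# -> id_all contains both 2 and 21 on each row; the referenced value is never consulted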


huus2vyu1#

Create an intermediate column (called idx in the code below) that, for each row, combines the value of var_col_name with the value of the column it references:

+----+---+-----------+------------+------+---------+------------+---------------------+
|name|id |txt        |var_col_name|col1  |column   |next_col    |idx                  |
+----+---+-----------+------------+------+---------+------------+---------------------+
|a   |1  |text1      |col1        |texts1|scj      |dsiul       |col1_texts1          |
|a   |11 |text1      |col1        |texts1|ftjjjjjjj|jhkl        |col1_texts1          |
|b   |2  |bigger text|next_col    |gfsajh|xcj      |biggest text|next_col_biggest text|
|b   |21 |bigger text|next_col    |fghm  |hjjkl    |ghjljkk     |next_col_ghjljkk     |
|c   |3  |soon       |column      |szjcj |sooner   |sjdsk       |column_sooner        |
+----+---+-----------+------------+------+---------+------------+---------------------+

The intermediate column can then be used as the partition column of an unbounded window:

w = Window.partitionBy('idx').orderBy('id') \
          .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Pack every column into an array of (name, value) structs; the default
# struct field names are col1 (the packed column name) and col2 (its value).
# Keep the struct whose name matches var_col_name and build idx from it.
# Note: the higher-order F.filter requires Spark 3.1+.
df.withColumn('all_cols', F.array([F.struct(F.lit(c), F.col(c).cast('string'))
                                   for c in df.columns])) \
    .withColumn('idx', F.filter('all_cols',
                                lambda x: x['col1'] == F.col('var_col_name'))) \
    .withColumn('idx', F.concat_ws('_', F.col('idx')['col1'],
                                   F.col('idx')['col2'])) \
    .withColumn('id_all', F.collect_list('id').over(w)) \
    .drop('all_cols', 'idx') \
    .show(truncate=False)


Output:

+----+---+-----------+------------+------+---------+------------+-------+
|name|id |txt        |var_col_name|col1  |column   |next_col    |id_all |
+----+---+-----------+------------+------+---------+------------+-------+
|a   |1  |text1      |col1        |texts1|scj      |dsiul       |[1, 11]|
|a   |11 |text1      |col1        |texts1|ftjjjjjjj|jhkl        |[1, 11]|
|c   |3  |soon       |column      |szjcj |sooner   |sjdsk       |[3]    |
|b   |2  |bigger text|next_col    |gfsajh|xcj      |biggest text|[2]    |
|b   |21 |bigger text|next_col    |fghm  |hjjkl    |ghjljkk     |[21]   |
+----+---+-----------+------------+------+---------+------------+-------+
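
For what it's worth, a simpler sketch of the same idea, assuming the set of columns that var_col_name can reference is known up front (here col1, column, next_col): build idx with a chain of when expressions instead of the array/struct detour.

# Assumption: var_col_name only ever names one of these columns
ref_cols = ['col1', 'column', 'next_col']

# coalesce picks the single non-null branch, i.e. the value of the referenced column
idx = F.concat_ws('_', F.col('var_col_name'),
                  F.coalesce(*[F.when(F.col('var_col_name') == c, F.col(c).cast('string'))
                               for c in ref_cols]))

df.withColumn('idx', idx) \
    .withColumn('id_all', F.collect_list('id').over(Window.partitionBy('idx'))) \
    .drop('idx') \
    .show(truncate=False)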
