我有一个类似于
下面的框架
我想要
以下的
根据var_col_name中提到的值,需要检查列值,如果它们相同,则应该聚合,而不是当它们不相同时。
下面的代码聚合而不考虑var_col_name中提到的next col的值。
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Create the sample data
data = [
('a', 1, 'text1', 'col1', 'texts1', 'scj', 'dsiul'),
('a', 11, 'text1', 'col1', 'texts1', 'ftjjjjjjj', 'jhkl'),
('b', 2, 'bigger text', 'next_col', 'gfsajh', 'xcj', 'biggest text'),
('b', 21, 'bigger text', 'next_col', 'fghm', 'hjjkl', 'ghjljkk'),
('c', 3, 'soon', 'column', 'szjcj', 'sooner', 'sjdsk')
]
# Create the DataFrame
df = spark.createDataFrame(data, ['name', 'id', 'txt', 'var_col_name', 'col1', 'column', 'next_col'])
# Group by 'name' and the column specified in 'var_col_name', and collect the 'id' values into a set
grouped_df = df.groupby('name', 'var_col_name').agg(F.collect_set('id').alias('id_all'))
# Create a window partitioned by 'name' and the column specified in 'var_col_name'
window = Window.partitionBy('name', 'var_col_name')
# Join the original DataFrame with the grouped DataFrame and select the necessary columns
df_agg = df.join(grouped_df, ['name', 'var_col_name'], 'left').select(df['*'],
F.collect_set('id').over(window).alias('id_all'))
df_agg.show(truncate=False)
字符串
但是代码正在为id的名称b生成聚合,即使值“biggest text”不等于“ghjljkk”(仅当文本相同时聚合)
的
1条答案
按热度按时间huus2vyu1#
创建一个中间列(在下面的代码中称为
idx
),其中包含var_col_name
的值和每行引用列的值的组合:字符串
中间列现在可以用作无界窗口的分区列:
型
输出量:
型