如何在pyspark dataframe中实现这一点

dhxwm5r4  于 2022-11-21  发布在  Spark
关注(0)|答案(1)|浏览(228)

我有一个pyspark Dataframe df:-
| 库存单位|商店|旗帜|日期|
| - -|- -|- -|- -|
| 小行星330001|小行星629138|一个|二○二一○五零七|
| 33000009746的电话|小行星629138|第0页|小行星2021|
| 小行星50000441489|小行星629138|一个|二○二一○五一一|
| 小行星220000|小行星187367|第0页|二〇二二〇二一零|
| 小行星414 - 1971|小行星187367|一个|小行星20210628|
| 50000577980号|小行星176129|第0页|二〇二二〇二二五|
| 50000001六零七|小行星633782|一个|二○二一○四一九|
| 50000001六零七|小行星633782|一个|二○二一○四一九|
| 50000001六零八|小行星633782|一个|二○二一○四一九|
我想要取得每个存放区之最大(最新)DateSKU存在(以Flag表示)的相异计数。范例:=
| 贮藏|独特SKU计数|
| - -|- -|
| 小行星629138|一个|
| 小行星187367|第0页|
| 小行星176129|第0页|
| 小行星633782| 2个|
我该怎么做?

xfb7svmp

xfb7svmp1#

使用窗口函数:

import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame(
    [
    ('33000120304','629138','1','20210507')
    ,('33000009746','629138','0','20210129')
    ,('50000441489','629138','1','20210511')
    ,('22000020680','187367','0','20220210')
    ,('41419714737','187367','1','20210628')
    ,('50000577980','176129','0','20220225')
    ,('50000001607','633782','1','20210419')
    ,('50000001607','633782','1','20210419')
    ,('50000001608','633782','1','20210419')
    ],
    ['SKU','Store','Flag','Date']
).withColumn('Date', F.to_date(F.col('Date'), 'yyyyMMdd'))

w = Window.partitionBy("Store").orderBy(F.desc('Date'))

df_res = df\
    .withColumn('max_date', F.max(F.col('Date')).over(w))\
    .filter((F.col('Date') == F.col('max_date')))\
    .filter(F.col('Flag')==1)\
    .groupBy('Store').agg(F.countDistinct("SKU").alias('count_distinct_sku'))

df\
.select('Store')\
.dropDuplicates()\
.join(df_res, on='Store', how='left')\
.fillna(0)\
.show()

# +------+------------------+
# | Store|count_distinct_sku|
# +------+------------------+
# |629138|                 1|
# |187367|                 0|
# |176129|                 0|
# |633782|                 2|
# +------+------------------+

相关问题