Pyspark代码:-使用pyspark代码,我想要下面的结果

envsm3lx  于 2023-03-28  发布在  Spark
关注(0)|答案(3)|浏览(140)

我有一个这样的表,里面有上百万条记录。ID是唯一的。类型包含三个值- U,X,I。
| ID|类型|.价值。|日期|
| --------------|--------------|--------------|--------------|
| 1|U|二百二十|2020-06-12 2020-06-12 2020-06-12|
| 二|X|一百二十|2023年1月31日|
| 三|我|二百|2019-01-10 2019-01-10|
| 四|X|一百五十|二〇二二年十月二十九日|
| 五|U|一百|2022年5月12日|
| 六|我|八十|二○二三年三月五日|
我需要输出的方式,它应该只有条目的类型-“我”的最新日期和它的值。
| 身份证。|打字。|价值。|约会。|
| --------------|--------------|--------------|--------------|
| 六|我|八十|二○二三年三月五日|
如果表中没有类型为“I”的条目,则应查找类型为“X”的条目,并且输出应包含最新日期及其值。
| 身份证。|打字。|价值。|约会。|
| --------------|--------------|--------------|--------------|
| 二|X|一百二十|2023年1月31日|
如果表中没有类型为“I”和“X”的条目,则应查找类型为“U”的条目,并且输出应包含最新日期及其值。
| 身份证。|打字。|价值。|约会。|
| --------------|--------------|--------------|--------------|
| 五|U|一百|2022年5月12日|

agxfikkp

agxfikkp1#

首先,您应该添加一个值来对字母进行排序。然后,您可以为letter_iddate添加排名。最后,您保留第一个记录。

w = Window.partitionBy().orderBy("letter_id", desc("Date."))

df2 = (
    df
    .withColumn("letter_id", 
        when(col("Type.") == "I", 1)
       .when(col("Type.") == "X", 2)
       .otherwise(3)
    )
    .withColumn("rank", row_number().over(w))
    .filter(col("rank") == 1)
    .drop("rank", "letter_id")
)

希望能有所帮助,
尼古拉

fumotvh3

fumotvh32#

参见下面的实现-

输入数据

data = [(1, 'U', 220, '2020-06-12'),
        (2, 'X', 120, '2023-01-31'),
        (3, 'I', 200, '2019-01-10'),
        (4, 'X', 150, '2022-10-29'),
        (5, 'U', 100, '2022-05-12'),
        (6, 'I', 80, '2023-03-05')]
df = spark.createDataFrame(data, ['ID', 'Type', 'Value', 'Date'])
df.show()

+---+----+-----+----------+
| ID|Type|Value|      Date|
+---+----+-----+----------+
|  1|   U|  220|2020-06-12|
|  2|   X|  120|2023-01-31|
|  3|   I|  200|2019-01-10|
|  4|   X|  150|2022-10-29|
|  5|   U|  100|2022-05-12|
|  6|   I|   80|2023-03-05|
+---+----+-----+----------+

现在,可以遵循以下步骤-
1.定义一个窗口函数,按类型分区并按日期排序。
1.使用窗口函数获取每个类型的最新行。
1.筛选类型“I”的最新行。
3(a).如果没有类型'I'行,则过滤类型'X'的最新行
3(B).如果没有类型'X'行,则过滤类型'U'的最新行
更多详情见下文-

from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Creating window
window = Window.partitionBy('Type').orderBy(desc('Date'))

# Fetching the latest row
latest_rows = df.select('ID', 'Type', 'Value', 'Date', first('Date').over(window).alias('latest_date')) \
                .filter('Date = latest_date') \
                .drop('latest_date')

# Filter for Type 'I'
i_df = latest_rows.filter('Type = "I"')

# If no Type 'I' rows, filter for Type 'X'
if i_df.count() == 0:
    x_df = latest_rows.filter('Type = "X"')
    # If no Type 'X' rows, filter  for Type 'U'
    if x_df.count() == 0:
        u_df = latest_rows.filter('Type = "U"')
        result = u_df
    else:
        result = x_df
else:
    result = i_df

result.show()

+---+----+-----+----------+
| ID|Type|Value|      Date|
+---+----+-----+----------+
|  6|   I|   80|2023-03-05|
+---+----+-----+----------+
w1jd8yoj

w1jd8yoj3#

如果您的表可能有一些ID值,其中包含多行,并且您需要为每个ID选择一行(与您的示例框架不同),那么您可以使用窗口/按顺序分区方法,如下所示

from pyspark.sql import functions as F
from pyspark.sql.window import Window

win = Window.partitionBy("ID").orderBy("oType",F.desc("Date"))
(
    df
    .withColumn("oType",F.when(F.col("Type")=="U", F.lit("Z")).otherwise(F.col("Type")))
    .withColumn("row",F.row_number().over(win))
    .filter(F.col("row")==1)
    .drop("oType", "row")
)

相关问题