pyspark-conditional用groupby创建列

bwitn5fc  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(373)

我需要使用第二个pysparkDataframe为每个id创建一个基于日期条件的指示符。
指示灯为1或0,表示传感器出现故障。指示器以第二个Dataframe为条件,该Dataframe具有第一个故障日期和最后一个故障日期。如果故障记录在 fail_df , main_df 行中第一个和最后一个记录的故障之间应该有一个1 fail_df . 当传感器没有在 main_df 日期,它应该存储值0。
主Dataframe

ID         |  Date      |Value 
-------------------------------------------------
P1         | 2016-10-01 |100
P1         | 2016-10-02 |200
P1         | 2016-12-16 |700
P1         | 2016-12-17 |800
P1         | 2016-12-18 |800
P2         | 2016-01-31 |700
P2         | 2016-02-01 |800
P2         | 2016-02-02 |900

故障列表Dataframe

ID         |  First Fail Date      | Last Fail Date
-----------------------------------------------------
P1         | 2016-10-01            |2016-10-02  |
P2         | 2016-01-31            |2016-02-01  |

所需Dataframe

ID         |  Date      |Value  | Failure_Indicator     
-------------------------------------------------
P1         | 2016-10-01 |100    | 1
P1         | 2016-10-02 |200    | 1
P1         | 2016-12-16 |700    | 0
P1         | 2016-12-17 |800    | 0
P1         | 2016-12-18 |800    | 0
P2         | 2016-01-31 |700    | 1
P2         | 2016-02-01 |800    | 1
P2         | 2016-02-02 |900    | 0

我尝试了:attributeerror:'groupeddata'对象没有属性'withcolumn'

df = main_df.groupBy('ID').withColumn(
    'Failure_Indicator',
    F.when((fail_df.col("First fail Date") >= main_df.Date) & 
           (fail_df.col("Last fail Date") >= main_df.Date), 1)
     .otherwise(0))
nkkqxpd9

nkkqxpd91#

df_1=spark.createDataFrame([("P1", "2016-10-01", 100), ("P1", "2016-10-03", 200), ("P3", "2016-10-09", 200)], ["id", "date", "value"])

+---+----------+-----+
| id|      date|value|
+---+----------+-----+
| P1|2016-10-01|  100|
| P1|2016-10-03|  200|
| P3|2016-10-09|  200|
+---+----------+-----+

df_2=spark.createDataFrame([("P1", "2016-10-01", "2016-10-02")], ["id", "start_date", "end_date"])

+---+----------+----------+
| id|start_date|  end_date|
+---+----------+----------+
| P1|2016-10-01|2016-10-02|
+---+----------+----------+

df_1.join(df_2, 'id', 'left_outer') \
    .withColumn('failure_indicator', when((col("date") >= col("start_date")) & (col("date") <= col("end_date")), 1).otherwise(0)) \
    .select('id', 'date', 'value', 'failure_indicator') \
    .show()

+---+----------+-----+-----------------+
| id|      date|value|failure_indicator|
+---+----------+-----+-----------------+
| P3|2016-10-09|  200|                0|
| P1|2016-10-01|  100|                1|
| P1|2016-10-03|  200|                0|
+---+----------+-----+-----------------+

相关问题