pyspark 当条件第一次出现时标记为True,如果连续重复则标记为False

vktxenjb  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(100)

我试图通过将“fault_start”列标记为“True”来解决一个问题,即我需要在错误代码首次出现时对其行进行计数。如果它们连续重复,没有任何零出现,则将其标记为“假”。如果出现零,则表示故障已清除,如果再次出现相同故障,则将其标记为“True”。
我已经包括了下面的数据样本。请注意,“rn”列只是行号,如果需要可以忽略。此外,“fault_start”是滞后一行的“fault_code”列。

truck_fault_counts_1hz_df = 
Truck                       Timestamp                       fault_code  rn  fault_start
251_PS1_PB_520_REF_111199   2023-07-18T00:01:02.739+0000    3           63  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:03.739+0000    244         64  3
251_PS1_PB_520_REF_111199   2023-07-18T00:01:04.739+0000    3           65  244
251_PS1_PB_520_REF_111199   2023-07-18T00:01:05.739+0000    244         66  3
251_PS1_PB_520_REF_111199   2023-07-18T00:01:06.739+0000    3           67  244
251_PS1_PB_520_REF_111199   2023-07-18T00:01:07.739+0000    244         68  3
251_PS1_PB_520_REF_111199   2023-07-18T00:01:08.739+0000    3           69  244
251_PS1_PB_520_REF_111199   2023-07-18T00:01:09.739+0000    244         70  3
251_PS1_PB_520_REF_111199   2023-07-18T00:01:10.739+0000    3           71  244
251_PS1_PB_520_REF_111199   2023-07-18T00:01:11.739+0000    244         72  3
251_PS1_PB_520_REF_111199   2023-07-18T00:01:12.741+0000    3           73  244
251_PS1_PB_520_REF_111199   2023-07-18T00:01:13.741+0000    244         74  3
251_PS1_PB_520_REF_111199   2023-07-18T00:01:14.839+0000    49          75  244
251_PS1_PB_520_REF_111199   2023-07-18T00:01:15.839+0000    0           76  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:16.840+0000    49          77  0
251_PS1_PB_520_REF_111199   2023-07-18T00:01:17.840+0000    0           78  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:18.840+0000    49          79  0
251_PS1_PB_520_REF_111199   2023-07-18T00:01:19.840+0000    0           80  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:20.840+0000    49          81  0
251_PS1_PB_520_REF_111199   2023-07-18T00:01:21.840+0000    0           82  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:22.840+0000    49          83  0
251_PS1_PB_520_REF_111199   2023-07-18T00:01:23.840+0000    0           84  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:24.840+0000    49          85  0
251_PS1_PB_520_REF_111199   2023-07-18T00:01:25.841+0000    0           86  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:26.842+0000    49          87  0
251_PS1_PB_520_REF_111199   2023-07-18T00:01:27.940+0000    3           88  49
251_PS1_PB_520_REF_111199   2023-07-18T00:01:28.940+0000    244         89  3

下面的解决方案对所有不为零的故障进行计数,但无法不对连续重复发生的故障进行计数:

# Initialize a list of numbers to not be included in the distinct_faults list
x = [0.0]
# Select the distinct faults in the fault_code column and convert it to Pandas
distinct_faults_df = truck_fault_counts_1hz_df.dropDuplicates(["fault_code"]).select("fault_code").toPandas()["fault_code"]
# Create a list of the distinct faults in the fault_code column and convert the values to ints
distinct_faults = [int(i) for i in distinct_faults_df if i not in x]
# Sort the list in ascending order
distinct_faults.sort()
# Convert the values to strings
distinct_faults = [str(i) for i in distinct_faults]

# Convert the fault_code and fault_start column to int then to string from float
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.withColumn("fault_code",truck_fault_counts_1hz_df.fault_code.cast('int'))
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.withColumn("fault_code",truck_fault_counts_1hz_df.fault_code.cast('string'))
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.withColumn("fault_start",truck_fault_counts_1hz_df.fault_start.cast('int'))
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.withColumn("fault_start",truck_fault_counts_1hz_df.fault_start.cast('string'))

# Fill any Null values with "0"
truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.fillna(value="0", subset=["fault_start"])

for i in distinct_faults:
    # Create a column for row count and fault start
    # fault_start lags fault_code by one row
    # Any row that has a number greater than 0 in fault_code
    truck_fault_counts_1hz_df = truck_fault_counts_1hz_df.\
        withColumn("fault_start",\
        when(((col("fault_code") == i) & (col("fault_start") != i)), 
        lit("True")).\
        when(((col("fault_code") == 0) & (col("fault_start") == 0)), 
        lit("False")).\
        when(((col("fault_code") == i) & (col("fault_start") == i)), 
        lit("False")).\
        when(((col("fault_code") == 0) & (col("fault_start") == i)), 
        lit("False")).\
        otherwise(truck_fault_counts_1hz_df.fault_start))

truck_fault_counts_1hz_noSpam_df = truck_fault_counts_1hz_df.where(truck_fault_counts_1hz_df.fault_start == True).drop("rn")

display(truck_fault_counts_1hz_df)

我目前的输出:

Truck                       Timestamp                       fault_code  rn  fault_start
251_PS1_PB_520_REF_111199   2023-07-18T00:01:02.739+0000    3           63  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:03.739+0000    244         64  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:04.739+0000    3           65  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:05.739+0000    244         66  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:06.739+0000    3           67  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:07.739+0000    244         68  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:08.739+0000    3           69  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:09.739+0000    244         70  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:10.739+0000    3           71  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:11.739+0000    244         72  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:12.741+0000    3           73  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:13.741+0000    244         74  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:14.839+0000    49          75  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:15.839+0000    0           76  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:16.840+0000    49          77  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:17.840+0000    0           78  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:18.840+0000    49          79  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:19.840+0000    0           80  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:20.840+0000    49          81  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:21.840+0000    0           82  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:22.840+0000    49          83  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:23.840+0000    0           84  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:24.840+0000    49          85  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:25.841+0000    0           86  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:26.842+0000    49          87  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:27.940+0000    3           88  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:28.940+0000    244         89  True

我希望实现的输出:

Truck                       Timestamp                       fault_code  rn  fault_start
251_PS1_PB_520_REF_111199   2023-07-18T00:01:02.739+0000    3           63  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:03.739+0000    244         64  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:04.739+0000    3           65  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:05.739+0000    244         66  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:06.739+0000    3           67  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:07.739+0000    244         68  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:08.739+0000    3           69  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:09.739+0000    244         70  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:10.739+0000    3           71  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:11.739+0000    244         72  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:12.741+0000    3           73  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:13.741+0000    244         74  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:14.839+0000    49          75  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:15.839+0000    0           76  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:16.840+0000    49          77  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:17.840+0000    0           78  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:18.840+0000    49          79  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:19.840+0000    0           80  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:20.840+0000    49          81  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:21.840+0000    0           82  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:22.840+0000    49          83  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:23.840+0000    0           84  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:24.840+0000    49          85  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:25.841+0000    0           86  False
251_PS1_PB_520_REF_111199   2023-07-18T00:01:26.842+0000    49          87  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:27.940+0000    3           88  True
251_PS1_PB_520_REF_111199   2023-07-18T00:01:28.940+0000    244         89  True

我也一直在尝试实现一个字典,其中错误代码作为键,错误开始(True/False)作为值来跟踪它们已经开始,但还没有找到一种实用的方法来实现,不涉及迭代每一行的时间和内存密集型操作(这个数据集包含1900万行,所以这是不实际的)。我还考虑过为每个新的故障代码创建一个新的列,但也没有找到实现它的方法。
任何帮助都非常感谢!!

wqsoz72f

wqsoz72f1#

代码

W1 = Window.partitionBy('Truck').orderBy('Timestamp')
df = df.withColumn('reset_cnt', F.sum((F.col('fault_code') == 0).cast('int')).over(W1))

W2 = Window.partitionBy('Truck', 'reset_cnt', 'fault_code').orderBy('Timestamp')
df = df.withColumn('fault_start', F.row_number().over(W2) == 1)
df = df.withColumn('fault_start', F.expr("IF(fault_code = 0, false, fault_start)"))

如何工作

Truck对该矩阵进行分区,并按Timestamp排序,然后计算fault_code等于0时的条件下的累积和。这将创建一个名为reset_cnt的列,这将帮助我们区分fault_code被重置的不同行块,即清除故障。

+--------------------+--------------------+----------+---+---------+
|               Truck|           Timestamp|fault_code| rn|reset_cnt|
+--------------------+--------------------+----------+---+---------+
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 63|        0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 64|        0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 65|        0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 66|        0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 67|        0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 68|        0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 69|        0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 70|        0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 71|        0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 72|        0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 73|        0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 74|        0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 75|        0|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 76|        1|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 77|        1|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 78|        2|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 79|        2|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 80|        3|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 81|        3|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 82|        4|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 83|        4|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 84|        5|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 85|        5|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 86|        6|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 87|        6|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 88|        6|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 89|        6|
+--------------------+--------------------+----------+---+---------+

创建fault_start列:按Truckreset_cntfault_code对该框架进行分区,并分配行号以标识每个分区的重复fault_codes

+--------------------+--------------------+----------+---+---------+-----------+
|               Truck|           Timestamp|fault_code| rn|reset_cnt|fault_start|
+--------------------+--------------------+----------+---+---------+-----------+
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 63|        0|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 64|        0|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 65|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 66|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 67|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 68|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 69|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 70|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 71|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 72|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 73|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 74|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 75|        0|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 76|        1|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 77|        1|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 78|        2|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 79|        2|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 80|        3|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 81|        3|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 82|        4|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 83|        4|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 84|        5|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 85|        5|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 86|        6|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 87|        6|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 88|        6|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 89|        6|       true|
+--------------------+--------------------+----------+---+---------+-----------+

使用false屏蔽fault_start中的行,其中fault_code0

+--------------------+--------------------+----------+---+---------+-----------+
|               Truck|           Timestamp|fault_code| rn|reset_cnt|fault_start|
+--------------------+--------------------+----------+---+---------+-----------+
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 63|        0|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 64|        0|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 65|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 66|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 67|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 68|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 69|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 70|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 71|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 72|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 73|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 74|        0|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 75|        0|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 76|        1|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 77|        1|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 78|        2|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 79|        2|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 80|        3|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 81|        3|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 82|        4|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 83|        4|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 84|        5|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 85|        5|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         0| 86|        6|      false|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|        49| 87|        6|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|         3| 88|        6|       true|
|251_PS1_PB_520_RE...|2023-07-18T00:01:...|       244| 89|        6|       true|
+--------------------+--------------------+----------+---+---------+-----------+

相关问题