如何在pyspark Dataframe 中包含使用条件的标志

8xiog9wr  于 2022-12-03  发布在  Spark
关注(0)|答案(1)|浏览(145)

我有一个 Dataframe ,如下所示df:

id  vehicle production  asIs    EU    EU_variant    status  
   1    A3345               PQ1298  FV1   FV1_variant   OK  
   2    A3346   A3346       PQ1287  FV2   FV2_variant   NOT_OK
   3    A3346   A3346       PQ1207  FV2   FV2_variant   NOT_OK
   4            A3347               QP9   QP9_variant   OK  
   5            A3347               QP9   QP9_variant   NOT_OK
   6            A3347               QP3   QP3_variant   OK  
   7            A3348       MP6553  YR34  YR34_variant  NOT_OK  
   8            A3348       MP6554  YR35  YR35_variant  NOT_OK  
   9            A3348       MP6554  YR35  YR35_variant  NOT_OK

对于每个不同的车辆和不同的EU,我需要创建2列“标志”和“零件”,其中如果它的状态既有正常状态又有不正常状态,则标志将为0,否则如果它只有不正常状态,则标志将为1。对于每个不同的车辆和不同的EU,我需要将asIs合并,而不重复。如果车辆编号不存在,则应检查不同的生产和不同的EU
输出应

id  vehicle production  asIs    EU     EU_variant   status  Flag    Part
   1    A3345               PQ1298  FV1    FV1_variant  OK      0       PQ1298
   2    A3346   A3346       PQ1287  FV2    FV2_variant  NOT_OK  1       PQ1287,PQ1207
   3    A3346   A3346       PQ1207  FV2    FV2_variant  NOT_OK  1       PQ1287,PQ1207
   4            A3347               QP9    QP9_variant  OK      0   
   5            A3347               QP9    QP9_variant  NOT_OK  0   
   6            A3347               QP3    QP3_variant  OK      0   
   7            A3348       MP6553  YR34   YR34_variant NOT_OK  1       MP6553
   8        -   A3348       MP6554  YR35   YR35_variant NOT_OK  1       MP6554
   9            A3348       MP6554  YR35   YR35_variant NOT_OK  1       MP6554

如何使用pyspark dataframe实现此场景

zbq4xfa0

zbq4xfa01#

您可以在status字段上使用collect_set来获取所需分区的不同状态。使用结果来标记记录。collect_set返回一个数组字段,可用于检查长度(使用size)及其内容(使用array_contains)。
参见下面示例

data_sdf. \
    withColumn('vehicle_prod', func.coalesce('vehicle', 'production')). \
    withColumn('vehicle_prod_eu', 
               func.collect_set('status').over(wd.partitionBy('vehicle_prod', 'eu').orderBy('id').rowsBetween(-sys.maxsize, sys.maxsize))
               ). \
    withColumn('flag', 
               ((func.size('vehicle_prod_eu') == 1) & 
                (func.array_contains('vehicle_prod_eu', 'NOT_OK'))).cast('int')
               ). \
    withColumn('part', 
               func.collect_set('asis').over(wd.partitionBy('vehicle_prod', 'eu').orderBy('id').rowsBetween(-sys.maxsize, sys.maxsize))
               ). \
    withColumn('part', func.concat_ws(',', 'part')). \
    orderBy('id'). \
    show()

# +---+-------+----------+------+----+------------+------+------------+---------------+----+-------------+
# | id|vehicle|production|  asis|  eu|  eu_variant|status|vehicle_prod|vehicle_prod_eu|flag|         part|
# +---+-------+----------+------+----+------------+------+------------+---------------+----+-------------+
# |  1|  A3345|      null|PQ1298| FV1| FV1_variant|    OK|       A3345|           [OK]|   0|       PQ1298|
# |  2|  A3346|     A3346|PQ1287| FV2| FV2_variant|NOT_OK|       A3346|       [NOT_OK]|   1|PQ1287,PQ1207|
# |  3|  A3346|     A3346|PQ1207| FV2| FV2_variant|NOT_OK|       A3346|       [NOT_OK]|   1|PQ1287,PQ1207|
# |  4|   null|     A3347|  null| QP9| QP9_variant|    OK|       A3347|   [NOT_OK, OK]|   0|             |
# |  5|   null|     A3347|  null| QP9| QP9_variant|NOT_OK|       A3347|   [NOT_OK, OK]|   0|             |
# |  6|   null|     A3347|  null| QP3| QP3_variant|    OK|       A3347|           [OK]|   0|             |
# |  7|   null|     A3348|MP6553|YR34|YR34_variant|NOT_OK|       A3348|       [NOT_OK]|   1|       MP6553|
# |  8|   null|     A3348|MP6554|YR35|YR35_variant|NOT_OK|       A3348|       [NOT_OK]|   1|       MP6554|
# |  9|   null|     A3348|MP6554|YR35|YR35_variant|NOT_OK|       A3348|       [NOT_OK]|   1|       MP6554|
# +---+-------+----------+------+----+------------+------+------------+---------------+----+-------------+

相关问题