Using PySpark to compute time differences based on a condition, on large data

2izufjch posted on 2021-05-26 in Spark

Please help me... I have data like this:

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

# (Id, BAP flag, event time as a "yyyy-MM-dd HH:mm:ss" string)
dept = [("A", 1, "2020-11-07 23:19:12"), ("A", 1, "2020-11-07 23:19:16"), ("A", 1, "2020-11-07 23:19:56"),
        ("A", 0, "2020-11-07 23:20:37"), ("A", 0, "2020-11-07 23:21:06"), ("A", 0, "2020-11-07 23:21:47"),
        ("A", 1, "2020-11-07 23:22:05"), ("A", 1, "2020-11-07 23:22:30"), ("A", 1, "2020-11-07 23:23:00"),
        ("B", 1, "2020-11-07 22:19:12"), ("B", 1, "2020-11-07 22:20:10"), ("B", 0, "2020-11-07 22:21:31"),
        ("B", 0, "2020-11-07 22:22:01"), ("B", 0, "2020-11-07 22:22:45"), ("B", 1, "2020-11-07 22:23:52"),
        ("B", 1, "2020-11-07 22:24:10")]
deptColumns = ["Id", "BAP", "Time"]
deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
deptDF.show()


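Using PySpark, how can I get, for each Id, the time of the first 0 in the first run of zeros and the time of the first 1 that follows that same run of zeros, and then subtract the two timestamps? Like this: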

This has to be done for every run of zeros within each Id, so an Id with several runs of zeros can have several DeltaTime values.
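
To pin the requirement down, here is a plain-Python sketch with no Spark. It applies the rule to the sample rows. For every Id, it walks the rows in time order and remembers the time of the first 0 of each run of zeros. The run is then closed at the next 1. This is only a hypothetical reference for checking results, not a Spark solution, and `zero_run_deltas` is an illustrative name. It assumes that a run of zeros with no following 1 should simply be skipped.

```python
from collections import defaultdict
from datetime import datetime

# Same sample rows as in the question: (Id, BAP, Time)
dept = [("A", 1, "2020-11-07 23:19:12"), ("A", 1, "2020-11-07 23:19:16"), ("A", 1, "2020-11-07 23:19:56"),
        ("A", 0, "2020-11-07 23:20:37"), ("A", 0, "2020-11-07 23:21:06"), ("A", 0, "2020-11-07 23:21:47"),
        ("A", 1, "2020-11-07 23:22:05"), ("A", 1, "2020-11-07 23:22:30"), ("A", 1, "2020-11-07 23:23:00"),
        ("B", 1, "2020-11-07 22:19:12"), ("B", 1, "2020-11-07 22:20:10"), ("B", 0, "2020-11-07 22:21:31"),
        ("B", 0, "2020-11-07 22:22:01"), ("B", 0, "2020-11-07 22:22:45"), ("B", 1, "2020-11-07 22:23:52"),
        ("B", 1, "2020-11-07 22:24:10")]


def zero_run_deltas(rows):
    """Return {Id: [seconds from the first 0 of a run of zeros to the first 1 after it, ...]}."""
    by_id = defaultdict(list)
    for id_, bap, ts in rows:
        by_id[id_].append((datetime.strptime(ts, "%Y-%m-%d %H:%M:%S"), bap))
    result = {}
    for id_, events in by_id.items():
        events.sort()  # order by time within each Id
        deltas, run_start = [], None
        for t, bap in events:
            if bap == 0 and run_start is None:
                run_start = t  # first 0 of a new run of zeros
            elif bap == 1 and run_start is not None:
                deltas.append(int((t - run_start).total_seconds()))
                run_start = None  # the run is closed by its first 1
        result[id_] = deltas  # a trailing run with no following 1 is dropped
    return result


print(zero_run_deltas(dept))  # {'A': [88], 'B': [141]}
```

An Id with two runs of zeros gets two entries in its list. This matches the "several DeltaTime values per Id" requirement.
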
So far, I can compute the delta time between consecutive rows:

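from pyspark.sql import Window, functions as F
ts = F.col("Time").cast("timestamp").cast("bigint")  # Time is a string: parse it first, then take epoch seconds
Delta = deptDF.withColumn("DeltaTime", ts - F.lag(ts, 1).over(Window.partitionBy("Id").orderBy("Time")))
Delta.show()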
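
The row-to-row deltas can in fact be turned into the requested value. Within one Id, take the deltas of the rows whose previous row has BAP = 0. Added up run by run, they give (time of the first 1) minus (time of the first 0). The plain-Python sketch below illustrates the idea on Id A. It uses no Spark, and `deltas_via_lag` is a hypothetical name. In Spark, the condition would correspond to `lag(BAP) == 0`, but the sums would additionally need a per-run group key. The sketch sidesteps that with a running total.

```python
from datetime import datetime

# Rows of Id A, already sorted by Time: (BAP, Time)
rows_a = [(1, "2020-11-07 23:19:12"), (1, "2020-11-07 23:19:16"), (1, "2020-11-07 23:19:56"),
          (0, "2020-11-07 23:20:37"), (0, "2020-11-07 23:21:06"), (0, "2020-11-07 23:21:47"),
          (1, "2020-11-07 23:22:05"), (1, "2020-11-07 23:22:30"), (1, "2020-11-07 23:23:00")]


def deltas_via_lag(rows):
    """Sum consecutive-row deltas whose previous row is a 0, one total per run of zeros."""
    times = [datetime.strptime(t, "%Y-%m-%d %H:%M:%S") for _, t in rows]
    totals, running = [], 0.0
    for i in range(1, len(rows)):
        if rows[i - 1][0] == 0:  # the extra condition: lag(BAP) == 0
            running += (times[i] - times[i - 1]).total_seconds()  # this row's DeltaTime
            if rows[i][0] == 1:  # the first 1 after the run closes it
                totals.append(int(running))
                running = 0.0
    return totals  # a trailing run of zeros with no 1 after it is not reported


print(deltas_via_lag(rows_a))  # [88]
```
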

Is there a condition I can add to this to get the expected result?

ttygqcqt (answer #1)

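Add two helper columns, begin0 (the first 0 of a run of zeros) and begin1 (the first 1 after a run of zeros). Then use window functions to keep only those boundary rows and subtract their times: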
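
The plain-Python imitation of `F.lag` below shows which rows of Id A the two flags mark. It is a sketch without Spark, and `flag_boundaries` is a hypothetical helper. One assumption is made here: the "previous" value on an Id's first row is taken as 1, so a run of zeros that starts on that row is still flagged. `F.lag` itself returns null there unless it is given a default.

```python
# Rows of Id A in time order: (BAP, Time)
rows_a = [(1, "2020-11-07 23:19:12"), (1, "2020-11-07 23:19:16"), (1, "2020-11-07 23:19:56"),
          (0, "2020-11-07 23:20:37"), (0, "2020-11-07 23:21:06"), (0, "2020-11-07 23:21:47"),
          (1, "2020-11-07 23:22:05"), (1, "2020-11-07 23:22:30"), (1, "2020-11-07 23:23:00")]


def flag_boundaries(rows):
    """Return (BAP, Time, begin0, begin1) per row, mimicking lag(BAP) ordered by Time."""
    out = []
    prev = 1  # assumed default for the first row, like F.lag('BAP', 1, 1)
    for bap, t in rows:
        begin0 = prev != 0 and bap == 0  # first 0 of a run of zeros
        begin1 = prev == 0 and bap == 1  # first 1 after a run of zeros
        out.append((bap, t, begin0, begin1))
        prev = bap
    return out


for row in flag_boundaries(rows_a):
    if row[2] or row[3]:  # same idea as .filter('begin0 or begin1')
        print(row)
# (0, '2020-11-07 23:20:37', True, False)
# (1, '2020-11-07 23:22:05', False, True)
```

After filtering, each begin0 row is directly followed by its begin1 row. This is why a single `lead('Time')` is enough to find the end of the run.
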

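import pyspark.sql.functions as F
from pyspark.sql import Window

# date_format below renders a number of seconds as a wall-clock time,
# which only gives the right value when the session time zone is UTC
spark.conf.set('spark.sql.session.timeZone', 'UTC')

window = Window.partitionBy('Id').orderBy('Time')
Delta = deptDF.withColumn(
    # True on the first 0 of a run of zeros; lag defaults to 1 so that a run
    # starting on an Id's very first row is detected too
    'begin0',
    (F.lag('BAP', 1, 1).over(window) != 0) & (F.col('BAP') == 0)
).withColumn(
    # True on the first 1 after a run of zeros
    'begin1',
    (F.lag('BAP').over(window) == 0) & (F.col('BAP') == 1)
).filter(
    # keep only run boundaries: each begin0 row is now followed by its begin1 row
    'begin0 or begin1'
).withColumn(
    'DeltaTime',
    F.when(
        F.col('BAP') == 0,
        # seconds until the next kept row (the first 1), shown as HH:mm:ss
        F.date_format(
            (
                F.lead('Time').over(window).cast('timestamp').cast('bigint') -
                F.col('Time').cast('timestamp').cast('bigint')
            ).cast('timestamp'),
            'HH:mm:ss'
        )
    ).otherwise(
        F.lit('00:00:00')
    )
).drop(
    'begin0', 'begin1'
).orderBy(
    'Id', 'Time'
)

Delta.show()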
+---+---+-------------------+---------+
| Id|BAP|               Time|DeltaTime|
+---+---+-------------------+---------+
|  A|  0|2020-11-07 23:20:37| 00:01:28|
|  A|  1|2020-11-07 23:22:05| 00:00:00|
|  B|  0|2020-11-07 22:21:31| 00:02:21|
|  B|  1|2020-11-07 22:23:52| 00:00:00|
+---+---+-------------------+---------+
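
DeltaTime is produced by casting a number of seconds to a timestamp and formatting it as HH:mm:ss. That reads the value as a clock time in the session time zone, so it is shifted unless the zone is UTC, and it wraps around after 24 hours. The underlying arithmetic is just integer division. Here is a small plain-Python sketch of it that does not depend on any time zone; `format_hms` is a hypothetical name.

```python
def format_hms(seconds):
    """Format a non-negative number of seconds as HH:MM:SS (hours may exceed 24)."""
    hours, rest = divmod(int(seconds), 3600)
    minutes, secs = divmod(rest, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


print(format_hms(88))   # 00:01:28
print(format_hms(141))  # 00:02:21
```

If the time-zone dependence matters, the same division could presumably be expressed in Spark with column arithmetic and `F.format_string` using `%02d` fields, instead of going through a timestamp.
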
