Spark流-各组数据集的时间差

af7jpaap  于 2023-03-03  发布在  Apache
关注(0)|答案(1)|浏览(205)

如何计算同一组消息中第一条消息和最后一条消息的时间差?
输入:

MESSAGE |      TIME 
----------------------------
AAA     | 2019-01-01 14:00:00  # message on kafka topic, 1 message per topic

AAA     | 2019-01-01 14:05:00

AAA     | 2019-01-01 14:10:00

BBB     | 2019-01-01 14:15:00

BBB     | 2019-01-01 14:20:00

AAA     | 2019-01-01 14:25:00

AAA     | 2019-01-01 14:30:00

预期结果:当第一次收到消息时,不做任何事情。2对于第二次,如果连续收到相同的消息,则计算timediff。
在这种情况下,AAA在第2条消息接收期间的初始时间差为5分钟,在接收第3条消息后更新为10分钟

MESSAGE | TIME_DIFF
----------------------
AAA     |  10 minutes
BBB     |   5 minutes
AAA     |   5 minutes

尝试执行以下操作,但无法确定方法。

# Read data from Kafka Topic as Dataframe

df = spark.readStream.format("kafka")

# Initialize for 1st msg 

msg_name = df.msg
inital_timestamp = df.time

# subsequent 2nd  msg onwards
# check if current msg is same as earlier message ie. msg_name

temp_list = []

if df['message'] == msg_name:
    var_compute_diff = df[time] - inital_timestamp
    <write to s3, when to write?>
    <Handling cases where we need to persists AAA t times as shown in output above>
7d7tgy0s

7d7tgy0s1#

Spark流式处理遵循基于微批处理的处理方式,这意味着您的输入流被分成小批,然后Spark以分布式的方式单独处理。当两个微批处理之间没有连接(数据共享)时,它被称为无状态处理。
在您的示例中,可能会出现一条消息在第一批中,而第二条消息在第二批或第三批中的情况,因此在这种情况下,您需要实现有状态处理。
查看此示例以获得更多理解,https://blog.knoldus.com/stateful-streaming-in-spark/

相关问题