如何计算同一组消息中第一条消息和最后一条消息的时间差?
输入:
MESSAGE | TIME
----------------------------
AAA | 2019-01-01 14:00:00 # message on kafka topic, 1 message per topic
AAA | 2019-01-01 14:05:00
AAA | 2019-01-01 14:10:00
BBB | 2019-01-01 14:15:00
BBB | 2019-01-01 14:20:00
AAA | 2019-01-01 14:25:00
AAA | 2019-01-01 14:30:00
预期结果:当第一次收到消息时,不做任何事情。2对于第二次,如果连续收到相同的消息,则计算timediff。
在这种情况下,AAA在第2条消息接收期间的初始时间差为5分钟,在接收第3条消息后更新为10分钟
MESSAGE | TIME_DIFF
----------------------
AAA | 10 minutes
BBB | 5 minutes
AAA | 5 minutes
尝试执行以下操作,但无法确定方法。
# Read data from Kafka Topic as Dataframe
df = spark.readStream.format("kafka")
# Initialize for 1st msg
msg_name = df.msg
inital_timestamp = df.time
# subsequent 2nd msg onwards
# check if current msg is same as earlier message ie. msg_name
temp_list = []
if df['message'] == msg_name:
var_compute_diff = df[time] - inital_timestamp
<write to s3, when to write?>
<Handling cases where we need to persists AAA t times as shown in output above>
1条答案
按热度按时间7d7tgy0s1#
Spark流式处理遵循基于微批处理的处理方式,这意味着您的输入流被分成小批,然后Spark以分布式的方式单独处理。当两个微批处理之间没有连接(数据共享)时,它被称为无状态处理。
在您的示例中,可能会出现一条消息在第一批中,而第二条消息在第二批或第三批中的情况,因此在这种情况下,您需要实现有状态处理。
查看此示例以获得更多理解,https://blog.knoldus.com/stateful-streaming-in-spark/