在spark结构化流媒体中,如何基于时间戳字段进行重复数据消除并保持最新?

u4dcyp6a  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(489)

Spark dropDuplicates 保留第一个示例并忽略该键的所有后续示例。是否可以在保留最近发生的事件的同时删除重复项?
例如,如果下面是我得到的微批,那么我想保留每个国家的最新记录(按时间戳字段排序)。
batchid:0

Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06

批次ID:1

Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:03

那么batchid 1之后的输出应该低于-

Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:06

这是我的最新代码

//KafkaDF is a streaming dataframe created from Kafka as source
val streamingDF = kafkaDF.dropDuplicates("country")

streamingDF.writeStream
    .trigger(Trigger.ProcessingTime(10000L))
    .outputMode("update")
    .foreachBatch {
      (batchDF: DataFrame, batchId: Long) => {
        println("batchId: "+ batchId)
        batchDF.show()
      }
    }.start()

我想输出所有行,这些行要么是新的,要么比之前处理的批处理中的任何记录都有更大的时间戳。下面的例子
在batchid:0之后-这两个国家都是第一次出现,所以我应该将它们放入输出中

Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06

batchid:1之后-白俄罗斯的时间戳比我在批处理0中收到的时间戳早,所以我不会在输出中显示它。澳大利亚的时间戳比我目前看到的要新。

Australia, 10, 2020-05-05 00:00:08

现在让我们假设batchid2将两个记录都显示为延迟到达,那么它不应该在该批的输出中显示任何内容。
输入batchid:2

Australia, 10, 2020-05-05 00:00:01
Belarus, 10, 2020-05-05 00:00:01

在batchid:2之后

.

更新-2
为每个批次添加输入和预期记录。用红色标记的行将被丢弃,并且不会在输出中显示为具有相同国家名称的另一行,并且在以前的批处理中可以看到较新的时间戳

w80xi6nr

w80xi6nr1#

为了避免流媒体应用程序中的延迟事件,您需要在应用程序中保持一个状态,即在您的情况下跟踪每个密钥最新处理的事件 country .

case class AppState(country:String, latestTs:java.sql.Timestamp)

对于一个微博客,当你这样做的时候,你可能会收到多个事件 groupByKey(_.country) 你会得到一个属于 key(country) 您需要将其与状态进行比较,以找到最新的输入事件,并使用密钥的最新时间戳更新状态,然后继续处理最新的事件。对于迟到事件,应返回 Option[Event] 并在后续过程中过滤掉。
有关详细说明,请参阅本博客。

btxsgosb

btxsgosb2#

尝试使用 window 在spark streaming中的函数,例如检查下面的。

val columns = Seq("country","id").map(col(_))
df.groupBy(window($"timestamp","10 minutes","5 minutes"), columns:_*)

您也可以在这个问题中检查相同的问题,解决方案是在python中。

相关问题