Spark dropDuplicates
保留第一个示例并忽略该键的所有后续示例。是否可以在保留最近发生的事件的同时删除重复项?
例如,如果下面是我得到的微批,那么我想保留每个国家的最新记录(按时间戳字段排序)。
batchid:0
Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06
批次ID:1
Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:03
那么batchid 1之后的输出应该低于-
Australia, 10, 2020-05-05 00:00:08
Belarus, 10, 2020-05-05 00:00:06
这是我的最新代码
//KafkaDF is a streaming dataframe created from Kafka as source
val streamingDF = kafkaDF.dropDuplicates("country")
streamingDF.writeStream
.trigger(Trigger.ProcessingTime(10000L))
.outputMode("update")
.foreachBatch {
(batchDF: DataFrame, batchId: Long) => {
println("batchId: "+ batchId)
batchDF.show()
}
}.start()
我想输出所有行,这些行要么是新的,要么比之前处理的批处理中的任何记录都有更大的时间戳。下面的例子
在batchid:0之后-这两个国家都是第一次出现,所以我应该将它们放入输出中
Australia, 10, 2020-05-05 00:00:06
Belarus, 10, 2020-05-05 00:00:06
batchid:1之后-白俄罗斯的时间戳比我在批处理0中收到的时间戳早,所以我不会在输出中显示它。澳大利亚的时间戳比我目前看到的要新。
Australia, 10, 2020-05-05 00:00:08
现在让我们假设batchid2将两个记录都显示为延迟到达,那么它不应该在该批的输出中显示任何内容。
输入batchid:2
Australia, 10, 2020-05-05 00:00:01
Belarus, 10, 2020-05-05 00:00:01
在batchid:2之后
.
更新-2
为每个批次添加输入和预期记录。用红色标记的行将被丢弃,并且不会在输出中显示为具有相同国家名称的另一行,并且在以前的批处理中可以看到较新的时间戳
2条答案
按热度按时间w80xi6nr1#
为了避免流媒体应用程序中的延迟事件,您需要在应用程序中保持一个状态,即在您的情况下跟踪每个密钥最新处理的事件
country
.对于一个微博客,当你这样做的时候,你可能会收到多个事件
groupByKey(_.country)
你会得到一个属于key(country)
您需要将其与状态进行比较,以找到最新的输入事件,并使用密钥的最新时间戳更新状态,然后继续处理最新的事件。对于迟到事件,应返回Option[Event]
并在后续过程中过滤掉。有关详细说明,请参阅本博客。
btxsgosb2#
尝试使用
window
在spark streaming中的函数,例如检查下面的。您也可以在这个问题中检查相同的问题,解决方案是在python中。