带水印的spark结构化流式重复数据消除

pieyvz9o  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(327)

我正在运行结构化的流媒体管道,使用hdfs作为源,hive作为接收器。我使用的spark版本是spark2.4.0-cdh6.2.1。我需要在5分钟的滚动/静态窗口大小内消除流式Dataframe中的重复数据。但是,我的Dataframe不包含任何事件时间列。我为实现预期结果而编写的代码是-

String eventTimeWatermarkingColumn = "custom_eventTime";

Dataset<Row> intermediateDSWithWatermarkingColumn = inputFileDS.withColumn(eventTimeWatermarkingColumn,
                functions.current_timestamp());

Dataset<Row> intermediateDS = intermediateDSWithWatermarkingColumn
                .withWatermark(eventTimeWatermarkingColumn, "5 minutes")
                .dropDuplicates(new String[]{"colA","colB","colC"});

在代码中,我在dataframe中添加了一个自定义timestamp列,该列的值为current timestamp,并将其用作withwatermark方法的参数。
期望的行为:使用上面的代码,我的期望是每5分钟的窗口帧,应该删除重复的记录(如果有的话),一旦5分钟的窗口时间过去,应该允许任何重复的记录。确切地说,我想水印应该使存储状态的数据每5分钟窗口时间只有一次,一旦5分钟过去之前的状态存储数据应该被删除。
应用程序行为:然而,应用程序并没有按预期工作,相反,水印并没有起任何作用,在5分钟的窗口帧内或超过5分钟的窗口帧内插入的每个重复记录都会被应用程序删除。我无法理解这种行为。这是危险的,因为这可能会导致对象在某个时间点之后出现内存不足错误。
发生这种情况是因为我们引入了自定义时间列吗?如果是这样的话,如何将自定义eventtime列添加到dataframe(在模式中没有eventtime列)以使水印正常工作?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题