我是Apache·Flink的新手。我的输入中有一个未绑定的数据流(通过kakfa输入到flink0.10)。
我想得到每个主键的第一次出现(主键是contract\ num和event\ dt)。
这些“复制品”几乎紧随其后。源系统不能为我过滤这个,所以flink必须这么做。
这是我的输入数据:
contract_num, event_dt, attr
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:08, Y
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C
以下是我想要的输出数据:
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C
注:第2行已删除,因为第1行中已出现a001和“2016-02-24 10:25:08”的键组合。
我怎么能用flink 0.10做到这一点?
我在考虑用 keyBy(0,1)
但在那之后我不知道该怎么办!
(我使用了joda time和org.flinkspett来设置这些测试)。
@Test
public void test() {
DateTime threeSecondsAgo = (new DateTime()).minusSeconds(3);
DateTime twoSecondsAgo = (new DateTime()).minusSeconds(2);
DateTime oneSecondsAgo = (new DateTime()).minusSeconds(2);
DataStream<Tuple3<String, Date, String>> testStream =
createTimedTestStreamWith(
Tuple3.of("A1", threeSecondsAgo.toDate(), "X"))
.emit(Tuple3.of("A1", threeSecondsAgo.toDate(), "Y"), after(0, TimeUnit.NANOSECONDS))
.emit(Tuple3.of("A1", twoSecondsAgo.toDate(), "Z"), after(0, TimeUnit.NANOSECONDS))
.emit(Tuple3.of("A2", oneSecondsAgo.toDate(), "C"), after(0, TimeUnit.NANOSECONDS))
.close();
testStream.keyBy(0,1);
}
2条答案
按热度按时间6tdlim6h1#
这是我刚刚写的另一种方法。它的缺点是它是一个更多的自定义代码,因为它不使用内置的flink窗口函数,但它没有延迟惩罚,直到提到。github上的完整示例。
lrl1mhuk2#
如果密钥空间大于可用存储空间,则在无限流上过滤重复项最终将失败。原因是您必须将已经看到的密钥存储在某个位置以过滤掉重复的密钥。因此,最好定义一个时间窗口,在这个时间窗口之后,您可以清除当前看到的一组关键点。
如果您知道这个问题,但无论如何都想尝试,可以通过应用有状态
flatMap
手术后keyBy
打电话。有状态Map器使用flink的状态抽象来存储它是否已经看到具有这个键的元素。这样,您还将受益于flink的容错机制,因为您的状态将被自动检查。一个flink程序做你的工作
在哪里实施
DuplicateFilter
取决于flink的版本。版本>=1.0实施
版本0.10实现
更新:使用翻滚时间窗口