流处理的文本书示例是一个带时间戳的字数计算程序。使用以下数据示例
mario 10:00
luigi 10:01
mario 11:00
mario 12:00
我看过在以下几年里制作的字数统计程序:
总数据集
mario 3
luigi 1
一组时间窗口分区
mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 1
mario 12:00-13:00 1
但是,我还没有找到一个滚动时间窗口上的字数计算程序示例,即我希望从时间开始每小时为每个字生成一个字数:
mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 2
luigi 11:00-12:00 1
mario 12:00-13:00 3
luigi 12:00-13:00 1
apache flink或任何其他流处理库都可以这样做吗?谢谢!
编辑:
到目前为止,我尝试了davidanderson方法的一个变体,只改变了事件时间的处理时间,因为数据是timestsamped。但它并没有像我预期的那样起作用。下面是代码、示例数据、它提供的结果以及我的后续问题:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
.setParallelism(1)
.setMaxParallelism(1);
env.setStreamTimeCharacteristic(EventTime);
String fileLocation = "full file path here";
DataStreamSource<String> rawInput = env.readFile(new TextInputFormat(new Path(fileLocation)), fileLocation);
rawInput.flatMap(parse())
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<TimestampedWord>() {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(TimestampedWord lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp - 1);
}
@Override
public long extractTimestamp(TimestampedWord element, long previousElementTimestamp) {
return element.getTimestamp();
}
})
.keyBy(TimestampedWord::getWord)
.process(new KeyedProcessFunction<String, TimestampedWord, Tuple3<String, Long, Long>>() {
private transient ValueState<Long> count;
@Override
public void open(Configuration parameters) throws Exception {
count = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Long.class));
}
@Override
public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
if (count.value() == null) {
count.update(0L);
}
long l = ((value.getTimestamp() / 10) + 1) * 10;
ctx.timerService().registerEventTimeTimer(l);
count.update(count.value() + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
long currentWatermark = ctx.timerService().currentWatermark();
out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
}
})
.addSink(new PrintlnSink());
env.execute();
}
private static long fileCounter = 0;
private static FlatMapFunction<String, TimestampedWord> parse() {
return new FlatMapFunction<String, TimestampedWord>() {
@Override
public void flatMap(String value, Collector<TimestampedWord> out) {
out.collect(new TimestampedWord(value, fileCounter++));
}
};
}
private static class TimestampedWord {
private final String word;
private final long timestamp;
private TimestampedWord(String word, long timestamp) {
this.word = word;
this.timestamp = timestamp;
}
public String getWord() {
return word;
}
public long getTimestamp() {
return timestamp;
}
}
private static class PrintlnSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<Tuple3<String, Long, Long>> {
@Override
public void invoke(Tuple3<String, Long, Long> value, Context context) throws Exception {
System.out.println(value.getField(0) + "=" + value.getField(1) + " at " + value.getField(2));
}
}
一个包含以下单词的文件,每个单词都在一个新行中:
mario,luigi,mario,mario,vilma,fred,bob,bob,mario,dan,dylan,dylan,fred,mario,mario,carl,bambam,summer,anna,anna,edu,anna,anna,anna,anna,anna,anna
生成以下输出:
mario=4 at 10
luigi=1 at 10
dan=1 at 10
bob=2 at 10
fred=1 at 10
vilma=1 at 10
dylan=2 at 20
fred=2 at 20
carl=1 at 20
anna=3 at 20
summer=1 at 20
bambam=1 at 20
mario=6 at 20
anna=7 at 9223372036854775807
edu=1 at 9223372036854775807
显然出了问题。我数到三了 anna
在20岁的时候,即使这个词的第三个示例 anna
直到位置22才出现。奇怪的是 edu
只出现在上一个快照中,即使它以前出现过 anna
第三个案子。即使没有消息到达(即应生成相同的数据),如何触发每10个“时间单位”生成一个快照?
如果有人能给我指出正确的方向,我会非常感激的!
1条答案
按热度按时间ivqmmu1c1#
是的,这不仅可以和Flink一起做,而且很容易。您可以使用keyedprocessfunction来实现这一点,该函数将计数器保持在keyed状态,表示到目前为止每个字/键在输入流中出现的次数。然后使用计时器触发报告。
下面是一个使用处理时间计时器的示例。它每10秒打印一份报告。
更新时间:
使用事件时间总是更好,但这确实增加了复杂性。大多数增加的复杂性来自于这样一个事实,即在实际应用程序中,您很可能必须处理无序事件——在您的示例中,您已经避免了这种情况,因此在本例中,我们可以通过一个相当简单的实现来解决。
如果你改变两件事,你会得到你期望的结果。首先,将水印设置为
extractedTimestamp - 1
是结果错误的原因(例如,这就是为什么anna=3=20)。如果将水印设置为extractedTimestamp
相反,这个问题会消失。说明:正是第三个安娜的到来,创造了在时间20关闭窗口的水印。第三个anna的时间戳是21,因此在流中后跟一个20的水印,它关闭第二个窗口并生成anna=3的报告。是的,第一个edu来得更早,但它是第一个edu,时间戳是20。在edu到达的时候,没有为edu设置计时器,并且创建的计时器被正确地设置为30点启动,所以直到至少30的水印到达,我们才听说edu。
另一个问题是定时器逻辑。flink为每个键创建一个单独的计时器,并且每次启动计时器时都需要创建一个新的计时器。否则,您将只收到有关窗口中到达的单词的报告。您应该修改代码,使其更像这样:
通过这些更改,我得到了以下结果:
现在,如果你真的需要处理无序事件,这会变得更加复杂。有必要使水印滞后于时间戳一个实际的量,反映流中存在的无序的实际量,这就需要能够处理一次打开多个窗口的情况。任何给定的事件/字可能不属于下一个将关闭的窗口,因此不应递增其计数器。例如,您可以将这些“早期”事件缓冲在另一个状态(例如liststate)中,或者以某种方式维护多个计数器(可能在mapstate中)。此外,有些事件可能会延迟,从而使早期的报告无效,您需要定义一些策略来处理这些事件。