在我的管道设置中,我看不到会话窗口的侧输出。我用的是flink 1.9.1
版本1。我得到的是:
messageStream.
.keyBy(tradeKeySelector)
.window(ProcessingTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
.sideOutputLateData(lateTradeMessages)
.process(new CumulativeTransactionOperator())
.name("Aggregate Transaction Builder");
latetrademessages实现sessionwindowtimegapextractor并返回5秒。
此外,我还有:
messageStream.getSideOutput(lateTradeMessages)
.keyBy(tradeKeySelector)
.process(new KeyedProcessFunction<Long, EnrichedMessage, Transaction>() {
@Override
public void processElement(EnrichedMessage value, Context ctx, Collector<Transaction> out) throws Exception {
System.out.println("Process Late messages For Aggregation");
out.collect(new Transaction());
}
})
.name("Process Late messages For Aggregation");
问题是,当我用同一个键发送消息时,我从来没有看到“为聚合处理延迟的消息”,这会错过窗口时间。
当会话窗口通过时,我“立即”为同一个键发送了一条新消息,它会触发新的会话窗口,而不会进入latesideoutput。
不知道我做错了什么。
我想在这里实现的,是抓住“迟来的事件”,并尝试重新处理它们。
我将感谢任何帮助。
版本2,在@dominik wosi之后ń滑雪评论:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
DataStream<RawMessage> rawBusinessTransaction = env
.addSource(new FlinkKafkaConsumer<>("business",
new JSONKeyValueDeserializationSchema(false), properties))
.map(new KafkaTransactionObjectMapOperator())
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis());
}
@Override
public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
return element.messageCreationTime;
}
})
.name("Kafka Transaction Raw Data Source.");
messageStream
.keyBy(tradeKeySelector)
.window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
.sideOutputLateData(lateTradeMessages)
.process(new CumulativeTransactionOperator())
.name("Aggregate Transaction Builder");
水印在进步,我查了flink的指标。窗口操作符正在执行,但仍然没有延迟输出。
顺便说一句,Kafka的主题可以是空闲的,所以我必须定期发出新的水印。
2条答案
按热度按时间czq61nw11#
水印方法在我看来非常可疑。通常,此时会输出最新的事件时间戳。
只是一些背景资料,这样更容易理解。
延迟事件指的是在水印处理到事件之后的某个时间之后发生的事件。考虑以下示例:
您的水印方法将几乎把所有过去的事件都呈现为后期事件(由于1s水印间隔,所以有一点容忍度)。这也会使再加工和回收变得不可能。
然而,你实际上没有看到任何更让我吃惊的事情。你能仔细检查你的水印方法,描述你的用例,并提供示例数据吗?通常情况下,实现对于实际用例并不理想,应该以不同的方式来解决。
qf9go6mv2#
您正在使用
ProcessingTime
在您的例子中,这意味着系统时间用于测量系统中的时间流DataStream
.对于每个事件,分配给该事件的时间戳是您在flink管道中接收数据的时刻。这意味着在flink处理时间内没有办法使事件无序。正因为如此,你的窗口永远不会有最新的元素。
如果你换成
EventTime
,则对于正确的输入数据,您应该能够看到传递到输出端的后期元素。您可能应该看看文档,其中有flink解释的各种时间概念。