我已经在flink中实现了cep模式,该模式在连接本地kafka代理时按预期工作。但是当我连接到基于集群的云kafka设置时,flink cep不会触发。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//saves checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
我用的是ascendingtimestampextractor,
consumer.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<ObjectNode>() {
@Override
public long extractAscendingTimestamp(ObjectNode objectNode) {
long timestamp;
Instant instant = Instant.parse(objectNode.get("value").get("timestamp").asText());
timestamp = instant.toEpochMilli();
return timestamp;
}
});
我也收到了警告信息,
上升时间戳extractor:140 - 违反时间戳单调性:1594017872227<1594017873133
我还尝试了使用带有周期性水印的赋值器和带有标点水印的赋值器,但没有一个是有效的
我附上Flink控制台截图水印是没有分配。更新的flink控制台屏幕截图
有人能帮忙吗?
1条答案
按热度按时间olhwl3o21#
cep必须首先根据水印对输入流进行排序。所以问题可能出在水印上,但是你还没有给我们足够的证据来调试原因。一个常见的问题是有一个空闲的源,这可以阻止水印前进。
但还有其他可能的原因。为了调试这种情况,我建议您查看一些度量,可以在flinkwebui中查看,也可以在一个度量系统中查看。首先,通过查看
numRecordsIn
,numRecordsOut
,或numRecordsInPerSecond
以及numRecordsOutPerSecond
在管道的不同阶段。如果有事件,那么看看
currentOutputWatermark
在工作的不同任务中查看事件时间是否提前。更新:
看来你可能在打电话
assignTimestampsAndWatermarks
在kafka消费者,这将导致每分区水印。在这种情况下,如果您有一个空闲的分区,该分区将不会产生任何水印,这将保留整个水印。试着打电话assignTimestampsAndWatermarks
在源代码生成的数据流上,查看是否修复了问题(当然,如果没有逐分区水印,您将无法使用ascendingtimestampextractor,因为流将不有序。)