flink cep事件未触发

iq3niunx  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(881)

我已经在flink中实现了cep模式,该模式在连接本地kafka代理时按预期工作。但是当我连接到基于集群的云kafka设置时,flink cep不会触发。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //saves checkpoint
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

我用的是ascendingtimestampextractor,

consumer.assignTimestampsAndWatermarks(
    new AscendingTimestampExtractor<ObjectNode>() {
      @Override
      public long extractAscendingTimestamp(ObjectNode objectNode) {
        long timestamp;
        Instant instant = Instant.parse(objectNode.get("value").get("timestamp").asText());
        timestamp = instant.toEpochMilli();
        return timestamp;
      }
    });

我也收到了警告信息,
上升时间戳extractor:140 - 违反时间戳单调性:1594017872227<1594017873133
我还尝试了使用带有周期性水印的赋值器和带有标点水印的赋值器,但没有一个是有效的
我附上Flink控制台截图水印是没有分配。更新的flink控制台屏幕截图
有人能帮忙吗?

olhwl3o2

olhwl3o21#

cep必须首先根据水印对输入流进行排序。所以问题可能出在水印上,但是你还没有给我们足够的证据来调试原因。一个常见的问题是有一个空闲的源,这可以阻止水印前进。
但还有其他可能的原因。为了调试这种情况,我建议您查看一些度量,可以在flinkwebui中查看,也可以在一个度量系统中查看。首先,通过查看 numRecordsIn , numRecordsOut ,或 numRecordsInPerSecond 以及 numRecordsOutPerSecond 在管道的不同阶段。
如果有事件,那么看看 currentOutputWatermark 在工作的不同任务中查看事件时间是否提前。
更新:
看来你可能在打电话 assignTimestampsAndWatermarks 在kafka消费者,这将导致每分区水印。在这种情况下,如果您有一个空闲的分区,该分区将不会产生任何水印,这将保留整个水印。试着打电话 assignTimestampsAndWatermarks 在源代码生成的数据流上,查看是否修复了问题(当然,如果没有逐分区水印,您将无法使用ascendingtimestampextractor,因为流将不有序。)

相关问题