如何协调Flink河之间的速度?

eimct9ow  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(258)

我有一个原始的日志流和一个算法流,如下所示。

如上所示,由于算法有窗口,产生结果需要很长时间,原始日志流运行过快,导致算法流的输出被丢弃,因为它远远落后于水印。
谢谢你的建议!

3vpjnl9f

3vpjnl9f1#

我有个解决办法。
根据文档,元素将被缓冲在cep中等待水印。
所以我决定改变水印生成策略如下。

public class UnionStreamTimestampExtractor implements AssignerWithPeriodicWatermarks<JSONObject> {
private long currentMaxTimestamp;
private boolean hasAlgoRule;

public UnionStreamTimestampExtractor(boolean hasAlgoRule) {
    this.hasAlgoRule = hasAlgoRule;
}

@Nullable
@Override
public Watermark getCurrentWatermark() {
    return new Watermark(currentMaxTimestamp);
}

@Override
public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
    long timestamp = element.get("occur_time") == null ?
            element.getLong("timestamp") : element.getLong("occur_time");
    // if this entity has algorithm rule
    // let anomaly slice generate watermark, so that CEP can buffer raw log data to wait them.
    if (hasAlgoRule) {
        if (element.containsKey("tensor")) {
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        }
    } else {
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
    }

    return timestamp;
}
}

但是它仍然有一个问题,如果流的速度有很大的不同,元素会很多。

相关问题