/**
 * Periodic watermark assigner for a stream of JSON records.
 *
 * <p>The event time of a record is read from its {@code "occur_time"} field; when
 * {@code element.get("occur_time")} yields {@code null}, the {@code "timestamp"} field is used
 * instead. The watermark reported by {@link #getCurrentWatermark()} is the largest event time
 * among the records that were allowed to advance it (see {@link #extractTimestamp}).
 */
public class UnionStreamTimestampExtractor implements AssignerWithPeriodicWatermarks<JSONObject> {

    /** Largest event time observed so far among records that may advance the watermark. */
    private long currentMaxTimestamp;

    /** When {@code true}, only records containing a {@code "tensor"} key advance the watermark. */
    private final boolean hasAlgoRule;

    /**
     * @param hasAlgoRule whether this entity has an algorithm rule; if so, only records that
     *                    contain a {@code "tensor"} key move the watermark forward
     */
    public UnionStreamTimestampExtractor(boolean hasAlgoRule) {
        this.hasAlgoRule = hasAlgoRule;
    }

    /**
     * Builds a watermark from the current maximum tracked timestamp.
     *
     * @return a new {@link Watermark} carrying {@code currentMaxTimestamp}
     */
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp);
    }

    /**
     * Extracts the event time of {@code element} and, when the record qualifies, raises the
     * tracked maximum timestamp used for the watermark.
     *
     * @param element                  the record to read the event time from
     * @param previousElementTimestamp not used by this implementation
     * @return the value of {@code "occur_time"}, or of {@code "timestamp"} when
     *         {@code "occur_time"} is not available
     */
    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
        // Decide which field carries the event time, then read it exactly once.
        final String timeField = element.get("occur_time") == null ? "timestamp" : "occur_time";
        // NOTE(review): if getLong returns a boxed value, a record lacking both fields would
        // fail here on unboxing - confirm against the JSON library in use.
        final long eventTime = element.getLong(timeField);

        // If this entity has an algorithm rule, let only the anomaly slices (records with a
        // "tensor" key) generate the watermark, so that CEP can buffer raw log data while
        // waiting for them. Without an algorithm rule every record advances the watermark.
        final boolean advancesWatermark = !hasAlgoRule || element.containsKey("tensor");
        if (advancesWatermark) {
            currentMaxTimestamp = Math.max(eventTime, currentMaxTimestamp);
        }

        return eventTime;
    }
}
1条答案
按热度按时间3vpjnl9f1#
我有个解决办法。
根据文档,元素会被缓冲在 CEP 中,直到水印到达。
所以我决定改变水印生成策略如下。
但是它仍然有一个问题:如果各个流的速度相差很大,CEP 中缓冲的元素会非常多。