如何使用state实现自定义触发器的保存点?

wbgh16ku  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(376)

我已经实现了自定义 WindowAssigher :

public class SessionWindowAssigner extends WindowAssigner<LogItem, SessionWindow> {
    @Override
    public Collection<SessionWindow> assignWindows(LogItem element, long timestamp) {
        return Collections.singletonList(new SessionWindow(element.getSessionUid()));
    }

    @Override
    public Trigger<LogItem, SessionWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new SessionTrigger(60_000L);
    }

    @Override
    public TypeSerializer<SessionWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new SessionWindow.Serializer();
    }
}

, Window :

public class SessionWindow extends Window {
    private final String sessionUid;

    public SessionWindow(String sessionUid) {
        this.sessionUid = sessionUid;
    }

    public String getSessionUid() {
        return sessionUid;
    }

    @Override
    public long maxTimestamp() {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        SessionWindow that = (SessionWindow) o;

        return sessionUid.equals(that.sessionUid);
    }

    @Override
    public int hashCode() {
        return sessionUid.hashCode();
    }

    public static class Serializer extends TypeSerializer<SessionWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean isImmutableType() {
            return true;
        }

        @Override
        public TypeSerializer<SessionWindow> duplicate() {
            return this;
        }

        @Override
        public SessionWindow createInstance() {
            return null;
        }

        @Override
        public SessionWindow copy(SessionWindow from) {
            return from;
        }

        @Override
        public SessionWindow copy(SessionWindow from, SessionWindow reuse) {
            return from;
        }

        @Override
        public int getLength() {
            return 0;
        }

        @Override
        public void serialize(SessionWindow record, DataOutputView target) throws IOException {
            target.writeUTF(record.sessionUid);
        }

        @Override
        public SessionWindow deserialize(DataInputView source) throws IOException {
            return new SessionWindow(source.readUTF());
        }

        @Override
        public SessionWindow deserialize(SessionWindow reuse, DataInputView source) throws IOException {
            return new SessionWindow(source.readUTF());
        }

        @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
            target.writeUTF(source.readUTF());
        }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof Serializer;
        }

        @Override
        public boolean canEqual(Object obj) {
            return obj instanceof Serializer;
        }

        @Override
        public int hashCode() {
            return 0;
        }
    }
}

以及 Trigger :

public class SessionTrigger extends Trigger<LogItem, SessionWindow> {
    private final long sessionTimeout;

    private final ValueStateDescriptor<Long> previousFinishTimestampDesc = new ValueStateDescriptor<>("SessionTrigger.timestamp", LongSerializer.INSTANCE, null);

    public SessionTrigger(long sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    @Override
    public TriggerResult onElement(LogItem element, long timestamp, SessionWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> previousFinishTimestampState = ctx.getPartitionedState(previousFinishTimestampDesc);

        Long previousFinishTimestamp = previousFinishTimestampState.value();
        Long newFinisTimestamp = timestamp + sessionTimeout;

        if (previousFinishTimestamp != null) {
            ctx.deleteEventTimeTimer(previousFinishTimestamp);
        }

        ctx.registerEventTimeTimer(newFinisTimestamp);

        previousFinishTimestampState.update(newFinisTimestamp);

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, SessionWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, SessionWindow window, TriggerContext ctx) throws Exception {
        throw new UnsupportedOperationException("This is not processing time trigger");
    }

    @Override
    public void clear(SessionWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> previousFinishTimestampState = ctx.getPartitionedState(previousFinishTimestampDesc);

        Long previousFinishTimestamp = previousFinishTimestampState.value();

        ctx.deleteEventTimeTimer(previousFinishTimestamp);

        previousFinishTimestampState.clear();
    }
}

对于按超时检测会话结束,即,如果最后一个事件是n秒前,则评估窗口函数。如您所见,我将最后一个事件时间戳保存在valuestate中,因为我希望在失败后恢复它。
看来我应该实施 Checkpointed 用于在此触发器中保存/还原保存点(和检查点)快照的接口,因为我不希望在重新部署流期间释放触发器状态。
那么,有人能告诉我如何保存 SessionTrigger 在部署期间是否正确触发(可能还有相关的窗口)?
据我所知,我应该 Checkpointed 的接口 SessionTrigger 因为只有它才有状态。正确的?怎么样 SessionWindow -s和 SessionWindowAssigner ? 是在部署后自动恢复还是手动恢复?

hmtdttj4

hmtdttj41#

取自会话窗口

private static class SessionTrigger extends Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {

    private static final long serialVersionUID = 1L;

    private final Long sessionTimeout;

    private final ValueStateDescriptor<Long> stateDesc = 
            new ValueStateDescriptor<>("last-seen", Long.class, -1L);

    public SessionTrigger(Long sessionTimeout) {
        this.sessionTimeout = sessionTimeout;

    }

    @Override
    public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

        ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
        Long lastSeen = lastSeenState.value();

        Long timeSinceLastEvent = timestamp - lastSeen;

        ctx.deleteEventTimeTimer(lastSeen + sessionTimeout);

        // Update the last seen event time
        lastSeenState.update(timestamp);

        ctx.registerEventTimeTimer(timestamp + sessionTimeout);

        if (lastSeen != -1 && timeSinceLastEvent > sessionTimeout) {
            System.out.println("FIRING ON ELEMENT: " + element + " ts: " + timestamp + " last " + lastSeen);
            return TriggerResult.FIRE_AND_PURGE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
        Long lastSeen = lastSeenState.value();

        if (time - lastSeen >= sessionTimeout) {
            System.out.println("CTX: " + ctx + " Firing Time " + time + " last seen " + lastSeen);
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
        if (lastSeenState.value() != -1) {
            ctx.deleteEventTimeTimer(lastSeenState.value() + sessionTimeout);
        }
        lastSeenState.clear();
    }
}

相关问题