Session windows in Flink

jm81lzqq asked on 2021-06-21 · Flink
Follow (0) | Answers (3) | Views (279)

Can someone help me understand when and how windowing (session windows) happens in Flink, and how the samples are processed?
For example, suppose I have a continuous stream of events flowing in, where the events are requests made to an application and the responses the application provides. As part of the Flink processing, we need to work out how much time it takes to serve a request.
I understand there are tumbling time windows that fire every n seconds; once that time has elapsed, all the events in that time window are aggregated.
For example: assume the defined time window is 30 seconds. If one event arrives at time t and another at t+30, both will be processed, but an event arriving at t+31 will be ignored.
Please correct me if the above is wrong.
Question on the above: if one event arrives at time t and another arrives at t+3, will it still wait the full 30 seconds to aggregate and finalize the result?
And how does this work in the case of session windows? If events are processed individually, and at deserialization time the broker timestamp is used as the session id for each individual event, will a session window be created per event? If so, do we need to treat request and response events differently, since otherwise a response event would get its own session window?
I will try to post my example (in Java) that I have been playing with shortly, but any input on the points above would be helpful!

Process function

DTOs:

public class IncomingEvent{
    private String id;
    private String eventId;
    private Date timestamp;
    private String component;
    // fields set by the deserializer below
    private long offset;
    private String topic;
    private int partition;
    private long brokerTimestamp;
    //getters and setters
}
public class FinalOutPutEvent{
    private String id;
    private long timeTaken;
    //getters and setters
}

Deserialization of the incoming event:

public class IncomingEventDeserializationScheme implements KafkaDeserializationSchema<IncomingEvent> {

private ObjectMapper mapper;

public IncomingEventDeserializationScheme(ObjectMapper mapper) {
    this.mapper = mapper;
}

@Override
public TypeInformation<IncomingEvent> getProducedType() {
    return TypeInformation.of(IncomingEvent.class);
}

@Override
public boolean isEndOfStream(IncomingEvent nextElement) {
    return false;
}

@Override
public IncomingEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    if (record.value() == null) {
        return null;
    }
    try {
        IncomingEvent event = mapper.readValue(record.value(), IncomingEvent.class);
        if(event != null) {
            new SessionWindow(record.timestamp()); // note: this object is never assigned and is discarded immediately
            event.setOffset(record.offset());
            event.setTopic(record.topic());
            event.setPartition(record.partition());
            event.setBrokerTimestamp(record.timestamp());
        }
        return event;
    } catch (Exception e) {
        return null;
    }
}

}

Main logic

public class MyEventJob {

private static final ObjectMapper mapper = new ObjectMapper();

public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    MyEventJob eventJob = new MyEventJob();

    InputStream inStream = eventJob.getFileFromResources("myConfig.properties");
    ParameterTool parameter = ParameterTool.fromPropertiesFile(inStream);
    Properties properties = parameter.getProperties();
    Integer timePeriodBetweenEvents = 120;
    String outWardTopicHostedOnServer = "localhost:9092";
    DataStreamSource<IncomingEvent> stream = env.addSource(new FlinkKafkaConsumer<>("my-input-topic", new IncomingEventDeserializationScheme(mapper), properties));
    SingleOutputStreamOperator<IncomingEvent> filteredStream = stream
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<IncomingEvent>() {
            long eventTime;
            @Override
            public long extractTimestamp(IncomingEvent element, long previousElementTimestamp) {
                return element.getTimestamp();
            }
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(eventTime); 
            }
        })
        .map(e -> { e.setId(e.getEventId()); return e; });
    SingleOutputStreamOperator<FinalOutPutEvent> correlatedStream = filteredStream
        .keyBy(new KeySelector<IncomingEvent, String> (){
            @Override
            public String getKey(@Nonnull IncomingEvent input) throws Exception {
                return input.getId();
            }
        })
        .window(GlobalWindows.create()).allowedLateness(Time.seconds(timePeriodBetweenEvents))
        .trigger(new Trigger<IncomingEvent, Window>() {
            // anonymous classes cannot declare constructors; initialize the timeout inline
            private final long sessionTimeOut = timePeriodBetweenEvents * 1000L;
            @Override
            public TriggerResult onElement(IncomingEvent element, long timestamp, Window window, TriggerContext ctx)
                    throws Exception {
                ctx.registerProcessingTimeTimer(timestamp + sessionTimeOut); 
                return TriggerResult.CONTINUE;
            }
            @Override
            public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws Exception {
                return TriggerResult.FIRE_AND_PURGE;
            }
            @Override
            public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) throws Exception {
                    return TriggerResult.CONTINUE;
            }
            @Override
            public void clear(Window window, TriggerContext ctx) throws Exception {
                //check the clear method implementation
            }
        })
        .process(new ProcessWindowFunction<IncomingEvent, FinalOutPutEvent, String, GlobalWindow>() {
        @Override
        public void process(String arg0,
                ProcessWindowFunction<IncomingEvent, FinalOutPutEvent, String, GlobalWindow>.Context arg1,
                Iterable<IncomingEvent> input, Collector<FinalOutPutEvent> out) throws Exception {
            List<IncomingEvent> eventsIn = new ArrayList<>();
            input.forEach(eventsIn::add);
            if(eventsIn.size() == 1) {
                //Logic to handle incomplete request/response events
            } else if (eventsIn.size() == 2) {
                //Logic to handle the complete request/response and how much time it took
            }
        }
    } );
        FlinkKafkaProducer<FinalOutPutEvent> kafkaProducer = new FlinkKafkaProducer<>(
                outWardTopicHostedOnServer,            // broker list
                "target-topic",            // target topic
                new EventSerializationScheme(mapper));
    correlatedStream.addSink(kafkaProducer);
    env.execute("Streaming");
}

}
Thanks, Vicky


gfttwv5a1#

Based on your description, I think what you want is to write a custom ProcessFunction keyed by session_id. You would have a ValueState<Long> that stores the timestamp of the request event. When you get the corresponding response event, you calculate the delta, emit it (along with the session_id), and clear the state.
Most likely you will also want to set a timer when you receive the request event, so that if you don't get the response event within a safe/long interval, you can emit a side output recording the failed request.
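The per-key bookkeeping described here can be sketched in plain Java, without Flink, to make the state transitions concrete. The class and method names below are illustrative only (they emulate a keyed ValueState<Long> with a HashMap, and a timer firing with a method call):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the per-key state logic: store the request timestamp,
// emit the delta when the response arrives and clear the state, and report a
// timed-out session if no response shows up within the timeout.
public class SessionTracker {
    private final long timeoutMs;
    // emulates Flink's keyed ValueState<Long>: sessionId -> request timestamp
    private final Map<String, Long> requestTimeState = new HashMap<>();

    public SessionTracker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Returns the request/response delta, or null if this event was the request. */
    public Long onEvent(String sessionId, long timestampMs) {
        Long requestedAt = requestTimeState.get(sessionId);
        if (requestedAt == null) {
            requestTimeState.put(sessionId, timestampMs); // first event: the request
            return null;
        }
        requestTimeState.remove(sessionId);               // clear state on response
        return timestampMs - requestedAt;                 // second event: the response
    }

    /** What a timer firing at nowMs would report: request still pending past the timeout. */
    public boolean hasTimedOut(String sessionId, long nowMs) {
        Long requestedAt = requestTimeState.get(sessionId);
        return requestedAt != null && nowMs - requestedAt >= timeoutMs;
    }
}
```

In the real Flink job, the timeout check would live in `onTimer` of a KeyedProcessFunction and the failed request would go to a side output instead of a boolean.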


kiz8lqtg2#

So with the default trigger, every window is finalized once its time has fully passed. Depending on whether you are using EventTime or ProcessingTime this can mean different things, but in general Flink always waits for a window to close before processing it completely. In your case, the event at t+31 would simply go into another window.
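The "goes into another window" point follows from the window-assignment arithmetic for tumbling windows: an event's timestamp is rounded down to a multiple of the window size. A dependency-free sketch (class and method names are illustrative; Flink does the equivalent internally for a zero offset):

```java
// Sketch of tumbling-window assignment: an event with timestamp ts falls into
// the window [start, start + size), where start is ts rounded down to a
// multiple of the window size.
public class TumblingAssigner {
    public static long windowStart(long tsMs, long sizeMs) {
        return tsMs - (tsMs % sizeMs);
    }

    // Two events share a window exactly when they round down to the same start.
    public static boolean sameWindow(long aMs, long bMs, long sizeMs) {
        return windowStart(aMs, sizeMs) == windowStart(bMs, sizeMs);
    }
}
```

With a 30-second window, events at t=0s and t=29.999s share a window, while an event at t=31s lands in the next one rather than being dropped.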
As for session windows, they are windows too, which means that in the end they simply aggregate samples whose timestamps are closer together than the defined gap. Internally this is more complicated than with normal windows, because session windows have no defined start and end. The session window operator takes each sample and creates a new window for that individual sample. The operator then checks whether the newly created window can be merged with existing ones (i.e., whether their timestamps are closer than the gap) and merges them. This ultimately yields a window containing all elements whose timestamps are closer together than the defined gap.
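The merge behavior can be illustrated without Flink: treat each event as its own window, then fold consecutive events into one session whenever the gap between them is below the threshold. This is a simplified sketch (names are illustrative; it assumes timestamps arrive in ascending order, whereas Flink's merging-window machinery also handles out-of-order merges):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified session-merging sketch: each event starts as its own window;
// adjacent windows whose timestamps are closer than the gap are merged into
// one session. Timestamps must be in ascending order in this version.
public class SessionMerger {
    /** Returns the number of events in each resulting session window. */
    public static List<Integer> sessionSizes(long[] sortedTimestamps, long gapMs) {
        List<Integer> sizes = new ArrayList<>();
        int count = 0;
        long previous = Long.MIN_VALUE;
        for (long ts : sortedTimestamps) {
            if (count > 0 && ts - previous >= gapMs) {
                sizes.add(count);   // gap exceeded: close the current session
                count = 0;
            }
            count++;                // merge this event's window into the session
            previous = ts;
        }
        if (count > 0) sizes.add(count);
        return sizes;
    }
}
```

With a 30-second gap, events at 0s, 1s, and 2s merge into one session of three, while an event at 40s opens a new session of its own.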


jmp7cifd3#

You are making things more complicated than they need to be. The example below will need some adjustment, but hopefully it conveys how to use a KeyedProcessFunction rather than session windows.
Also, the constructor of BoundedOutOfOrdernessTimestampExtractor expects to be passed a Time maxOutOfOrderness. Not sure why you are overriding it with a getCurrentWatermark implementation that ignores maxOutOfOrderness.

public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Event> events = ...

    events
        .assignTimestampsAndWatermarks(new TimestampsAndWatermarks(OUT_OF_ORDERNESS))
        .keyBy(e -> e.sessionId)
        .process(new RequestReponse())
        ...
}

public static class RequestReponse extends KeyedProcessFunction<KEY, Event, Long> {
    private ValueState<Long> requestTimeState;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
                "request time", Long.class);
        requestTimeState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Event event, Context context, Collector<Long> out) throws Exception {
        TimerService timerService = context.timerService();

        Long requestedAt = requestTimeState.value();
        if (requestedAt == null) {
            // haven't seen the request before; save its timestamp
            requestTimeState.update(event.timestamp);
            timerService.registerEventTimeTimer(event.timestamp + TIMEOUT);
        } else {
            // this event is the response
            // emit the time elapsed between request and response
            out.collect(event.timestamp - requestedAt);
            requestTimeState.clear();
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<Long> out) throws Exception {
         //handle incomplete request/response events
    }
}

public static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
    public TimestampsAndWatermarks(Time t) {
        super(t);
    }

    @Override
    public long extractTimestamp(Event event) {
        return event.eventTime;
    }
}
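For reference, the watermark a bounded-out-of-orderness extractor produces is simply the highest event timestamp seen so far minus maxOutOfOrderness, which is why overriding getCurrentWatermark to ignore maxOutOfOrderness defeats the purpose. A dependency-free sketch of that bookkeeping (the class name is illustrative):

```java
// Sketch of bounded-out-of-orderness watermarking: track the maximum event
// timestamp seen and emit watermark = maxSeen - maxOutOfOrderness, so events
// up to maxOutOfOrderness late still arrive ahead of the watermark.
public class BoundedLatenessWatermark {
    private final long maxOutOfOrdernessMs;
    private long maxSeenMs = Long.MIN_VALUE;

    public BoundedLatenessWatermark(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    public void onEvent(long timestampMs) {
        maxSeenMs = Math.max(maxSeenMs, timestampMs);
    }

    public long currentWatermark() {
        return maxSeenMs - maxOutOfOrdernessMs;
    }

    /** An event is considered late once its timestamp is at or behind the watermark. */
    public boolean isLate(long timestampMs) {
        return timestampMs <= currentWatermark();
    }
}
```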
