scala—在ApacheFlink中对流进行排序并识别用户会话

ztmd8pv5  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(298)

我有两个事件流
l=(l1,l3,l8,…)-是稀疏的,表示用户登录到ip
e=(e2,e4,e5,e9,…)—是特定ip的日志流
较低的索引表示时间戳。。。如果我们把这两条流连接在一起,按时间排序,我们会得到:
l1、e2、l3、e4、e5、l8、e9。。。
有没有可能实现定制 Window / Trigger 将事件分组到会话的函数(不同用户登录之间的时间):
l1-l3:e2
l3-l8:e4、e5
l8-l14:e9、e10、e11、e12、e13
...
我看到的问题是,这两个流不一定是排序的。我考虑过按时间戳对输入流进行排序。这样就可以很容易地使用 GlobalWindow 和习俗 Trigger -然而,这似乎是不可能的。
在当前的flink(v1.3.2)中,我是否遗漏了一些东西或是绝对不可能这样做?
谢谢

ukdjmx9f

ukdjmx9f1#

问:e3不应该先于l4吗?
使用 ProcessFunction . 像这样:

public static class SortFunction extends ProcessFunction<Event, Event> {
  private ValueState<PriorityQueue<Event>> queueState = null;

  @Override
  public void open(Configuration config) {
    ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
        // state name
        "sorted-events",
        // type information of state
        TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
        }));
    queueState = getRuntimeContext().getState(descriptor);
  }

  @Override
  public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
    TimerService timerService = context.timerService();

    if (context.timestamp() > timerService.currentWatermark()) {
      PriorityQueue<Event> queue = queueState.value();
      if (queue == null) {
        queue = new PriorityQueue<>(10);
      }
      queue.add(event);
      queueState.update(queue);
      timerService.registerEventTimeTimer(event.timestamp);
    }
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
    PriorityQueue<Event> queue = queueState.value();
    Long watermark = context.timerService().currentWatermark();
    Event head = queue.peek();
    while (head != null && head.timestamp <= watermark) {
      out.collect(head);
      queue.remove(head);
      head = queue.peek();
    }
  }
}

更新:请参阅how to sort an out-order event time stream using flink,以了解通常更好的方法的描述。

相关问题