ApacheFlink在特定密钥上连接不同的数据流

rlcwz9us  于 2021-07-14  发布在  Java
关注(0)|答案(1)|浏览(430)

我有两个 DataStreams ,第一个叫 DataStream<String> source 从消息代理接收记录,第二个是 SingleOutputOperator<Event> events ,这是将源Map到 Event.class .
我有一个用例需要使用 SingleOutputOperator<Event> events 以及其他使用 DataStream<String> source . 在使用 DataStream<String> source ,我需要加入 SingleOutputOperator<String> result 之后应用一些过滤器,以避免Map source 再次进入 Event.class 因为我已经做了手术 Stream ,我需要在 SingleOutputOperator<String> result 进入 SingleOutputOperator<Event> events 应用另一个Map导出 SingleOutputOperator<EventOutDto> out .
这是一个例子:

DataStream<String> source = env.readFrom(source);
SingleOutputOperator<Event> events = source.map(s -> mapper.readValue(s, Event.class));

public void filterAndJoin(DataStream<String> source, SingleOutputOperator<Event> events){

  SingleOutputOperator<String> filtered = source.filter(s -> new FilterFunction());

  SingleOutputOperator<EventOutDto> result = (this will be the result of search each record 
      based on id in the filtered stream into the events stream where the id must match and return the event if found)
.map(event -> new EventOutDto(event)).addSink(new RichSinkFunction());
}

我有这个密码:

filtered.join(events)
                    .where(k -> {
                        JsonNode tree = mapper.readTree(k);
                        String id = "";
                        if (tree.get("Id") != null) {
                            id = tree.get("Id").asText();
                        }
                        return id;
                    })
                    .equalTo(e -> {
                        return e.Id;
                    })
                    .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                    .apply(new JoinFunction<String, Event, BehSingleEventTriggerDTO>() {
                        @Override
                        public EventOutDto join(String s, Event event) throws Exception {
                            return new EventOutDto(event);
                        }
                    })
                    .addSink(new SinkFunction());

在上面的代码中,所有的工作都很好 ids 都是一样的,所以基本上 where(id).equalTo(id) 应该有用,但这个过程永远达不到目标 apply 功能。
观察: Watermark 分配了相同的时间戳
问题:
知道为什么吗?
我解释得好吗?

axzmvihb

axzmvihb1#

我这样做解决了连接问题:

SingleOutputStreamOperator<ObjectDTO> triggers = candidates
                    .keyBy(new KeySelector())
                    .intervalJoin(keyedStream.keyBy(e -> e.Id))
                    .between(Time.milliseconds(-2), Time.milliseconds(1))
                    .process(new new ProcessFunctionOne())
                    .keyBy(k -> k.otherId)
                    .process(new ProcessFunctionTwo());

相关问题