我有两个 DataStreams
,第一个叫 DataStream<String> source
从消息代理接收记录,第二个是 SingleOutputOperator<Event> events
,这是将源Map到 Event.class
.
我有一个用例需要使用 SingleOutputOperator<Event> events
以及其他使用 DataStream<String> source
. 在使用 DataStream<String> source
,我需要加入 SingleOutputOperator<String> result
之后应用一些过滤器,以避免Map source
再次进入 Event.class
因为我已经做了手术 Stream
,我需要在 SingleOutputOperator<String> result
进入 SingleOutputOperator<Event> events
应用另一个Map导出 SingleOutputOperator<EventOutDto> out
.
这是一个例子:
DataStream<String> source = env.readFrom(source);
SingleOutputOperator<Event> events = source.map(s -> mapper.readValue(s, Event.class));
public void filterAndJoin(DataStream<String> source, SingleOutputOperator<Event> events){
SingleOutputOperator<String> filtered = source.filter(s -> new FilterFunction());
SingleOutputOperator<EventOutDto> result = (this will be the result of search each record
based on id in the filtered stream into the events stream where the id must match and return the event if found)
.map(event -> new EventOutDto(event)).addSink(new RichSinkFunction());
}
我有这个密码:
filtered.join(events)
.where(k -> {
JsonNode tree = mapper.readTree(k);
String id = "";
if (tree.get("Id") != null) {
id = tree.get("Id").asText();
}
return id;
})
.equalTo(e -> {
return e.Id;
})
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.apply(new JoinFunction<String, Event, BehSingleEventTriggerDTO>() {
@Override
public EventOutDto join(String s, Event event) throws Exception {
return new EventOutDto(event);
}
})
.addSink(new SinkFunction());
在上面的代码中,所有的工作都很好 ids
都是一样的,所以基本上 where(id).equalTo(id)
应该有用,但这个过程永远达不到目标 apply
功能。
观察: Watermark
分配了相同的时间戳
问题:
知道为什么吗?
我解释得好吗?
1条答案
按热度按时间axzmvihb1#
我这样做解决了连接问题: