通过匹配模式持久化“身份”

yrefmtwq  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(290)

我试着用flink-cep来衡量一个市场中的竞价从获得 BidState.OPENBidState.Closed . 我收到了一个带有id和状态的投标数据流,从目前的情况来看,我将所有“打开”的投标与所有“关闭”的投标进行匹配。
我有一个条件 patternStream.process 它只允许具有相同id的开标和收盘报价配对,因为它们应该是配对的。这感觉是不对的,因为匹配的数量以这种方式快速增长,我有一种感觉,这可以通过模式来完成。那么,有没有办法确保“start”和“end”对象具有相同的id?

AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
//Is it possible to make sure that start.BidID == end.BidID in the pattern?
Pattern<BidEvent, ?> pattern = Pattern.<BidEvent>begin("start", skipStrategy).where(
        new SimpleCondition<BidEvent>() {
            @Override
            public boolean filter(BidEvent value) {
                return value.getState() == BidState.OPENED;
            }
        }).followedByAny("end").where(
        new SimpleCondition<BidEvent>() {
            @Override
            public boolean filter(BidEvent value) throws Exception {
                return value.getState() == BidState.CLOSED; // && value.getBidID == start.getBidID?
            }
        }).within(timeout);

PatternStream<BidEvent> patternStream = CEP.pattern(BidEventDataStream, pattern);

patternStream.process(new PatternProcessFunction<BidEvent, MatchingDuration>() {
    @Override
    public void processMatch(Map<String
            , List<BidEvent>> map
            , Context context
            , Collector<MatchingDuration> collector) {

        BidEvent start = map.get("start").get(0);
        BidEvent end = map.get("end").get(0);
        if (start.getBidId() == end.getBidId()){ // Make sure opening and closing bid is the same. Can this be done in the pattern?
            collector.collect(new MatchingDuration(start.getBidId(), (end.getTimestamp() - start.getTimestamp())));
        }
    }
}).addSink(matchingDurationSinkFunction);
4bbkushb

4bbkushb1#

我想出了如何得到我想要的行为: BidEventDataStream 必须设置关键帧才能在具有相同关键帧的对象上进行模式匹配。但是,不需要对问题中的代码进行任何更改 BidEventDataStream 必须编辑才能捕获 BidEvent.getBidId() :

BidEventDataStream.keyBy(new KeySelector<BidEvent, Long>() {
                    @Override
                    public Long getKey(BidEventvalue) {
                        return value.getBidId();
                    }
                })

相关问题