flink 1.10.1的行为与parallelism max 1不同

csbfibhn  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(378)

首先,我已经在这里发现了这个问题:flink程序在并行性中的行为不同,它看起来和我现在面临的问题相同,但我认为在我的场景中确实需要cep,因为我每小时有超过1000.000条属于不同用户密钥的记录需要分析。
所以当我用parallelism 1运行cep时,一切都很好,即使是对于不同的用户密钥,但是有点慢,因为flink需要在一个线程中逐个用户地分析,而且这个操作需要足够快,以便识别某些模式,然后在不到1分钟的时间内发送通知,例如,这就是为什么我需要一个以上的并行线程。
在我的例子中,我使用richflatmapfunction保持前一个模式来识别下一个模式,然后发送通知,下面是我的代码:

final DataStream<EventPush> eventsStream = RabbitMQConnector.eventStreamObject(env)
                .flatMap(new RabbitMQPushConsumer())
                .keyBy(k -> k.id);

private static SingleOutputStreamOperator<String> getPushToSend(KeyedStream<EventPush, String> stream) {
        return stream.flatMap(new WebPushFlatMapFunction())
                .map(x -> new ObjectMapper().writeValueAsString(x));
    }

/*the code below belongs to WebPushFlatMapFunction class, which is the RichFlatMapFunction using ValueState*/

 private boolean inTime(long start, long end) {
        final long difference = (end > start) ? (end - start) : (start - end);
        long time_frame = 120000L;
        return difference > 0 && time_frame >= difference;
    }

    @Override
    public void flatMap(EventPush value, Collector<EventPush> out) {
        final String pageName= value.pageName.trim();
        Tuple4<String, String, Long, Timestamp> prev;
        try {
            prev = previous.value();
            if (b_pageName.equalsIgnoreCase(pattern)) {
                LOG.info("umid " + value.idsUmid + " match (" + pattern + ") at: " + value.timestamp);
                previous.update(new Tuple4<>(value.idsUmid, pageName, value.timestamp.getTime(), value.timestamp));
            }
            if (prev != null) {
                if (inTime(value.timestamp.getTime(), prev.f2)) {
                    if ((prev.f1 != null && !prev.f1.equals("")) && prev.f1.equals(full_pattern) && pageName.equals(home) && prev.f3.before(value.timestamp)) {
                        if (PropertyFileReader.isWebPushLoggerActivated())
                            LOG.info("umid " + value.idsUmid + " match (" + home + ")" + "triggered at: " + value.timestamp);
                        prev.f1 = "";
                        out.collect(value);
                    }
                    if ((prev.f1 != null && !prev.f1.equals("")) && prev.f1.equals(pattern) && pageName.equals(full_pattern) && prev.f3.before(value.timestamp)) {
                        LOG.info("umid " + value.idsUmid + " match (" + full_pattern + ") at: " + value.timestamp);
                        prev.f3 = value.timestamp;
                        prev.f1 = pageName;
                        previous.update(prev);
                    }
                }
            }
        } catch (IOException e) {
            CatchHandler.generalCatchHandler(e);
        }
    }

用平行度1,我得到正确的顺序:1,2,3。我可以在一个线程中接收1个,在另一个线程中接收3个,因为所有的状态都属于同一个用户密钥,这3个状态将在不同的线程中进行分区。我的问题是:有没有任何方法可以用更多的并行性来做到这一点?谨致问候。

qrjkbowd

qrjkbowd1#

我发现了一个问题,在后端,它将同一个userid分配给不同的用户(不知道是如何发生的,也不是flink),这就是为什么模式永远不匹配的原因,因为具有相同userid的不同用户以不同的顺序发送事件被处理在同一个子任务中,例如,它们在最后得到merge,并且user1的event1可以出现在user2的event2之后。谨致问候

uemypmqf

uemypmqf2#

听起来您希望将每个用户的所有分析放在一起,但同时对不同的用户执行分析。实现这一点的方法是通过userid对流进行键控。这意味着对于单个用户,他们的事件由单个(非并行)管道处理。
如果速度太慢,你可以做一些事情来加快速度。通常最有帮助的事情包括:更高效的序列化、执行预聚合或增量聚合、删除keyby或重新平衡,以及启用对象重用。

相关问题