ApacheFlink:如何计算数据流中的事件总数

h43kikqp  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(356)

我有两个原始流,我正在加入这些流,然后我想计算已加入的事件总数和未加入的事件数。我是用Map上的 joinedEventDataStream 如下图所示

joinedEventDataStream.map(new RichMapFunction<JoinedEvent, Object>() {

            @Override
            public Object map(JoinedEvent joinedEvent) throws Exception {

                number_of_joined_events += 1;

                return null;
            }
        });

问题1:这是计算流中事件数的适当方法吗?
问题2:我注意到一种天生的行为,有些人可能不相信。问题是,当我在intellijide中运行flink程序时,它为我显示了正确的值 number_of_joined_events 但是 0 如果我以 jar . 所以我得到了 number_of_joined_events 当我把程序当作 jar 而不是实际计数。为什么这种情况只发生在 jar 文件提交而不是ide?

ryhaxcpt

ryhaxcpt1#

你的方法行不通。您在通过jar文件执行程序时注意到的行为是预期的。
我不知道怎么做 number_of_joined_events 已定义,但我假设它是程序中的静态变量。在ide中运行程序时,它在单个jvm中运行。因此,所有操作符都可以访问静态变量。当您向远程进程提交jar文件时,程序将在不同的jvm(可能是多个jvm)中执行,并且客户端进程中的静态变量永远不会更新。
你可以用Flink的标准或者 ReduceFunction 总而言之 1 s来计算已处理的记录数。

相关问题