并行处理—是否可以在ApacheFlink中使仅Map任务并行执行

sbdsn5lh  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(296)

我正在使用flink处理一些json格式的流数据:

{"uuid":"903493290432934", "bin": "68.3"}
{"uuid":"324938722984237", "bin": "56.8"}
...

我的工作很简单:
从数据源获取流--->将数据反序列化为字符串--->将字符串转换为json对象 myJsonObj ---> double res = myJsonObj.get("bin") --->做一些繁重的计算 res .
这是我的密码:

FlinkPravegaReader<String> source = ... // init source
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// transform String to MyJson
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
    .map(new MapFunction<String, MyJson>() {
        @Override
        public MyJson map(String s) throws Exception {
            MyJson myJson = JSON.parseObject(s, MyJson.class);
            return myJson;
        }
    });
// do the heavy process
DataStream<String> heavyResult = jsonStream
    .map(new MapFunction<MyJson, String>() {
        @Override
        public String map(MyJson myJson) throws Exception {
            double res = myJson.get("bin");
            // do some very heavy calculation
            return myJson.get("uuid").asText() + " done.";
        }
    });
heavyResult.print();

据我所知,我还没用过 keyBy/window ,所以我想我用了 windowAll 默认情况下。我说得对吗?
如果我是对的,Flink的医生告诉我的 windowAll 无法并行运行。那是不是意味着我要一个接一个地做繁重的计算呢?我在想是否有可能并行地做这个繁重的计算。
如你所见,在我的情况下,使用 keyBy/window 说得通。那么如何使这个案例并行执行呢?有没有可能使两个作业使用相同的数据源一起运行,如下所示?

/----windowAll ---- do the heavy calculation
            /
Data Source-
            \
             \----windowAll ---- do the heavy calculation

这种设计可行吗?假设数据源生成三个元素:a和b。在这种设计中,我希望一个windowall处理一段时间,而另一个windowall处理b。

yuvru6vn

yuvru6vn1#

键控流用于在数据中创建分区,因此来自同一个键的所有流量都被发送到同一个taskmanager。
当您希望从流中聚合元素以将它们作为一个集合进行计算时,会使用一个窗口。
如果你的情况不适合上述情况下,你不使用他们。
为整个流提供并行性,只需使用

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);  //Notice you'll need 3 taskmanagers slots available.

要定义单个运算符的平行性(繁重计算),请使用:

DataStream<String> heavyResult = jsonStream
.map(new MapFunction<MyJson, String>() {
    @Override
    public String map(MyJson myJson) throws Exception {
        double res = myJson.get("bin");
        // do some very heavy calculation
        return myJson.get("uuid").asText() + " done.";
    }
}).setParallelism(3);  //Notice you'll need 3 taskmanagers slots available.

更多信息请访问https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/parallel.html

相关问题