任务管理器接收到的消息顺序错误

kgqe7b3p  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(277)

我正在从文件中读取消息,并在写入sink之前应用一些运算符,注意到任务管理使给定密钥的消息无序。如何解决这个问题?我做错什么了吗?请检查我的源文件格式和下面的代码。谢谢!

customer_id - timestamp - event_seq
1 t1 e1
1 t2 e2
2 t1 e1
2 t2 e2
1 t3 e3

        DataStream<String> source = createTextFileSourceFromConfig(env);

        source.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String s) throws Exception {
                return new JSONObject(s);
            }
        }).keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject event) {
                return event.get("id").toString();
            }
        }).filter(new InitialFilter())...

在上面的示例中,任务管理器接收<1 t3 e3>作为id=1的第一条消息。谢谢你的建议。

1dkrff03

1dkrff031#

如果读取并行度大于1,则无法保证下游运算符接收记录的顺序,因为输入文件可以在并行执行的源之间分割。因此,可以让文件最后一个块的源在第一个块之前发出行。

相关问题