apache-flink:keyedstream上的倾斜数据分布

h5qlskok  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(362)

我在flink中有一段java代码:

env.setParallelism(6);

//Read from Kafka topic with 12 partitions
DataStream<String> line = env.addSource(myConsumer);

//Filter half of the records 
DataStream<Tuple2<String, Integer>> line_Num_Odd = line_Num.filter(new FilterOdd());
DataStream<Tuple3<String, String, Integer>> line_Num_Odd_2 = line_Num_Odd.map(new OddAdder());

//Filter the other half
DataStream<Tuple2<String, Integer>> line_Num_Even = line_Num.filter(new FilterEven());
DataStream<Tuple3<String, String, Integer>> line_Num_Even_2 = line_Num_Even.map(new EvenAdder());

//Join all the data again
DataStream<Tuple3<String, String, Integer>> line_Num_U = line_Num_Odd_2.union(line_Num_Even_2);

//Window
DataStream<Tuple3<String, String, Integer>> windowedLine_Num_U_K = line_Num_U
                .keyBy(1)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new Reducer());

问题是窗口应该能够处理parallelism=2的数据,因为在tuple3的第二个字符串中有两组键为“奇”和“偶”的不同数据。所有的东西都是用parallelism 6运行的,但不是用parallelism=1运行的窗口,我只需要它有parallelism=2,因为我的要求。
代码中使用的函数如下:

public static class FilterOdd implements FilterFunction<Tuple2<String, Integer>> {

    public boolean filter(Tuple2<String, Integer> line) throws Exception {
        Boolean isOdd = (Long.valueOf(line.f0.split(" ")[0]) % 2) != 0;
        return isOdd;
    }
};

public static class FilterEven implements FilterFunction<Tuple2<String, Integer>> {

    public boolean filter(Tuple2<String, Integer> line) throws Exception {
        Boolean isEven = (Long.valueOf(line.f0.split(" ")[0]) % 2) == 0;
        return isEven;
    }
};

public static class OddAdder implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {

    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> line) throws Exception {
        Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(line.f0, "odd", line.f1);
        return newLine;
    }
};

public static class EvenAdder implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {

    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> line) throws Exception {
        Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(line.f0, "even", line.f1);
        return newLine;
    }
};

public static class Reducer implements ReduceFunction<Tuple3<String, String, Integer>> {

    public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> line1,
            Tuple3<String, String, Integer> line2) throws Exception {
        Long sum = Long.valueOf(line1.f0.split(" ")[0]) + Long.valueOf(line2.f0.split(" ")[0]);
        Long sumTS = Long.valueOf(line1.f0.split(" ")[1]) + Long.valueOf(line2.f0.split(" ")[1]);
        Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(String.valueOf(sum) +
                " " + String.valueOf(sumTS), line1.f1, line1.f2 + line2.f2);
        return newLine;
    }
};

谢谢你的帮助!
解决方案:我已经将键的内容从“奇”和“偶”改为“odd0000”和“even1111”,现在可以正常工作了。

wz3gfoph

wz3gfoph1#

通过哈希分区将密钥分发给工作线程。这意味着键值是散列的,线程是由模工人决定的。对于两个键和两个线程,很有可能两个键都分配给同一个线程。
您可以尝试使用散列值分布在两个线程上的不同键值。

相关问题