如何正确处理自定义mapfunction中的错误?

bkkx9g8r  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(291)

我已经实施了 MapFunction 为了我的Apache·Flink流。它正在解析传入元素并将其转换为其他格式,但有时可能会出现错误(即传入数据无效)。
我看到了两种可能的处理方法:
忽略无效元素,但似乎我不能忽略错误,因为对于任何传入元素,我必须提供传出元素。
将传入元素拆分为有效和无效,但似乎我应该使用其他函数。
所以,我有两个问题:
如何正确处理我的工作中的错误 MapFunction ?
如何正确实现这种转换功能?

weylhg0b

weylhg0b1#

你需要一个 FlatMapFunction 而不是 MapFunction . 这将允许您仅发射有效的元素。下面显示了一个示例实现:

input.flatMap(new FlatMapFunction<String, Long>() {
    @Override
    public void flatMap(String input, Collector<Long> collector) throws Exception {
        try {
            Long value = Long.parseLong(input);
            collector.collect(value);
        } catch (NumberFormatException e) {
            // ignore invalid data
        }
    }
});
rdlzhqv9

rdlzhqv92#

这是建立在@till rohrmann上面的想法之上的。将此添加为答案而不是注解以获得更好的格式。
我认为实现“split+select”的一种方法是使用带有sideoutput的processfunction。我的图表如下所示:

Source --> ValidateProcessFunction ---good data--> UDF--->SinkToOutput
                                    \
                                     \---bad data----->SinkToErrorChannel

这样行吗?有更好的办法吗?

相关问题