如何获得apache flink中filter函数中不匹配的值的输出

xdnvmnnf  于 2021-06-26  发布在  Flink
关注(0)|答案(2)|浏览(636)

我是apache flink的新手,我正在尝试过滤以字母“n”开头的单词,我正在获得输出,但是我如何才能获得下面不以单词“n”开头的单词,这是我正在使用的代码

package DataStream;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordStream {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> inputData = env.socketTextStream("localhost", 9999);

        DataStream<String> filterData = inputData.filter(new FilterFunction<String>() {

            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("N");
            }
        });

        DataStream<Tuple2<String, Integer>> tokenize = filterData
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        out.collect(new Tuple2<String, Integer>(value, Integer.valueOf(1)));

                    }
                });

        DataStream<Tuple2<String, Integer>> counts = tokenize.keyBy(0).sum(1);

        counts.print();

        env.execute("WordStream");

    }

}

你能建议如何将不匹配的单词捕获到另一个流中吗。

4dbbbstv

4dbbbstv1#

更简单的解决方案:

DataStream<String> nwords = input.filter(s -> startsWith("N"));
DataStream<String> others = input.filter(s -> !startsWith("N"));

我相信这比使用侧输出的解决方案效率略低,但它仍然可以在单个任务中运行,使用操作符链接,因此它也不需要ser/de开销,也不需要网络。
别误会——一般来说,边输出是分割流的方法。

jexiocij

jexiocij2#

我认为您可以利用侧输出来实现这一点。只需使用processfunction发出实际收集器中的匹配元素和带有side output标记的不匹配元素,然后从主流中获取side output元素。
例如,你的代码可以这样修改,

package datastream;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class WordStream {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> inputData = env.socketTextStream("localhost", 9999);

        // Initialize side-output tag to collect the un-matched elements 
        OutputTag<Tuple2<String, Integer>> unMatchedSideOutput = new OutputTag<Tuple2<String, Integer>>("unmatched-side-output") {};

        SingleOutputStreamOperator<Tuple2<String, Integer>> tokenize = inputData
                .process(new ProcessFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) {
                        if (value.startsWith("N")) {
                            // Emit the data to actual collector
                            out.collect(new Tuple2<>("Matched=" + value, Integer.valueOf(1)));
                        } else {
                            // Emit the un-matched data to side output
                            ctx.output(unMatchedSideOutput, new Tuple2<>("UnMatched=" + value, Integer.valueOf(1)));
                        }
                    }
                });

        DataStream<Tuple2<String, Integer>> count = tokenize.keyBy(0).sum(1);

        // Fetch the un-matched element using side-output tag and process it
        DataStream<Tuple2<String, Integer>> unMatchedCount = tokenize.getSideOutput(unMatchedSideOutput).keyBy(0).sum(1);

        count.print();

        unMatchedCount.print();

        env.execute("WordStream");

    }
}

注意:我用前缀稍微更改了发射的值 Matched= 以及 UnMatched= 在输出中得到清晰的理解。
对于以下输入,

Hello
Nevermind
Hello

我得到以下输出,

3> (UnMatched=Hello,1)
4> (Matched=Nevermind,1)
3> (UnMatched=Hello,2)

相关问题