我是apache flink的新手,我正在尝试过滤以字母“n”开头的单词,我正在获得输出,但是我如何才能获得下面不以单词“n”开头的单词,这是我正在使用的代码
package DataStream;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordStream {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> inputData = env.socketTextStream("localhost", 9999);
DataStream<String> filterData = inputData.filter(new FilterFunction<String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public boolean filter(String value) throws Exception {
return value.startsWith("N");
}
});
DataStream<Tuple2<String, Integer>> tokenize = filterData
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<String, Integer>(value, Integer.valueOf(1)));
}
});
DataStream<Tuple2<String, Integer>> counts = tokenize.keyBy(0).sum(1);
counts.print();
env.execute("WordStream");
}
}
你能建议如何将不匹配的单词捕获到另一个流中吗。
2条答案
按热度按时间4dbbbstv1#
更简单的解决方案:
我相信这比使用侧输出的解决方案效率略低,但它仍然可以在单个任务中运行,使用操作符链接,因此它也不需要ser/de开销,也不需要网络。
别误会——一般来说,边输出是分割流的方法。
jexiocij2#
我认为您可以利用侧输出来实现这一点。只需使用processfunction发出实际收集器中的匹配元素和带有side output标记的不匹配元素,然后从主流中获取side output元素。
例如,你的代码可以这样修改,
注意:我用前缀稍微更改了发射的值
Matched=
以及UnMatched=
在输出中得到清晰的理解。对于以下输入,
我得到以下输出,