文章23 | 阅读 14969 | 点赞0
DataStreamSource<T> 转换 SingleOutputStreamOperator<T>
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
SingleOutputStreamOperator<Integer> result = streamSource.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer item) throws Exception {
return item * 1;
}
});
result.print();
env.execute("WordCountStreamingJob");
}
}
比如,传入的是一个整型的集合,传出的是另一个整型的集合。一比一对应。
简化版:
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
SingleOutputStreamOperator<Integer> result = streamSource.map((MapFunction<Integer, Integer>) item -> item * 1);
result.print();
env.execute("WordCountStreamingJob");
}
}
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
streamSource.flatMap(new FlatMapFunction<Integer, Integer>() {
@Override
public void flatMap(Integer item, Collector<Integer> out) throws Exception {
// 复制十份
for (int i = 0; i < 10; i++) {
out.collect(item);
}
}
}).print();
env.execute("WordCountStreamingJob");
}
}
传入一个集合,把每个集合复制十份,再输出一个集合。
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
streamSource.filter((FilterFunction<Integer>) item -> item > 5).print();
env.execute("WordCountStreamingJob");
}
}
返回一个布尔值,来过滤数据。
内容来源于网络,如有侵权,请联系作者删除!