文章23 | 阅读 14965 | 点赞0
/**
 * Example 1: keying a stream of tuples by position.
 *
 * <p>Each integer {@code n} from a fixed collection is mapped to the tuple {@code (n, 1)}; the
 * stream is then keyed by tuple field 0 and the keyed stream is printed.
 */
public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3));
        // Parameterized Tuple2 (instead of the raw type) so field types are checked at compile time.
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapped = streamSource.map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Integer item) throws Exception {
                return Tuple2.of(item, 1);
            }
        }).returns(Types.TUPLE(Types.INT, Types.INT));
        // keyBy(int...) keys by tuple position; the key itself is exposed as a generic Tuple.
        KeyedStream<Tuple2<Integer, Integer>, Tuple> stream = mapped.keyBy(0);
        stream.print();
        env.execute("WordCountStreamingJob");
    }
}
自定义 Bean,名称为 WordCount:
package com.gosuncn;
public class WordCount {
public Integer word;
public Integer count;
public WordCount() {
}
public WordCount(Integer word, Integer count) {
this.word = word;
this.count = count;
}
public static WordCount of(Integer word, Integer count) {
return new WordCount(word, count);
}
@Override
public String toString() {
return "WordCount{" +
"word=" + word +
", count=" + count +
'}';
}
}
按 WordCount 中的 word 字段进行分组时,就不能再使用脚标 0、1、2、3 … 了,而要使用字段名:
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Wrap each integer in a WordCount POJO with an initial count of 1,
    // declaring the result type explicitly as the original did.
    SingleOutputStreamOperator<WordCount> wordCounts = env
            .fromCollection(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3))
            .map(value -> WordCount.of(value, 1))
            .returns(WordCount.class);

    // POJO fields are addressed by name ("word") rather than by tuple position.
    // NOTE(review): keyBy(String) is deprecated in newer Flink releases in favor of a
    // KeySelector (e.g. wc -> wc.word) — confirm against the Flink version in use.
    KeyedStream<WordCount, Tuple> byWord = wordCounts.keyBy("word");
    byWord.print();

    env.execute("WordCountStreamingJob");
}
}
/**
 * Example 3: keying by two tuple positions and summing a third.
 *
 * <p>Each input line has the form {@code "<province> <city>"}. It is mapped to
 * {@code (province, city, 1)}, keyed by tuple fields 0 and 1, and field 2 is summed per key
 * (a rolling sum); the results are printed.
 */
public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = env.fromCollection(Arrays.asList("江苏 南京", "江苏 徐州", "湖南 长沙", "广东 广州", "辽宁 沈阳", "广东 广州", "湖南 长沙", "江苏 南京", "辽宁 沈阳", "湖南 张家界"));
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> mapped = streamSource.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(String item) throws Exception {
                // Split once (the original split the same string twice) and fail clearly on bad input.
                String[] parts = item.split(" ");
                if (parts.length < 2) {
                    throw new IllegalArgumentException("Expected \"<province> <city>\" but got: " + item);
                }
                return Tuple3.of(parts[0], parts[1], 1);
            }
        }).returns(Types.TUPLE(Types.STRING, Types.STRING, Types.INT));
        // Type params: element type after grouping, and the key type (a generic Tuple of fields 0 and 1).
        KeyedStream<Tuple3<String, String, Integer>, Tuple> keyed = mapped.keyBy(0, 1);
        // Per-key sum of the count held in field 2.
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> summed = keyed.sum(2);
        summed.print();
        env.execute("WordCountStreamingJob");
    }
}
同理,如果使用自定义 Bean(POJO),就不能再用脚标分组了,而要用 Bean 的字段名称(也可以改用 KeySelector)。
内容来源于网络,如有侵权,请联系作者删除!