apache flink(集群中stdout出错)

cgyqldqp  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(368)

Apache Flink ,我无法在中看到输出 std out ,但我的作业正在成功运行,数据即将到来

xwbd5t1u

xwbd5t1u1#

public static void main(String[] args) throws Exception {

    // the host and the port to connect to
    final String hostname = "192.168.1.73";
    final int port = 9000;

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.1.68", 6123);

    // get input data by connecting to the socket
    DataStream<String> text = env.socketTextStream(hostname, port, "\n");

    // parse the data, group it, window it, and aggregate the counts
    DataStream<WordWithCount> windowCounts = text

            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })

            .keyBy("word").timeWindow(Time.seconds(5))

            .reduce(new ReduceFunction<WordWithCount>() {
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1);

    env.execute("Socket Window WordCount");
}

public static class WordWithCount {

    public String word;
    public long count;

    public WordWithCount() {
    }

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}
omqzjyyz

omqzjyyz2#

在集群上运行作业时,数据流将打印到taskmanager进程的标准输出。此taskmanager标准输出被定向到flink根目录的./log/目录中的.out文件。我相信你已经看到了你的成果。
我不知道是否有可能改变taskmanagers的stdout,但是,一个快速而肮脏的解决方案可能是将输出写入套接字:

output.writeToSocket(outputHost, outputPort, new SimpleStringSchema())

相关问题