How to write the result of each sliding window of a Flink program to a new file, instead of appending the results of all windows to one file

Posted by lnlaulya on 2021-06-24 in Flink

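Below is a Flink program (Java) that reads tweets from a file, extracts the hashtags, counts the occurrences of each hashtag, and finally writes the result to a file.
The program uses a sliding window of 20 seconds that slides every 5 seconds. In the sink, all output is written to a file named outfile, so every 5 seconds a window fires and its data is appended to outfile.
My question:
I want the data of every window that fires (that is, every 5 seconds) to be written to a new file instead of being appended to the same file. Where and how can this be done? Do I need a custom trigger, some sink configuration, or something else?
Code: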

<!-- language: lang-java -->

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.getConfig().setAutoWatermarkInterval(100);

env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);

String path = "C:\\Users\\eventTime";
// Reading data from files of folder eventTime.
DataStream<String> streamSource = env.readFile(new TextInputFormat(new Path(path)), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000).uid("read-1");

//Extracting the hash tags of tweets
DataStream<Tuple3<String, Integer, Long>> mapStream = streamSource.map(new ExtractHashTagFunction());   

//generating watermarks and extracting the timestamps from tweets
DataStream<Tuple3<String, Integer, Long>> withTimestampsAndWatermarks = mapStream.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

KeyedStream<Tuple3<String, Integer, Long>,Tuple> keyedStream = withTimestampsAndWatermarks.keyBy(0);

//Using sliding window of 20 seconds which slide by 5 seconds.
SingleOutputStreamOperator<Tuple4<String, Integer, Long, String>> aggregatedStream = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5)))
        .aggregate(new AggregateHashTagCountFunction()).uid("agg-123");                 

aggregatedStream.writeAsText("C:\\Users\\outfile", WriteMode.NO_OVERWRITE).setParallelism(1).uid("write-1");

env.execute("twitter-analytics");
2ledvvac1#

If you are not satisfied with the built-in sinks, you can define a custom sink: `stream.addSink(new MyCustomSink ...)`. `MyCustomSink` should implement `SinkFunction`. Your custom sink would hold a file writer and a counter. Each time the sink is invoked, it writes to "/path/to/file + counter.yourFileExtension". See https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/sinkfunction.html
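One caveat when following this suggestion: `SinkFunction.invoke` is called once per record, not once per window, so a counter incremented on every invocation would produce one file per record. A minimal sketch of the file-naming part is shown below in plain Java, without Flink on the classpath. It assumes each aggregated record carries the end timestamp of its window (for example, emitted from a `ProcessWindowFunction` via `context.window().getEnd()`); the class `PerWindowFileWriter`, its methods, and the `outfile-<windowEnd>.txt` naming scheme are hypothetical and not part of the original answer or of Flink.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
 * Hypothetical helper: appends each record to a file named after the window
 * it belongs to. In a Flink job, the invoke(...) method of a custom
 * SinkFunction would delegate to write(...).
 */
class PerWindowFileWriter {
    private final Path directory;
    private final String prefix;
    private final String extension;

    PerWindowFileWriter(Path directory, String prefix, String extension) {
        this.directory = directory;
        this.prefix = prefix;
        this.extension = extension;
    }

    /** File that receives all records of the window ending at windowEndMillis. */
    Path fileFor(long windowEndMillis) {
        return directory.resolve(prefix + "-" + windowEndMillis + extension);
    }

    /** Appends one line, creating the window's file on first use. */
    void write(long windowEndMillis, String line) throws IOException {
        Files.createDirectories(directory);
        Files.write(
                fileFor(windowEndMillis),
                (line + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE,
                StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("windows");
        PerWindowFileWriter writer = new PerWindowFileWriter(dir, "outfile", ".txt");

        // Two records from the window ending at 20 s, one from the next window.
        writer.write(20_000L, "#flink,3");
        writer.write(20_000L, "#java,1");
        writer.write(25_000L, "#flink,4");

        System.out.println(Files.readAllLines(writer.fileFor(20_000L)));
        System.out.println(Files.readAllLines(writer.fileFor(25_000L)));
    }
}
```

Records of the same window end up in the same file, and every new window gets its own file. Since the sink in the question runs with parallelism 1, a single writer instance sees all records. Note that a sink written this way does not take part in Flink's checkpointing, so records may be written more than once after a recovery.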
