下面是一个flink程序(java),它从一个文件中读取tweets,提取hash标签,计算每个hash标签的重复次数,最后写入一个文件。
现在在这个程序中有一个大小为20秒的滑动窗口,可以滑动5秒。在sink中,所有输出数据都被写入名为outfile的文件中。意味着每5秒就有一个窗口被触发,并将数据写入outfile。
我的问题是:
我希望每启动一个窗口(意味着每5秒钟)数据都写入新文件(而不是附加在同一个文件中)。请指导在哪里和如何可以做到这一点?我需要使用自定义触发器或任何有关接收器的配置吗?或者别的什么?
代码:
<!-- language: lang-java -->
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
String path = "C:\\Users\\eventTime";
// Reading data from files of folder eventTime.
DataStream<String> streamSource = env.readFile(new TextInputFormat(new Path(path)), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000).uid("read-1");
//Extracting the hash tags of tweets
DataStream<Tuple3<String, Integer, Long>> mapStream = streamSource.map(new ExtractHashTagFunction());
//generating watermarks and extracting the timestamps from tweets
DataStream<Tuple3<String, Integer, Long>> withTimestampsAndWatermarks = mapStream.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
KeyedStream<Tuple3<String, Integer, Long>,Tuple> keyedStream = withTimestampsAndWatermarks.keyBy(0);
//Using sliding window of 20 seconds which slide by 5 seconds.
SingleOutputStreamOperator<Tuple4<String, Integer, Long, String>> aggregatedStream = keyedStream.**window(SlidingEventTimeWindows.of(Time.seconds(20),Time.seconds(5)))**
.aggregate(new AggregateHashTagCountFunction()).uid("agg-123");
aggregatedStream.writeAsText("C:\\Users\\outfile", WriteMode.NO_OVERWRITE).setParallelism(1).uid("write-1");
env.execute("twitter-analytics");
1条答案
按热度按时间2ledvvac1#
如果对内置接收器不满意,可以定义自定义接收器:
stream.addSink(new MyCustomSink ...)
这个MyCustomSink
应该实施SinkFunction
您的自定义接收器将包含一个文件编写器和一个计数器。每次调用接收器时,它都会向"/path/to/file + counter.yourFileExtension"
https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/sinkfunction.html