假设我有两个 DataStream
不同类型的:
val stream1: DataStream[(Int, Int, Int)] = ...
val stream2: DataStream[(Int, Int, Int, Int, Float)] = ...
如何将两个流写入一个文件?
我试过不同的方法,但似乎不管用。举个例子,我不能直接写成
stream1.writeAsText("path/to/file.txt").setParallelism(1)
stream2.writeAsText("path/to/file.txt").setParallelism(1)
因为Flink会抱怨以下信息:
java.io.IOException: File or directory already exists.
Existing files and directories are not overwritten in NO_OVERWRITE mode.
Use OVERWRITE mode to overwrite existing files and directories.
另一方面,我不能这样覆盖:
stream1.writeAsText("path/to/file.txt").setParallelism(1)
stream2.writeAsText("path/to/file.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1)
因为(据我所知)第二个流会覆盖第一个流写的任何东西。
最后,我想把这条小溪连接起来
val connectedStream: ConnectedStream = stream1.connect(stream2)
但我会得到一个 ConnectedStream
,它没有 writeAsText
方法。
(作为记录,我实际上有4个流要写入一个文件)。
1条答案
按热度按时间dfty9e191#
一个非常简单的解决方案是为每个流使用一个Map器将每个事件Map到
String
(或其他常见类型,如byte[]
). 然后有四个相同类型的流(DataStream[String]
)它可以合并成一个流,并作为一个流写入一个文件。如下所示: