如何将多个数据流写入单个文件
How to write multiple DataStream's to a single file
假设我有两个 DataStream
的不同类型:
val stream1: DataStream[(Int, Int, Int)] = ...
val stream2: DataStream[(Int, Int, Int, Int, Float)] = ...
如何将两个流写入单个文件?
我尝试了不同的方法,但似乎都不起作用。例如,我不能直接写成
stream1.writeAsText("path/to/file.txt").setParallelism(1)
stream2.writeAsText("path/to/file.txt").setParallelism(1)
因为 Flink 会报错如下信息:
java.io.IOException: File or directory already exists.
Existing files and directories are not overwritten in NO_OVERWRITE mode.
Use OVERWRITE mode to overwrite existing files and directories.
另一方面,我不能这样覆盖:
stream1.writeAsText("path/to/file.txt").setParallelism(1)
stream2.writeAsText("path/to/file.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1)
因为(据我所知)第二个流将覆盖第一个流写入的内容。
最后想到了这样接流
val connectedStream: ConnectedStream = stream1.connect(stream2)
但我会得到一个 ConnectedStream
,它没有 writeAsText
方法。
(郑重声明,我实际上有 4 个流要写入单个文件)。
一个非常简单的解决方案是为每个流使用一个映射器,将每个事件映射到 String
(或另一种常见类型,例如 byte[]
)。然后你有四个具有相同类型 (DataStream[String]
) 的流,你可以将它们联合成一个流并作为一个流写入文件。
这看起来如下:
val s1: DataStream[String] = ???
val s2: DataStream[String] = ???
val s3: DataStream[String] = ???
val s4: DataStream[String] = ???
val out: DataStream[String] = s1.union(s2).union(s3).union(s4)
out.writeAsText("path/to/file")
假设我有两个 DataStream
的不同类型:
val stream1: DataStream[(Int, Int, Int)] = ...
val stream2: DataStream[(Int, Int, Int, Int, Float)] = ...
如何将两个流写入单个文件?
我尝试了不同的方法,但似乎都不起作用。例如,我不能直接写成
stream1.writeAsText("path/to/file.txt").setParallelism(1)
stream2.writeAsText("path/to/file.txt").setParallelism(1)
因为 Flink 会报错如下信息:
java.io.IOException: File or directory already exists.
Existing files and directories are not overwritten in NO_OVERWRITE mode.
Use OVERWRITE mode to overwrite existing files and directories.
另一方面,我不能这样覆盖:
stream1.writeAsText("path/to/file.txt").setParallelism(1)
stream2.writeAsText("path/to/file.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1)
因为(据我所知)第二个流将覆盖第一个流写入的内容。
最后想到了这样接流
val connectedStream: ConnectedStream = stream1.connect(stream2)
但我会得到一个 ConnectedStream
,它没有 writeAsText
方法。
(郑重声明,我实际上有 4 个流要写入单个文件)。
一个非常简单的解决方案是为每个流使用一个映射器,将每个事件映射到 String
(或另一种常见类型,例如 byte[]
)。然后你有四个具有相同类型 (DataStream[String]
) 的流,你可以将它们联合成一个流并作为一个流写入文件。
这看起来如下:
val s1: DataStream[String] = ???
val s2: DataStream[String] = ???
val s3: DataStream[String] = ???
val s4: DataStream[String] = ???
val out: DataStream[String] = s1.union(s2).union(s3).union(s4)
out.writeAsText("path/to/file")