houcros - 3 months ago
Scala Question

How to write multiple DataStreams to a single file

Let's say I have two DataStreams of different types:

val stream1: DataStream[(Int, Int, Int)] = ...
val stream2: DataStream[(Int, Int, Int, Int, Float)] = ...


How can I write both streams into a single file?

I've tried different things, but none of them seem to work. For instance, I can't just write both streams straight away like this:

stream1.writeAsText("path/to/file.txt").setParallelism(1)
stream2.writeAsText("path/to/file.txt").setParallelism(1)


because Flink will complain with the following message:

java.io.IOException: File or directory already exists.
Existing files and directories are not overwritten in NO_OVERWRITE mode.
Use OVERWRITE mode to overwrite existing files and directories.


On the other hand, I can't overwrite like this:

stream1.writeAsText("path/to/file.txt").setParallelism(1)
stream2.writeAsText("path/to/file.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1)


because (as far as I understand) the second stream will overwrite whatever the first stream wrote.

Finally, I thought about connecting the streams like this:

val connectedStreams: ConnectedStreams[(Int, Int, Int), (Int, Int, Int, Int, Float)] = stream1.connect(stream2)


but then I'd get a ConnectedStreams, which doesn't have a writeAsText method.

(For the record, I actually have 4 streams that I'd like to write to a single file).

Answer

A very simple solution is to apply a mapper to each stream that converts every event into a String (or another common type such as byte[]). You then have four streams of the same type (DataStream[String]), which you can union into a single stream and write to one file.

This would look as follows:

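// Each input stream is first mapped to the common type String.
val s1: DataStream[String] = ???
val s2: DataStream[String] = ???
val s3: DataStream[String] = ???
val s4: DataStream[String] = ???

// union accepts only streams of the same element type.
val out: DataStream[String] = s1.union(s2).union(s3).union(s4)
// Parallelism 1 makes the sink write a single file rather than one file per parallel task.
out.writeAsText("path/to/file").setParallelism(1)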
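To make the mapping step concrete, here is a minimal sketch for the two streams from the question. The object name UnionToSingleFile, the helper toLine, the comma-separated line format, the sample elements, and the job name are all assumptions made for illustration; any conversion to a common type would work the same way.

```scala
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala._

object UnionToSingleFile {

  // Hypothetical helper: renders any tuple (tuples are Products)
  // as one comma-separated line.
  def toLine(p: Product): String = p.productIterator.mkString(",")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Sample data standing in for the real sources (assumption).
    val stream1: DataStream[(Int, Int, Int)] =
      env.fromElements((1, 2, 3), (4, 5, 6))
    val stream2: DataStream[(Int, Int, Int, Int, Float)] =
      env.fromElements((1, 2, 3, 4, 5.0f))

    // Map both streams to the common type String.
    val s1: DataStream[String] = stream1.map(t => toLine(t))
    val s2: DataStream[String] = stream2.map(t => toLine(t))

    // Union the now identically typed streams and write them with a
    // single sink task, so that one file is produced.
    s1.union(s2)
      .writeAsText("path/to/file.txt", FileSystem.WriteMode.OVERWRITE)
      .setParallelism(1)

    env.execute("union-to-single-file")
  }
}
```

Note that union interleaves records from its inputs, so the order of lines from different streams in the file is not guaranteed. If you need to tell the lines apart later, one option is to have the mapper prefix each line with a tag identifying its source stream.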