DAR DAR - 2 months ago
Scala Question

error writing PubSub stream to Cloud Storage using Dataflow

Using SCIO from Spotify to write a job for Dataflow, following two examples (example 1 and example 2) to write a PubSub stream to GCS, but I get the following error for the code below.

Error

Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection


Code

object StreamingPubSub {
  def main(cmdlineArgs: Array[String]): Unit = {
    // set up example wiring
    val (opts, args) = ScioContext.parseArguments[ExampleOptions](cmdlineArgs)
    val dataflowUtils = new DataflowExampleUtils(opts)
    dataflowUtils.setup()

    val sc = ScioContext(opts)

    sc.pubsubTopic(opts.getPubsubTopic)
      .timestampBy {
        _ => new Instant(System.currentTimeMillis() - (scala.math.random * RAND_RANGE).toLong)
      }
      .withFixedWindows(Duration.standardHours(1))
      .groupBy(_ => Unit)
      .toWindowed
      .toSCollection
      .saveAsTextFile(args("output"))

    val result = sc.close()

    // CTRL-C to cancel the streaming pipeline
    dataflowUtils.waitToFinish(result.internal)
  }
}


I may be mixing up the windowing concept with bounded PCollections. Is there a way to achieve this, or do I need to apply some transform to make it happen? Can anyone assist with this?

Answer

I believe SCIO's saveAsTextFile uses Dataflow's Write transform under the hood, which supports only bounded PCollections. Dataflow doesn't yet provide a direct API for writing an unbounded PCollection to Google Cloud Storage, although this is something we are investigating.

To persist an unbounded PCollection, consider a destination that supports streaming writes, for example BigQuery, Datastore, or Bigtable. In SCIO's API, you could use, for example, saveAsBigQuery.
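As a rough sketch of that approach, the streaming pipeline could map each PubSub message to a TableRow and write it to BigQuery instead of GCS. The table name, the single-field schema, and the exact saveAsBigQuery parameters here are illustrative assumptions, not verified against a specific SCIO version:

```scala
import com.google.api.services.bigquery.model.{TableFieldSchema, TableRow, TableSchema}
import com.spotify.scio._

import scala.collection.JavaConverters._

object StreamingPubSubToBigQuery {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Hypothetical schema: a single STRING column holding the raw message
    val schema = new TableSchema().setFields(
      List(new TableFieldSchema().setName("line").setType("STRING")).asJava)

    sc.pubsubTopic(args("input"))                     // unbounded SCollection[String]
      .map(msg => new TableRow().set("line", msg))    // one TableRow per message
      .saveAsBigQuery(args("output"), schema)         // BigQuery accepts streaming writes

    sc.close()
  }
}
```

Because BigQuery ingestion is streamed row by row, no windowing or grouping is needed just to make the write work, unlike the bounded-only Write transform behind saveAsTextFile.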