Ankur Chauhan - 1 month ago
Java Question

Launching dataflow jobs from a java application

I am writing an application that attempts to launch a batch Dataflow pipeline based on the parameters provided. For this I use

PipelineOptionsFactory.create().as(...)
followed by setters to configure options.

But when I create a pipeline object using
Pipeline.create(opts)
I get the following error:

04:48:08.711 [pool-4-thread-9] ERROR c.g.c.d.s.r.DataflowPipelineRunner - Unable to convert url (jar:file:/ferric.jar!/) to file.
04:48:08.711 [pool-4-thread-9] WARN BatchJobManager - unable to start materialization for view
java.lang.RuntimeException: Failed to construct instance from factory method DataflowPipelineRunner#fromOptions(interface com.google.cloud.dataflow.sdk.options.PipelineOptions)
at com.google.cloud.dataflow.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:234)
at com.google.cloud.dataflow.sdk.util.InstanceBuilder.build(InstanceBuilder.java:163)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.fromOptions(PipelineRunner.java:58)
at com.google.cloud.dataflow.sdk.Pipeline.create(Pipeline.java:135)
at com.brightcove.rna.dataflow.MaterializationPipeline.<init>(MaterializationPipeline.java:45)
at com.brightcove.rna.dataflow.MaterializationPipeline.create(MaterializationPipeline.java:92)
at com.brightcove.rna.ferric.DataflowJobService.createPipeline(DataflowJobService.java:121)
at javaslang.control.Try.mapTry(Try.java:410)
at javaslang.control.Try.map(Try.java:380)
at com.brightcove.rna.ferric.DataflowJobService.create(DataflowJobService.java:102)
at com.brightcove.rna.ferric.BatchJobScheduler.lambda$null$13(BatchJobScheduler.java:94)
at javaslang.Value.forEach(Value.java:246)
at com.brightcove.rna.ferric.BatchJobScheduler.lambda$startMaterializationJobs$14(BatchJobScheduler.java:91)
at javaslang.control.Try.onSuccess(Try.java:442)
at com.brightcove.rna.ferric.BatchJobScheduler.startMaterializationJobs(BatchJobScheduler.java:90)
at com.brightcove.rna.ferric.BatchJobScheduler.run(BatchJobScheduler.java:52)
at sun.reflect.GeneratedMethodAccessor94.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException: null
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.google.cloud.dataflow.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:223)
... 27 common frames omitted
Caused by: java.lang.IllegalArgumentException: Unable to convert url (jar:file:/ferric.jar!/) to file.
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.detectClassPathResourcesToStage(DataflowPipelineRunner.java:3176)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.fromOptions(DataflowPipelineRunner.java:291)
... 32 common frames omitted
Caused by: java.lang.IllegalArgumentException: URI is not hierarchical
at java.io.File.<init>(File.java:418)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.detectClassPathResourcesToStage(DataflowPipelineRunner.java:3172)
... 33 common frames omitted


It seems like the pipeline runner is attempting to determine the path of the jar that contains the classes it needs to upload. There is only one jar (an uberjar) that holds all the required classes, and the path being derived from the jar:file: URL is clearly not correct.
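For what it's worth, the failure is easy to reproduce in isolation (the class and method names below are hypothetical, not part of the SDK): java.io.File can only be constructed from a hierarchical file: URI, while the jar:file:...!/ URL of a resource loaded from inside a jar is an opaque URI, which is exactly what the "URI is not hierarchical" frame in the trace is complaining about.

```java
import java.io.File;
import java.net.URI;

// Minimal reproduction of the failure inside detectClassPathResourcesToStage:
// a "jar:file:...!/" URL parses to an opaque (non-hierarchical) URI, and the
// java.io.File(URI) constructor rejects opaque URIs with IllegalArgumentException.
public class JarUriDemo {
    public static boolean canConvertToFile(String uriString) {
        try {
            new File(URI.create(uriString)); // throws for opaque / non-file URIs
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```

A plain file: URI converts fine; the nested jar:file: URI from the error message does not.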

What are the possible workarounds that I can use to launch Dataflow jobs programmatically?

Answer

The classpath detection and uploading logic is limited to files on disk and doesn't support scenarios where jars are embedded within other jars. Some ways you could resolve this are:

  1. Flatten the jar-within-jar layout into a single jar containing the contents of all the embedded jars. This is good because you keep your single-jar property and don't need to write any code to change the pipeline, but if you build this regularly it will complicate your build. Look into the Maven Shade plugin (or an equivalent technology for your build system) to do the bundling for you.
  2. Use a more traditional setup where you specify each individual jar separately. The Maven exec plugin can help with building and launching your application this way.
  3. Extract all the jars at runtime and set the filesToStage property within PipelineOptions to all the resources you want staged.
  4. Add support for embedded-jar scenarios to Apache Beam/Dataflow. I filed this tracking issue if you want to take a look at making that contribution.
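If you go with option 3, the extraction step might be sketched roughly as below. The class name and the layout assumption (nested jars stored as entries ending in .jar, as in a typical fat-jar layout) are mine, not from the SDK; you would then pass the returned paths to setFilesToStage on your DataflowPipelineOptions.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class StagedJarExtractor {
    /**
     * Copies every nested .jar entry out of the given uberjar into tmpDir
     * and returns their filesystem paths, suitable for filesToStage.
     */
    public static List<String> extractNestedJars(Path uberjar, Path tmpDir) throws IOException {
        List<String> staged = new ArrayList<>();
        try (JarFile jar = new JarFile(uberjar.toFile())) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                JarEntry e = entries.nextElement();
                if (!e.isDirectory() && e.getName().endsWith(".jar")) {
                    // e.g. "lib/dep.jar" -> tmpDir/dep.jar
                    Path out = tmpDir.resolve(Paths.get(e.getName()).getFileName());
                    try (InputStream in = jar.getInputStream(e)) {
                        Files.copy(in, out, StandardCopyOption.REPLACE_EXISTING);
                    }
                    staged.add(out.toString());
                }
            }
        }
        return staged;
    }
}
```

The resulting list would then be handed to the options before Pipeline.create, e.g. opts.setFilesToStage(staged).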

There is a very closely related SO question as well, where a user generated an uberjar with their IDE for execution and hit a similar scenario to the one you're asking about.