Vion Vion - 29 days ago 12
Java Question

Apache Storm: Submit Topology programatically to remote cluster without creating jar

this is the first time I'm working with Apache Storm and I have the following problem. For my application, I have the requirement that the topology graph is different for each user that is using my application and there can also be multiple topology graphs per user.

Therefore, I had the idea to dynamically create the topology graph using the topology builder. For example, using the toplogy example from storm, this would just be:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("1", new TestWordSpout(true), 5);
builder.setSpout("2", new TestWordSpout(true), 3);
builder.setBolt("3", new TestWordCounter(), 3)
.fieldsGrouping("1", new Fields("word"))
.fieldsGrouping("2", new Fields("word"));
builder.setBolt("4", new TestGlobalCount())
.globalGrouping("1");


together with the following configuration:

Map defaultConf = Utils.readStormConfig();

Map conf = new HashMap();
conf.put(Config.TOPOLOGY_WORKERS, 1);
conf.put(Config.NIMBUS_HOST, "IP to my remote cluster");
conf.put(Config.NIMBUS_THRIFT_PORT, defaultConf.get(Config.NIMBUS_THRIFT_PORT));
conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, defaultConf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN));


When running this topology on a local cluster (without the nimbus configuration), then everything works fine.

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", conf, builder.createTopology());
Utils.sleep(10000);
cluster.shutdown();


However, when submitting the toplogy to the remote cluster by just calling

StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());


I get the following exception:

java.lang.RuntimeException: Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.


So, this exception indicates to me that the Storm Submitter needs some kind of jar. After doing some research, I found that I have to set the following property in my code in order to submit the jar programatically.

System.setProperty("storm.jar", "path/to/jar");


So here are my questions:

What is the purpose of the jar which I have to commit? Is it just a library containing all my available bolts and spouts but I can still change the order of them dynamically in the code or must the deployed jar really contain the fixed topology? If I have to package a jar is this the jar which goes into the
System.setProperty
?

PS: I'm using IntelliJ with a Maven project.

Answer

If you submit a topology to a remote cluster, the code (ie, class files) of all used spouts/bolts must be available to all nodes in the cluster. This is the purpose of the jar file that is submitted to the cluster. It has to contain all those files. Internally, Storm's Nimbus will distribute this jar to all worker nodes to make the code available to them.

The jar only needs to contain the set of classes you want to use (in your case TestWordSpout, TestWordCounter, and TestGlobalCount -- and maybe depended classes that are used within those three if you for example use some other library. Pay attention that nested jars are not supported, ie, a jar contained in a jar does not work -- for this, you would need to extract the classes of the inner jar first and add those classes directly into the final jar).

The structure of the topology is completely independent of the jar file. And yes, this is the jar you specify via the system property. The reason why many people build a jar that contains a main together with a topology definition (that is often static but actually could be flexible, too) is that they submit the topology not via an IDE as you do, but via command line bin/storm. For this to work, an entry point class contained in the jar that has a main method that assembled the topology structure is needed and the same jar is also used for code distribution of the class files because this works quite convenient (in contrast to providing a single entry point class and an additional jar file).