Java Question

Trigger a spark job whenever an event occurs

I have a Spark application which should run whenever it receives a Kafka message on a topic.

I won't be receiving more than 5-6 messages a day, so I don't want to take the Spark Streaming approach. Instead, I tried to submit the application programmatically, but I don't like that approach because I have to set the Spark and Java classpath within my code, along with all the necessary Spark properties like executor cores, executor memory, etc.

How do I trigger the Spark application to run, but make it wait until it receives a message?

Any pointers would be very helpful.


You can use a shell-script approach with the nohup command to submit the job, like this:

nohup <your spark-submit shell script> <parameters> 2>&1 < /dev/null &

Whenever you get a message, you can poll for that event and call this shell script (a Kafka polling sketch follows the Runtime snippet below). Below are a few ways to do this:

- First way: using Runtime

    /**
     * This method submits a job on the fly by calling a shell script
     * (for example, a spark-submit or MapReduce wrapper script).
     *
     * @param commandToExecute the shell command to execute
     * @return Boolean.TRUE if the command was launched and waited on without an exception,
     *         Boolean.FALSE otherwise
     */
    public static Boolean executeCommand(final String commandToExecute) {
        try {
            // LOG is assumed to be an SLF4J or Log4j logger declared in the enclosing class
            final Runtime rt = Runtime.getRuntime();
            LOG.debug("process command -- " + commandToExecute);
            final String[] arr = { "/bin/sh", "-c", commandToExecute };
            final Process proc = rt.exec(arr);
            LOG.debug("process started");
            final int exitVal = proc.waitFor();
            LOG.trace("commandToExecute exited with code: " + exitVal);
        } catch (final Exception e) {
            LOG.error("Exception occurred while launching process: " + e.getMessage());
            return Boolean.FALSE;
        }
        return Boolean.TRUE;
    }
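
For completeness, here is a rough sketch of the polling side, assuming the executeCommand method above: a plain Kafka consumer loop that fires the shell command whenever a record arrives. The broker address, group id, topic name, and the spark-submit command line are all placeholders, not values from the original question.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SparkTriggerListener {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "spark-trigger");                // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("spark-trigger-topic"));  // placeholder topic
            while (true) {
                // poll(Duration) needs kafka-clients 2.0+; older clients use poll(long)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> record : records) {
                    // executeCommand is the method from the Runtime snippet above, assumed to be
                    // available in this class; the command line below is an example only
                    executeCommand("nohup spark-submit --class com.example.MyApp "
                            + "/opt/jobs/my-app.jar " + record.value()
                            + " 2>&1 < /dev/null &");
                }
            }
        }
    }
}

Since the question mentions only 5-6 messages a day, a relaxed poll interval like this keeps the loop essentially idle between events.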

- Second way: using ProcessBuilder

    // Operation and logger are assumed to be defined in the enclosing class;
    // shellScript is the (truncated) path to the shell script to run.
    private static final String shellScript = "./";

    private static void executeProcess(Operation command, String database) throws IOException,
            InterruptedException {

        // directory the shell script is launched from
        final File executorDirectory = new File("src/main/resources/");

        ProcessBuilder processBuilder =
                new ProcessBuilder(shellScript, command.getOperation(), database);
        processBuilder.directory(executorDirectory);
        Process process = processBuilder.start();
        try {
            int shellExitStatus = process.waitFor();
            if (shellExitStatus == 0) {
                logger.info("Successfully executed the shell script");
            } else {
                logger.error("Shell script exited with code: " + shellExitStatus);
            }
        } catch (InterruptedException ex) {
            logger.error("Shell Script process was interrupted");
        }
    }
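
The Operation type and logger referenced above are not shown in the original snippet. As one assumption of what Operation might be, a small enum carrying the sub-command passed to the shell script would fit, called roughly like this:

// Hypothetical Operation enum (not part of the original snippet): it carries the
// sub-command string that executeProcess hands to the shell script.
public enum Operation {
    SPARK_SUBMIT("spark-submit"),
    MAPREDUCE("mapreduce");

    private final String operation;

    Operation(String operation) {
        this.operation = operation;
    }

    public String getOperation() {
        return operation;
    }
}

// Example call site (the database name is a placeholder):
// executeProcess(Operation.SPARK_SUBMIT, "my-database");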

- Third way: JSch

Run a command over SSH with JSch.
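
If the spark-submit script lives on a remote edge node, a minimal JSch sketch could look like the one below. The host, user, password, and command line are placeholders, and in practice key-based authentication and proper host-key checking would replace the shortcuts shown here.

import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class RemoteSparkSubmit {

    public static void main(String[] args) throws Exception {
        JSch jsch = new JSch();
        Session session = jsch.getSession("hadoop-user", "edge-node.example.com", 22);  // placeholders
        session.setPassword("secret");                        // placeholder; prefer key-based auth
        session.setConfig("StrictHostKeyChecking", "no");     // shortcut for the sketch only
        session.connect();

        // run the spark-submit wrapper on the remote node; the command line is an example only
        ChannelExec channel = (ChannelExec) session.openChannel("exec");
        channel.setCommand("nohup spark-submit --class com.example.MyApp "
                + "/opt/jobs/my-app.jar 2>&1 < /dev/null &");
        channel.setErrStream(System.err);

        BufferedReader reader = new BufferedReader(new InputStreamReader(channel.getInputStream()));
        channel.connect();

        // print whatever the remote command writes before it detaches
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }

        channel.disconnect();
        session.disconnect();
    }
}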

- Fourth way: the YARN Client class

One of my favourite books, Data Algorithms, uses this approach.

// import required classes and interfaces
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;

public class SubmitSparkJobToYARNFromJavaCode {

   public static void main(String[] arguments) throws Exception {

       // prepare arguments to be passed to 
       // org.apache.spark.deploy.yarn.Client object
       // NOTE: the values below are placeholders -- substitute your own application
       // name, jar path, main class, dependent jars and program arguments.
       String[] args = new String[] {
           // the name of your application
           "--name",
           "SparkFriendRecommendation",

           // memory for driver (optional)
           "--driver-memory",
           "1000M",

           // path to your application's JAR file
           // required in yarn-cluster mode
           "--jar",
           "/path/to/your-spark-application.jar",

           // name of your application's main class (required)
           "--class",
           "your.package.SparkFriendRecommendation",

           // comma separated list of local jars that want
           // SparkContext.addJar to work with
           "--addJars",
           "/path/to/dependency-one.jar,/path/to/dependency-two.jar",

           // argument 1 to your Spark program (SparkFriendRecommendation)
           "--arg",
           "first-program-argument",

           // argument 2 to your Spark program (SparkFriendRecommendation)
           "--arg",
           "second-program-argument",

           // argument 3 to your Spark program (SparkFriendRecommendation)
           "--arg",
           "third-program-argument",

           // argument 4 to your Spark program (SparkFriendRecommendation)
           // this is a helper argument to create a proper JavaSparkContext object
           // make sure that you create the following in SparkFriendRecommendation program
           // ctx = new JavaSparkContext("yarn-cluster", "SparkFriendRecommendation");
           "--arg",
           "yarn-cluster",
       };

       // create a Hadoop Configuration object
       Configuration config = new Configuration();

       // identify that you will be using Spark as YARN mode
       System.setProperty("SPARK_YARN_MODE", "true");

       // create an instance of SparkConf object
       SparkConf sparkConf = new SparkConf();

       // create ClientArguments, which will be passed to Client
       ClientArguments cArgs = new ClientArguments(args, sparkConf); 

       // create an instance of yarn Client client
       Client client = new Client(cArgs, config, sparkConf); 

       // submit Spark job to YARN
       client.run();
   }
}