Tsaari - 4 months ago
Perl Question

Perl - parallel programming - running two external programs

I have a Perl script that runs two external programs, one dependent on the other, over a series of datasets. Currently I process the datasets one at a time: I run each through the first program, gather the results with qx, and use those results to run the second program. The results of the second program are appended to an output file, one file per dataset. I've created a simple reproducible example that hopefully captures my current approach:

#!/usr/bin/perl
#
# stackoverflow_q_7-7-2016.pl

use warnings;
use strict;

my @queries_list = (2, 4, 3, 1);

foreach my $query (@queries_list) {
    # Command meant to simulate the first, shorter process, and return a list of results for the next process
    my $cmd_1 = "sleep " . $query . "s; shuf -i 4-8 -n 3";
    print "Running program_1 on query $query...\n";
    my @results = qx($cmd_1);

    foreach (@results) {
        chomp $_;
        # Command meant to simulate a longer process whose input depends on program_1; the output I write to a separate file for each query
        my $cmd_2 = "sleep " . $_ . "s; fortune -s | head -c " . $_ * 5 . " >> $query.output";
        print "\tRunning program_2 on query $query with input param $_...\n";
        system($cmd_2);
    }
}


Since the first program generally completes faster than the second, I thought it should be possible to speed this up by continuing to run new queries through program_1 while program_2 is still running on a previous query. Any speedup would be welcome, since the whole run currently takes many hours. However, I'm not sure how to go about this. Would something like Parallel::ForkManager offer a solution? Or threads in Perl?

In my actual code I also do some error handling and set a timeout for program_2. I use fork, exec, and $SIG{ALRM} to do this, but I don't really know what I'm doing with those. It's important that I keep this ability; otherwise program_2 might get stuck or fail without adequately reporting why. I don't think the error handling works the way it should in the reproducible example, but hopefully it shows what I'm trying to do. Here is the code with error handling:

#!/usr/bin/perl
#
# stackoverflow_q_7-7-2016.pl

use warnings;
use strict;

my @queries_list = (2, 4, 3, 1);

foreach my $query (@queries_list) {
    # Command meant to simulate the first, shorter process, and return a list of results for the next process
    my $cmd_1 = "sleep " . $query . "s; shuf -i 4-15 -n 3";
    print "Running program_1 on query $query...\n";
    my @results = qx($cmd_1);

    foreach (@results) {
        chomp $_;
        # Command meant to simulate a longer process whose input depends on program_1; the output I write to a separate file for each query
        my $cmd_2 = "sleep " . $_ . "s; fortune -s | head -c " . $_ * 3 . " >> $query.output";
        print "\tRunning program_2 on query $query with input param $_...\n";

        my $childPid;
        eval {
            local $SIG{ALRM} = sub { die "Timed out" };
            alarm 10;
            if ($childPid = fork()) {
                wait();
            } else {
                exec($cmd_2);
            }
            alarm 0;
        };
        if ($? != 0) {
            my $exitCode = $? >> 8;
            print "Program_2 exited with error code $exitCode. Retry...\n";
        }
        if ($@ =~ /Timed out/) {
            print "\tProgram_2 timed out. Skipping...\n";
            kill 2, $childPid;
            wait;
        }
    }
}


All help is appreciated.

Answer

One solution:

use threads;

use Thread::Queue;  # 3.01+

sub job1 { ... }
sub job2 { ... }

{
   my $job1_request_queue = Thread::Queue->new();
   my $job2_request_queue = Thread::Queue->new();

   my $job1_thread = async {
      # Test with defined() so that a false job value (0 or "") does not end the loop early.
      while (defined(my $job = $job1_request_queue->dequeue())) {
         my $result = job1($job);
         $job2_request_queue->enqueue($result);
      }

      # No more job2 requests will arrive once job1 has drained its queue.
      $job2_request_queue->end();
   };

   my $job2_thread = async {
      while (defined(my $job = $job2_request_queue->dequeue())) {
         job2($job);
      }
   };

   $job1_request_queue->enqueue($_) for ...;

   $job1_request_queue->end();
   $_->join() for $job1_thread, $job2_thread;
}

You could even have multiple workers of either or both types.

use threads;

use Thread::Queue;  # 3.01+

use constant NUM_JOB1_WORKERS => 1;
use constant NUM_JOB2_WORKERS => 3;

sub job1 { ... }
sub job2 { ... }

{
   my $job1_request_queue = Thread::Queue->new();
   my $job2_request_queue = Thread::Queue->new();

   my @job1_threads;
   for (1..NUM_JOB1_WORKERS) {
      # Keep the thread object so that it can be joined later.
      push @job1_threads, async {
         while (defined(my $job = $job1_request_queue->dequeue())) {
            my $result = job1($job);
            $job2_request_queue->enqueue($result);
         }
      };
   }

   my @job2_threads;
   for (1..NUM_JOB2_WORKERS) {
      push @job2_threads, async {
         while (defined(my $job = $job2_request_queue->dequeue())) {
            job2($job);
         }
      };
   }

   $job1_request_queue->enqueue($_) for ...;

   $job1_request_queue->end();    
   $_->join() for @job1_threads;
   $job2_request_queue->end();
   $_->join() for @job2_threads;
}
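
To make the skeleton concrete for the question's case, where program_1 yields several results per query, here is a minimal sketch. The bodies of job1 and job2 are hypothetical stand-ins written in plain Perl rather than calls to the real external programs, and run_pipeline and the result queue are names invented for this illustration. It assumes a perl built with thread support and Thread::Queue 3.01 or later.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;  # 3.01+ for end()

use constant NUM_JOB2_WORKERS => 3;

# Hypothetical stand-in for program_1: several parameters per query.
sub job1 {
    my ($query) = @_;
    return map { $query * 10 + $_ } 1 .. 3;
}

# Hypothetical stand-in for program_2: one line of output per parameter.
sub job2 {
    my ($query, $param) = @_;
    return "query=$query param=$param";
}

sub run_pipeline {
    my @queries = @_;

    my $job1_queue   = Thread::Queue->new();
    my $job2_queue   = Thread::Queue->new();
    my $result_queue = Thread::Queue->new();

    my $job1_thread = async {
        while (defined(my $query = $job1_queue->dequeue())) {
            # One job2 request per result, tagged with the query it came from.
            $job2_queue->enqueue([ $query, $_ ]) for job1($query);
        }
    };

    my @job2_threads;
    for (1 .. NUM_JOB2_WORKERS) {
        push @job2_threads, async {
            while (defined(my $job = $job2_queue->dequeue())) {
                $result_queue->enqueue(job2(@$job));
            }
        };
    }

    $job1_queue->enqueue(@queries);
    $job1_queue->end();

    $job1_thread->join();
    $job2_queue->end();              # job1 is finished, so no more requests
    $_->join() for @job2_threads;
    $result_queue->end();

    # Workers finish in no particular order, so sort for stable output.
    my @results;
    while (defined(my $line = $result_queue->dequeue())) {
        push @results, $line;
    }
    my @sorted = sort @results;
    return @sorted;
}

print "$_\n" for run_pipeline(2, 4, 3, 1);
```

Passing [ $query, $param ] pairs through the second queue lets each job2 worker know which query a parameter belongs to, which is what you would need in order to append to the right per-query output file. If several workers may append to the same file at once, some form of locking is probably needed as well.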

Use IPC::Run instead of qx to add a timeout. No need for signals.
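
A minimal sketch of what that might look like, assuming IPC::Run is installed from CPAN. The helper name run_with_timeout and its return values are made up for this example; run and timeout are the functions exported by IPC::Run. When the timer expires, run throws an exception whose message mentions "timeout", and it kills the child before rethrowing.

```perl
use strict;
use warnings;
use IPC::Run qw( run timeout );

# Hypothetical helper: run a command (array ref) with a time limit in seconds.
# Returns (status, stdout, stderr), where status is 'ok', 'failed' or 'timeout'.
sub run_with_timeout {
    my ($cmd, $seconds) = @_;
    my ($in, $out, $err) = ('', '', '');

    my $success;
    my $completed = eval {
        # run() returns true only if the command exited with status 0.
        $success = run($cmd, \$in, \$out, \$err, timeout($seconds));
        1;
    };

    if (!$completed) {
        return ('timeout', $out, $err) if $@ =~ /timeout/;
        die $@;    # some other failure, such as a command that cannot be found
    }
    return ($success ? 'ok' : 'failed', $out, $err);
}

# Example: a child that sleeps longer than the one-second limit.
my ($status) = run_with_timeout([ $^X, '-e', 'sleep 5' ], 1);
print "status: $status\n";
```

For a shell pipeline like $cmd_2 in the question, the command could presumably be passed as [ 'sh', '-c', $cmd_2 ]. After a 'failed' status, $? still holds the child's exit status, so the existing $? >> 8 check can stay.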
