Alex Alex - 8 months ago 37
Perl Question

Perl threads and semaphores

I am processing files with a function and start my threads like this:

for my $file (@files) {
    $threads[$k] = threads->create('function', $file);
}

I would like to limit the number of threads running in parallel. How is that done? I have looked at plenty of Semaphore/Queue examples but couldn't find anything simple enough for my case.

Is there a simple way to limit the number of threads?


The most straightforward way of limiting parallelism, and also one of the more effective ways to deploy threads, is to operate a 'worker thread' model.

Specifically - have a fixed number of threads that each sit in a loop, reading items from a queue and operating on them.

That'd be something like this:


use strict;
use warnings;

use threads;

use Thread::Queue;

my $nthreads = 5;

my $process_q = Thread::Queue->new();
my $failed_q  = Thread::Queue->new();

#this is a subroutine, but that runs 'as a thread'.
#when it starts, it inherits the program state 'as is'. E.g.
#the variable declarations above all apply - but changes to
#values within the program are 'thread local' unless the
#variable is defined as 'shared'.
#Behind the scenes - Thread::Queue are 'shared' arrays.

sub worker {
    #NB - this will sit in a loop indefinitely, until you close the queue
    #using $process_q -> end
    #we do this once we've queued all the things we want to process
    #and the sub completes and exits neatly.
    #however if you _don't_ end it, this will sit waiting forever.
    while ( my $server = $process_q->dequeue() ) {
        print threads->self()->tid() . ": pinging $server\n";
        my $result = `/bin/ping -c 1 $server`;
        if ($?) { $failed_q->enqueue($server) }
        print $result;
    }
}

#insert tasks into thread queue.
open( my $input_fh, "<", "server_list" ) or die $!;
chomp( my @servers = <$input_fh> );
close($input_fh);
$process_q->enqueue(@servers);

#we 'end' process_q  - when we do, no more items may be inserted,
#and 'dequeue' returns 'undefined' when the queue is emptied.
#this means our worker threads (in their 'while' loop) will then exit.
$process_q->end();

#start some threads
for ( 1 .. $nthreads ) {
    threads->create( \&worker );
}

#Wait for threads to all finish processing.
foreach my $thr ( threads->list() ) {
    $thr->join();
}

#collate results. ('synchronise' operation)
while ( my $server = $failed_q->dequeue_nb() ) {
    print "$server failed to ping\n";
}
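
Applied to the file-processing case in the question, the same pattern might look roughly like the sketch below. The pool size, the file names and the process_file sub are all placeholders I've made up for illustration (process_file stands in for your own 'function'), and end() assumes a reasonably recent Thread::Queue (3.01 or later).

```perl
use strict;
use warnings;

use threads;
use Thread::Queue;

# Assumed values for illustration: pool size and file list.
my $max_threads = 4;
my @files       = map { "file_$_.txt" } 1 .. 10;

my $file_q   = Thread::Queue->new();
my $result_q = Thread::Queue->new();

# Hypothetical stand-in for the 'function' in the question.
sub process_file {
    my ($file) = @_;
    return length $file;
}

sub worker {
    # dequeue() blocks until an item is available and returns undef
    # once the queue has been ended and emptied.
    while ( defined( my $file = $file_q->dequeue() ) ) {
        my $size = process_file($file);
        $result_q->enqueue("$file:$size");
    }
}

# Queue all the work, then close the queue so workers can exit.
$file_q->enqueue(@files);
$file_q->end();

# Only $max_threads threads ever exist, however many files there are.
my @workers;
push @workers, threads->create( \&worker ) for 1 .. $max_threads;
$_->join() for @workers;

# Collect results after all workers have finished.
my @results;
while ( defined( my $line = $result_q->dequeue_nb() ) ) {
    push @results, $line;
}
print scalar(@results), " files processed by $max_threads threads\n";
```

The loop tests defined() rather than truth, so a file literally named "0" would not stop a worker early.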

Semaphores are really about arbitrating access to limited resources, and for 'guarding' part of a process.

So if you wanted to include - say - an ssh operation in your code, but didn't want to have more than 20 connections concurrently, you'd:

use Thread::Semaphore;

my $ssh_limit = Thread::Semaphore->new(20);

And in your thread:

$ssh_limit -> down;
 #do ssh thing
$ssh_limit -> up;

Each thread will block at 'down' until a slot is available.

But this is not an effective way to control 'whole threads' - the answer there is to just start the right number in the first place, and use queues to feed them data.
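
To see the blocking behaviour in isolation, here is a small self-contained sketch. The numbers (3 slots, 8 threads), the short sleep standing in for the ssh work, and the shared counters are all assumptions for the demo, not part of the code above.

```perl
use strict;
use warnings;

use threads;
use threads::shared;
use Thread::Semaphore;

# Assumed limits for illustration: 3 slots shared by 8 threads.
my $limit = Thread::Semaphore->new(3);

# Shared counters, used only to observe how many threads are
# inside the guarded section at once.
my $active : shared = 0;
my $peak : shared   = 0;

sub guarded_task {
    $limit->down();    # blocks while all slots are taken
    {
        lock($active);
        $active++;
        $peak = $active if $active > $peak;
    }

    # Stand-in for the ssh operation: sleep for 50ms.
    select( undef, undef, undef, 0.05 );

    {
        lock($active);
        $active--;
    }
    $limit->up();      # release the slot for the next thread
}

my @thr;
push @thr, threads->create( \&guarded_task ) for 1 .. 8;
$_->join() for @thr;

print "peak concurrency inside the guarded section: $peak\n";
```

All 8 threads are created and running, but the reported peak should never exceed the 3 slots the semaphore was created with.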