QWD666 - 6 months ago
Perl Question

Threads: maximum value in the array

Say I start 2 threads on an array of 5 elements: all the calculations are carried out by thread 1. Thread 2 starts and then exits for lack of work. Or, if I specify 9999 elements, thread 2 only wakes up for the last 30-50 items. It seems to me that this is not really multithreaded.

#!/usr/bin/perl -w
use strict;
use warnings;
use 5.010;
use threads;
use threads::shared;
use Data::Dumper;

my $n=0;

my $max:shared=0;
my @threads;
my $shr:shared=0;
say "Enter the number of threads";
my $threads=<stdin>;
my @pack;

# Fill the array
say "Enter the number of array elements";
my $c=<stdin>;
for (my $i=0;$i<$c;$i++){
my $r= int rand 999;
@pack=(@pack,$r);
}
say @pack;

# Create the requested number of threads
for my $t (1..$threads) {
push @threads,threads->create(\&find,$t);
}
# Wait for all threads to finish
for my $t (@threads){ $t->join();}

sub find {
# Number of the current thread
my $num= shift;
say "+Thread $num started";

while($n<=$#pack){
# Distribute the elements among the threads
lock($shr);#lock ($max);
my $stg=1+$shr++;

# If there are no elements left, stop working
if ($stg>=$#pack+2) {
say "-Thread $num done. \n";
return;}


if($max<$pack[$n]){$max=$pack[$n];}
say "Thread: $num \tStage: $stg \tMax: $max";
$n++;
sleep 1+int rand (3);
}
}

Answer

The entire body of the thread's loop sits in a block in which lock($shr) is held, so no more than one thread can do any work at a time.
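
A lock taken with lock() is held until the enclosing block exits, so one way to illustrate the fix is to wrap each lock in a small bare block of its own. The following is only a minimal sketch, not the asker's program: it assumes the shared counter $shr is used directly as the element index (the question's code uses a separate, unshared $n), and the sample data in @pack is made up for illustration.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use feature qw( say );

use threads;
use threads::shared;

# Sample data, made up for illustration. Each thread gets its own copy.
my @pack = (12, 845, 3, 512, 77);

my $shr :shared = 0;   # Index of the next unclaimed element.
my $max :shared = 0;   # Largest value seen so far.

sub find {
    my $num = shift;
    while (1) {
        my $i;
        {
            # The lock is released at the end of this bare block.
            lock($shr);
            $i = $shr++;
        }
        return if $i > $#pack;

        # Slow work would go here; no lock is held at this point.
        my $value = $pack[$i];
        say "Thread $num took element $i";

        {
            lock($max);
            $max = $value if $value > $max;
        }
    }
}

my @threads = map { threads->create(\&find, $_) } 1..2;
$_->join for @threads;
say "Max: $max";
```

Which thread takes which element varies from run to run, but the final maximum does not depend on the scheduling.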


It sounds like you have a large array, and you want worker threads to process parts of that large array.

The threads only need to be synchronized when grabbing work and when updating the results.

Note that you need to lock the array while you are accessing it, but you can't keep it locked while you are working, so you'll need to copy the work unit into a thread-local variable.

The following finds the sum of all elements of an array.

#!/usr/bin/perl
use strict;
use warnings;
use feature qw( say );

use threads;
use threads::shared;

use List::Util qw( sum );

# This is where the real work is done.
# Notice how there's no thread-related code in it
# and how it doesn't use any shared variables.
sub work {
    sleep(1+rand(3));
    return sum(@_);
}

{
    @ARGV == 3
        or die("usage\n");

    my ($data_size, $num_threads, $chunk_size) = @ARGV;

    my $next :shared = 0;
    my @data :shared;
    my $sum  :shared;

    for (1..$data_size) {
        push @data, int(rand(9999));
    }

    say "Without threads: ", sum @data;

    for my $t (1..$num_threads) {
        async {
            say sprintf("[%s]: Worker started.", threads->tid);
            LOOP: while (1) {
                # Grab work.
                my @local_data;
                {
                    lock $next;
                    last LOOP if $next >= @data;

                    my $new_next = $next + $chunk_size;
                    $new_next = @data if $new_next > @data;

                    say sprintf("[%s] Processing %s..%s", threads->tid, $next, $new_next-1);
                    @local_data = @data[$next .. $new_next-1];
                    $next = $new_next;
                }

                my $local_sum = work(@local_data);    

                # Update results.
                { lock $sum; $sum += $local_sum; }
            }
            say sprintf("[%s] done.", threads->tid)
        };
    }

    $_->join for threads->list;

    say "With threads: $sum";
}

Output:

$ ./a 125 3 20
Without threads: 577556
[1]: Worker started.
[1] Processing 0..19
[2]: Worker started.
[2] Processing 20..39
[3]: Worker started.
[3] Processing 40..59
[2] Processing 60..79
[1] Processing 80..99
[3] Processing 100..119
[1] Processing 120..124
[2] done.
[1] done.
[3] done.
With threads: 577556
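
Since the question asks for the maximum rather than the sum, the same chunked pattern can be adapted by swapping sum for max from List::Util. This is a hedged sketch rather than part of the answer's code: parallel_max is a hypothetical helper name, it assumes $chunk_size is at least 1, and it returns undef for an empty array.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use feature qw( say );

use threads;
use threads::shared;

use List::Util qw( max );

# Hypothetical helper (not from the answer above): returns the largest
# element of @$data, using $num_threads workers that each grab
# $chunk_size elements at a time. Assumes $chunk_size >= 1.
sub parallel_max {
    my ($data, $num_threads, $chunk_size) = @_;

    my @data :shared;
    @data = @$data;
    my $next :shared = 0;
    my $max  :shared;      # Stays undef until the first chunk is done.

    my @workers;
    for (1..$num_threads) {
        push @workers, async {
            LOOP: while (1) {
                # Grab work: hold the lock only while copying a chunk.
                my @local_data;
                {
                    lock $next;
                    last LOOP if $next >= @data;

                    my $new_next = $next + $chunk_size;
                    $new_next = @data if $new_next > @data;

                    @local_data = @data[$next .. $new_next-1];
                    $next = $new_next;
                }

                # The real work runs without any lock held.
                my $local_max = max(@local_data);

                # Update results.
                {
                    lock $max;
                    $max = $local_max
                        if !defined($max) || $local_max > $max;
                }
            }
        };
    }

    $_->join for @workers;
    return $max;
}

say parallel_max([ 3, 9, 1, 7, 4 ], 2, 2);   # prints 9
```

Keeping the per-chunk maximum in a local variable means the shared $max is locked only once per chunk, not once per element.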