ameerosein ameerosein - 1 month ago 6
C++ Question

What is the optimal multithreading scenario for processing the lines of a long file?

I have a big file and I want to read it and process its even lines using multiple threads.

One suggestion is to read the whole file and split it into multiple files (as many as there are threads), then let every thread process its own file. Because this reads the whole file, writes it out again, and then reads the split files, it seems slow (3x I/O), and I think there must be better scenarios.

I thought this could be a better scenario:

One thread reads the file and puts the data into a global variable, and the other threads read the data from that variable and process it. In more detail:

One thread reads the main file by running the func1 function and puts each even line into a buffer, line1Buffer, with a maximum size of MAX_BUFFER_SIZE. The other threads pop their data from the buffer and process it by running the func2 function. In code:

Global variables:

#define MAX_BUFFER_SIZE 100
vector<string> line1Buffer;
bool keepRunning = true; // set to false to stop thread 2 through the last thread ("continue" is a reserved word)
string file = "reads.fq";


Function func1 (thread 1):

void func1(){
    string ReadSeq;
    ifstream ifstr(file.c_str());
    for (long long i = 0; i < numberOfReads; i++) { // 2 lines per read
        getline(ifstr, ReadSeq);
        getline(ifstr, ReadSeq); // keep only the even line
        while (line1Buffer.size() == MAX_BUFFER_SIZE)
            ; // busy-wait while the buffer is full
        line1Buffer.push_back(ReadSeq);
    }
    keepRunning = false;
    return;
}


And function func2 (the other threads):

void func2(){
    string ReadSeq;
    while (keepRunning) {
        // note: line1Buffer is shared between threads without a lock here;
        // it needs a mutex (or similar) to be safe
        if (line1Buffer.size() > 0) {
            ReadSeq = line1Buffer.back(); // pop_back() returns void, so copy first
            line1Buffer.pop_back();
            // do the processing....
        }
    }
}


About the speed:

If reading is the slower part, the total time will be about the time to read the file once (the buffer may hold just one line at a time, so only one other thread will be able to work alongside thread 1). If processing is the slower part, the total time will be about the time needed to do all the processing with numberOfThreads - 1 threads. Both cases are faster than reading the file and writing it out to multiple files with one thread, then reading and processing those files with multiple threads.
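The estimate above can be written as a small formula. This is only a rough sketch: it assumes reading and processing overlap perfectly and ignores synchronization overhead, and the function name and parameters are hypothetical, not part of the code in this question.

```cpp
#include <algorithm>

// Hypothetical helper (an idealized model, not a measurement).
// read_seconds:    time to read the whole file once with one thread
// process_seconds: time to process every even line with one thread
// threads:         total thread count (must be >= 2: 1 reader + workers)
double estimate_pipeline_seconds(double read_seconds,
                                 double process_seconds,
                                 int threads) {
    // The processing work is shared by the (threads - 1) worker threads.
    double per_worker = process_seconds / (threads - 1);
    // Reader and workers run concurrently, so the slower side dominates.
    return std::max(read_seconds, per_worker);
}
```

For example, with 10 s of reading, 80 s of single-threaded processing, and 5 threads, the model gives 20 s (processing-bound); with only 40 s of processing it gives 10 s, which is roughly where reading starts to become the limit.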

So there are two questions:

1- How do I start the threads so that thread 1 runs func1 and the others run func2?

2- Is there any faster scenario?

3- [Deleted] Can anyone extend this idea to M threads for reading and N threads for processing? Obviously, M + N == numberOfThreads must hold.

Edit: the 3rd question is not right as multiple threads can't help in reading a single file

Thanks All

Answer

Another approach could be interleaved threads. Every thread does its own reading, but only one thread reads at a time. Because of the waiting in the very first iteration, the threads become interleaved: while one reads, the others work.

But this only scales if work() is the bottleneck (otherwise a non-parallel execution would be better).

Thread:

while (!end) {
    // should be fair!
    lock();
    read();
    unlock();

    work();
}

Basic example (you should probably add some error handling):

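#include <cstddef>
#include <fstream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

void thread_exec(std::ifstream* file, std::mutex* mutex, int* global_line_counter) {
    std::string line;
    std::vector<std::string> data;
    int i;
    do {
        i = 0;
        {
            // only 1 concurrent reader; the guard unlocks at the end of this scope
            std::lock_guard<std::mutex> guard(*mutex);
            // try to read the maximum number of lines
            while (i < MAX_NUMBER_OF_LINES_PER_ITERATION && std::getline(*file, line)) {
                // only the even lines we want to process
                // (the counter is 0-based, so this keeps the 1st, 3rd, ... lines;
                // test for == 1 to keep the 2nd line of each pair, as in the question)
                if (*global_line_counter % 2 == 0) {
                    data.push_back(line);
                    i++;
                }
                (*global_line_counter)++;
            }
        }

        // execute work for every line
        for (std::size_t j = 0; j < data.size(); j++) {
            work(data[j]);
        }

        // free old data
        data.clear();
    // repeat until EOF is reached (a short batch means the file is exhausted)
    } while (i == MAX_NUMBER_OF_LINES_PER_ITERATION);
}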

void process_data(std::string file) {
     // counter for checking if line is even
     int global_line_counter = 0;
     // open file
     std::ifstream ifstr(file.c_str());
     // mutex for synchronization
     // maybe a fair-lock would be a better solution
     std::mutex mutex;
     // create threads and start them with thread_exec(&ifstr, &mutex, &global_line_counter);
     std::vector<std::thread> threads(NUM_THREADS);
     for (int i=0; i < NUM_THREADS; i++) {
         threads[i] = std::thread(thread_exec, &ifstr, &mutex, &global_line_counter);
     }
     // wait until all threads have finished
     for (int i=0; i < NUM_THREADS; i++) {
         threads[i].join();
     }
}