user6275035 - 2 months ago
C++ Question

Sync queue between two threads

This is a simple program with a function start() that waits in an infinite loop for the user to enter something and stores each input in a queue. start() runs in a separate thread. After the user enters a value, the size of the queue is still zero in main. How can the queue be synchronized between the two threads?

code: source.cpp


#include <iostream>
#include "kl.h"

using namespace std;

int main()
{
    std::thread t1(start);
    while (1)
    {
        if (q.size() > 0)
        {
            std::cout << "never gets inside this if\n";
            std::string first = q.front();
            q.pop();
        }
    }
    t1.join();
}


code: kl.h


#include <queue>
#include <iostream>
#include <string>

void start();
static std::queue<std::string> q;


code: kl.cpp


#include "kl.h"
using namespace std;

void start()
{
    char i;
    string str;
    while (1)
    {
        for (i = 0; i <= 1000; i++)
        {
            //other stuff and str input
            q.push(str);
        }
    }
}

Answer

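Your code contains a data race (it crashed when I ran it): both threads can modify the shared queue at the same time. Also, you are looping with a char i up to 1000, which is probably not a good idea, since a char typically cannot hold values that large and the condition i <= 1000 would then never become false.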
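
One more thing about the posted layout, independent of the race: kl.h declares q as static at namespace scope, which gives it internal linkage, so source.cpp and kl.cpp each get their own separate queue. That alone would keep q.size() at zero in main. Below is a minimal sketch of one way to share a single queue, assuming the same three files as in the question. It is shown as one listing with comments marking where each part would live; q_mutex, push_line, shared_queue_size and demo are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>

// --- kl.h (sketch): declarations only, no 'static' ---
extern std::queue<std::string> q;      // one queue, defined in exactly one .cpp
extern std::mutex q_mutex;             // hypothetical name; guards every access to q
void push_line(const std::string& s);  // hypothetical helper for illustration

// --- kl.cpp (sketch): the single definition of each object ---
std::queue<std::string> q;
std::mutex q_mutex;

void push_line(const std::string& s)
{
    std::lock_guard<std::mutex> lk(q_mutex);
    q.push(s);
}

// --- source.cpp (sketch): reaches the same q through the extern declaration ---
std::size_t shared_queue_size()
{
    std::lock_guard<std::mutex> lk(q_mutex);
    return q.size();
}

// Usage sketch: a push made through kl.cpp's function is visible to the reader side.
void demo()
{
    push_line("hello");
    assert(shared_queue_size() == 1);
}
```

With C++17 or later, declaring the variable inline in the header is another option. Either way, the sharing problem and the race are separate issues, and both need to be fixed.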

You should protect your shared queue with a std::mutex, and use a std::condition_variable to notify that there is a reason to check the queue.

Specifically, consider the following rules, which are the usual approach for a producer-consumer setup like yours:

  1. Access the queue only when holding the mutex.

  2. Use the condition variable to notify that you've pushed something into it.

  3. Wait on the condition variable with a predicate, so that the consumer continues only when there is actually something to process.
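
The three rules above can also be bundled into a small wrapper so that callers cannot touch the queue without the lock. This is only a sketch under my own assumptions: SyncQueue, wait_and_pop and consume_n are hypothetical names, not something from the question.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical wrapper that keeps the queue together with its mutex
// and condition variable.
class SyncQueue {
public:
    // Rules 1 and 2: modify the queue under the lock, then notify.
    void push(std::string value)
    {
        {
            std::lock_guard<std::mutex> lk(m_);
            q_.push(std::move(value));
        }
        cv_.notify_one();
    }

    // Rules 1 and 3: wait, with a predicate, until there is something to take.
    std::string wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return !q_.empty(); });
        std::string value = std::move(q_.front());
        q_.pop();
        return value;
    }

private:
    std::queue<std::string> q_;
    std::mutex m_;
    std::condition_variable cv_;
};

// Usage sketch: one producer thread pushes n items, the caller consumes them.
int consume_n(int n)
{
    SyncQueue sq;
    std::thread producer([&sq, n] {
        for (int i = 0; i < n; ++i)
            sq.push(std::to_string(i));
    });
    int received = 0;
    for (int i = 0; i < n; ++i) {
        std::string s = sq.wait_and_pop();
        assert(s == std::to_string(i));  // single producer, so FIFO order holds
        ++received;
    }
    producer.join();
    return received;
}
```

The predicate passed to wait matters: it is re-checked after every wakeup, so a spurious wakeup or a notification that arrived before the wait started does not cause a pop on an empty queue.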

Here is a rewrite of your code:

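#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

// Shared state: every access to q must happen while holding m.
std::queue<std::string> q;
std::mutex m;
std::condition_variable cv;

// Producer: pushes 1001 strings and notifies the consumer after each one.
void start()
{
    std::string str;
    for (std::size_t i = 0; i <= 1000; i++) {
        // other stuff and str input
        std::cout << "here" << std::endl;
        std::unique_lock<std::mutex> lk(m);
        q.push(str);
        lk.unlock();      // release first so the woken thread can take the lock
        cv.notify_one();
    }
}

// Consumer: pops exactly as many items as the producer pushes.
int main()
{
    std::thread t1(start);
    for (std::size_t i = 0; i <= 1000; i++)
    {
        std::unique_lock<std::mutex> lk(m);
        // Sleeps until the queue is non-empty; the predicate guards against spurious wakeups.
        cv.wait(lk, []{ return !q.empty(); });
        std::string first = q.front();
        q.pop();
    }
    t1.join();
}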
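
The rewrite terminates because both loops run exactly 1001 times. If the producer instead runs until the input ends, as the question describes, the consumer needs some way to learn that nothing more is coming. One possible sketch follows, assuming a hypothetical done flag guarded by the same mutex; produce, consume_all and run_pipeline are illustrative names as well.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

std::queue<std::string> q;
std::mutex m;
std::condition_variable cv;
bool done = false;  // hypothetical flag, protected by m

// Producer: pushes every input line, then marks the stream as finished.
void produce(const std::vector<std::string>& lines)
{
    for (const std::string& line : lines) {
        {
            std::lock_guard<std::mutex> lk(m);
            q.push(line);
        }
        cv.notify_one();
    }
    {
        std::lock_guard<std::mutex> lk(m);
        done = true;
    }
    cv.notify_one();  // wake the consumer so it can observe 'done'
}

// Consumer: drains the queue and returns once it is empty and 'done' is set.
std::size_t consume_all()
{
    std::size_t count = 0;
    std::unique_lock<std::mutex> lk(m);
    for (;;) {
        cv.wait(lk, [] { return !q.empty() || done; });
        if (q.empty())
            return count;  // 'done' is set and nothing is left
        std::string first = q.front();
        q.pop();
        ++count;
    }
}

// Usage sketch: run the producer on its own thread and count what arrives.
std::size_t run_pipeline(const std::vector<std::string>& lines)
{
    std::thread t1([&lines] { produce(lines); });
    std::size_t n = consume_all();
    t1.join();
    assert(q.empty());  // after join, no other thread touches q
    return n;
}
```

The wait predicate checks both conditions, so the consumer wakes up either for new data or for shutdown, and it still drains any items that were pushed before done was set.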