Ieaturaw Ieaturaw - 12 days ago 4
Linux Question

C++ pThread program isn't running to completion

I'm having an odd issue where my C++ multithreaded program doesn't run to completion. Its supposed to run NUM_THREADS - 1 times. Any resources to help point me in the right direction would be appreciated!
Currently these are the important functions

int main(void) {
int ret;
//Threads + current thread tracker
pthread_t threads[NUM_THREADS]; int j = 0;
int num_transacts = 0; bool isBuy = true;

srand(time(NULL));
Stock MSFT("MSFT"); stocks.push_back(MSFT);
Stock AMZN("AMZN"); stocks.push_back(AMZN);
Stock BAC("BAC"); stocks.push_back(BAC);
Stock NKE("NKE"); stocks.push_back(NKE);

//Buys or Sells for NUM_THREADS - 1 times
while (num_transacts < NUM_THREADS) {
if (isBuy) {
//Percent chance to buy
if (((double)rand() / (double)RAND_MAX) < Z) {
cout << "Making buy thread.\n";
pthread_mutex_lock(&mLock);

ret = pthread_create(&threads[j], NULL, buy, NULL);
if (ret != 0) {
cout << "Thread failed at Buy Thread Create.\n";
}

pthread_mutex_unlock(&mLock);
cout << "BuyNumTrans: " << num_transacts << "\n";

++j;
++num_transacts;
isBuy = false;
}
}
else {
if (bought.size() > 0) {
//Percent chance to sell if it has increased or decreased too much in price
int s = rand() % bought.size();
if ((bought.at(s).checkPrice()) > (((1 + X)*bought.at(s).getCost())) || (bought.at(s).checkPrice()) < (((1 + Y)*bought.at(s).getCost()))) {
cout << "Making sell thread.\n";

pthread_mutex_lock(&mLock);
ret = pthread_create(&threads[j], NULL, sell, (void*) &s);
if (ret != 0) {
cout << "Thread failed at Sell Thread Create.\n";
}

pthread_mutex_unlock(&mLock);
cout << "SellNumTrans: " << num_transacts << "\n";

++j;
++num_transacts;
}
//Gets a new price possibility
bought.at(s).getPrice();
}
isBuy = true;
}
}

for (int i = 0; i < NUM_THREADS; ++i) {
pthread_join(threads[i], NULL);
}

return 0;
}

pthread_mutex_t mLock;
pthread_mutex_t bLock;
pthread_mutex_t sLock;
vector<Stock> stocks;
vector<Stock> bought;
int balance = 1000000, yield = 0, profit = 0, Tcost = 0;

//In format BUY MSFT 100 35.2
//In format SELL MSFT 80 45
void* processTransact(void* argument) {
string* arg = (string*)argument;
istringstream iss(*arg);
vector<string> words{ istream_iterator<string>(iss), istream_iterator<string> {} };

if (words.front() == "BUY") {
words.erase(words.begin());

//gets the symbol name
string symbol = words.front();
words.erase(words.begin());

//gets num of stocks to buy
string a = words.front();
int buyNum = atoi(a.c_str());
words.erase(words.begin());

//gets the price per share
a = words.front();
int sharePrice = atoi(a.c_str());

//update num_shares, cost, balance, and Tcost
Stock newBuy(symbol, buyNum, sharePrice);

balance -= (buyNum * sharePrice);
bought.push_back(newBuy);

cout << "Bought stock... " << balance << endl;

pthread_exit(NULL);
}
else {
words.erase(words.begin());

//gets the symbol name
string symbol = words.front();
words.erase(words.begin());

//gets num of stocks to sell
string a = words.front();
int buyNum = atoi(a.c_str());
words.erase(words.begin());

//gets the price per share
a = words.front();
int sharePrice = atoi(a.c_str());

a = words.front();
int s = atoi(a.c_str());

//update num_shares, cost, balance, and Tcost
balance += (buyNum * sharePrice);
bought.erase(bought.begin() + s);

cout << "Sold stock... " << balance << endl;

pthread_exit(NULL);
}

sleep(2);
}

void* buy(void*) {
pthread_mutex_lock(&mLock);
pthread_t thread;
srand(time(NULL));
int ret;
int i = rand() % stocks.size();

//Creates a string that processTransact can parse
string transactString = "BUY " + stocks.at(i).getName() + " 100 " + to_string(stocks.at(i).getPrice());//make transaction string

ret = pthread_create(&thread, NULL, processTransact, (void*)&transactString);
if (ret != 0) {
cout << "Error in buy thread process create.\n";
}

pthread_join(thread, NULL);
pthread_mutex_unlock(&mLock);
pthread_exit(NULL);
}

void* sell(void* argument) {
pthread_mutex_lock(&mLock);
pthread_t thread;
int ret, s = *((int*)argument);;

//Creates a string that processTransact can parse
string transactString = "SELL " + bought.at(s).getName() + " 100 " + to_string(bought.at(s).getPrice()) + to_string(s);//make transaction string

ret = pthread_create(&thread, NULL, processTransact, (void*)&transactString);
if (ret != 0) {
cout << "Error in sell thread process create.\n";
}

pthread_join(thread, NULL);
pthread_mutex_unlock(&mLock);
pthread_exit(NULL);
}

Answer

In the code above I see potential Undefined Behavior problem during bought variable access. This could be the reason of your problem.

In the while loop there is lines:

if (bought.size() > 0) {
  //Percent chance to sell if it has increased or decreased too much in price
  int s = rand() % bought.size();

In void* processTransact(void* argument) function:

bought.push_back(newBuy);
// Code skipped ....  
bought.erase(bought.begin() + s);

Code from processTransact modifies size of bought in separate thread. In the while loop access to bought.size() is not synchronized with use of mLock mutex. It means you could potentially get empty bought vector after bought.size() > 0 validation. It causes Undefined Behavior at rand() % 0.

Moreover, C++ standard doesn't guarantee thread safety when same container (bought) modified and read from different threads.