user2164703 user2164703 - 2 months ago 13
C++ Question

Is this use of atomic correct?

i have the following hierarchy:

struct Point
{
std::atomic<int> x;
std::atomic<int> y;
std::atomic<int> z;
}

class Data
{
std::atomic<int> version;
Point point;
}

unordered_map<std::string,Data> hashtable; // global


Thread1 is the producer:

Thread 1 receive a
std::pair<std::string,Point> new_data
, it does:

shared_hashtable[new_data.first].point.x.store(new_data.second.x,memory_order_release) // update X
shared_hashtable[new_data.first].point.y.store(new_data.second.y,memory_order_release) // update Y
shared_hashtable[new_data.first].point.z.store(new_data.second.z,memory_order_release) // update Z
hashtable[new_data.first].version.store(hashtable[new_data.first].version+1,memory_order_release) // update the version of the data


Thread 2 does:

int local_version;
while(local_version!=shared_hashtable["string1"].version.load(memory_order_acquire))
{
//...process data...
local_version=shared_hashtable["string1"].version.load(memory_order_acquire);
}


Does the memory order passed guarantee that my store and load wont be reordered.
Does the design work as expected : if the object at
hastable["string1"]
is updated, will I process the corresponding data in thread 2 ?

Answer Source

hashtable isn't protected against simultaneous access, and its operator[] is not const. If you guarantee that (1) hashtable["string1"] is inserted into the table before these two threads try to access it, and (2) no other threads are writing to hashtable during the time these two threads are running, the lookups in hashtable won't result in a data race. If you can't guarantee (1) and (2), then you need to protect the lookups with a mutex. unordered_map invalidates iterators when it rehashes, but references stay valid until the referenced item is removed from the map.

The memory model guarantees you that the writes in thread 1 that precede the memory_order_release write to version are visible to thread 2 after it memory_order_acquire reads the corresponding value from version. This would be the case even if the accesses to the Point members were non-atomic.

However, it is possible that the reads of the Point members in thread 2 see values from a later write in thread 1, so you are not guaranteed that the three Point members read in thread 2 correspond to a particular Point that was actually written by thread 1. I imagine you need a guarantee that the Point processed by thread 2 is in fact some Point written by thread 1 and not an aggregate of the values of multiple different points (e.g., x from version 1, y from version 2, z from version 3). Changing the design from

struct Point { atomic<int> x, y, x; };

to

struct Point { int x, y, x; };
struct Data {
  atomic<int> version;
  atomic<Point> point;
};

will assure that the Point read is actually a Point written. Of course, it isn't necessarily the Point that corresponds to the given version: point may already have been overwritten by a later version in thread 1 by the time thread 2 gets around to reading it. This could result in the same point being processed twice by thread 2, once with a stale version and again on a later iteration with the proper version. If you need to ensure that each version is processed at most once, you have to verify that version is the same after reading point as it was before reading point.

Putting it all together, we get this program (DEMO):

struct Point
{
  int x, y, z;
};

struct Data
{
  std::atomic<int> version;
  std::atomic<Point> point;
};

std::unordered_map<std::string, Data> hashtable;
std::mutex mtx;

Data& lookup(const std::string& key) {
  std::lock_guard<std::mutex> guard{mtx};
  return hashtable[key];
}

void thread1() {
  std::string key;
  Point point;
  std::tie(key, point) = // get new key/point pair
  auto& ref = lookup(key);
  ref.point.store(point, std::memory_order_relaxed);
  ref.version.fetch_add(1, std::memory_order_release);
}

void thread2() {
  auto& ref = lookup("string1");
  int local_version = 0;
  for (;;) {
    auto const version = ref.version.load(std::memory_order_acquire);
    if (local_version != version) {
      auto point = ref.point.load(std::memory_order_relaxed);
      if (version == ref.version.load(std::memory_order_acquire)) {
        local_version = version;
        // ...process point...
      }
    }
  }
}