Xyster Xyster - 4 months ago 9
Java Question

Java - HashMap values - volatile

I have a situation where I have N threads that do the same job, and one thread X that does something different.

Each of the N threads reads and writes a static object of a class called MMLCounter, which is a wrapper around a HashMap. Each thread works with a different key/value pair of that HashMap, so all threads read and write values in the HashMap at the same time.
Thread X periodically needs to access all values, and while it is accessing them (from the moment it accesses the first value until the moment it accesses the last value), none of the other N threads may change the values in the HashMap.

The HashMap is initialized, and its key/value pairs are added by the threads as they are created at the start of program execution. Later no new key/value pairs are added; only the values in the HashMap change.

Because of this I didn't use ConcurrentHashMap or synchronized functions. Instead I created a wrapper class around HashMap, which additionally has a flag that signals to the N threads whether they are allowed to change the values. This flag is changed exclusively by thread X.

This way all N threads can work with HashMap in parallel, but when thread X starts its work only it can work with the HashMap until it finishes.

My question is: do I need to declare anything as volatile (for example the values in the HashMap), and if yes, what and how?

What I would like to avoid (I don't know whether it can actually happen) is that one of the N threads changes a value in the HashMap, but the change is only reflected in that thread's locally cached memory. When thread X then reads that value from the HashMap, it would read it from its own locally cached memory, which is not in sync with the cached memory of the other thread, and so it would see an old value.

Here is the code:

public static void main(String[] args) throws ProtocolException {

int NUMBER_OF_THREADS = 400;

List<Future<?>> futureList = new ArrayList<Future<?>>();
ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS+1);
futureList.add(executor.submit(new Runnable() {
@Override
public void run() {
int measureInterval = 10000;
try {
Thread.sleep(measureInterval);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("--> MML rate is : " + MMLGenerator.MML_COUNTER.getMMLRate(measureInterval/1000) + " MML per second.");
}
}));

//create and start sending threads.
for (int threadNmbr = 0; threadNmbr < NUMBER_OF_THREADS; threadNmbr++) {
futureList.add(executor.submit(new Thread(new MMLGenerator(threadNmbr))));
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

//wait for threads to finish.
for (Future<?> future : futureList) {
try {
future.get();
} catch (InterruptedException e) {
}
catch (ExecutionException e) {
throw (RuntimeException) e.getCause();
}
}
executor.shutdown();
}

class MMLGenerator implements Runnable {

public static volatile MMLCounter MML_COUNTER = new MMLCounter();
private int threadNmbr = 0;

public MMLGenerator(int threadNmbr) {
this.threadNmbr = threadNmbr;
MMLGenerator.MML_COUNTER.put(this.threadNmbr, 0);

}

@Override
public void run() {
while(RUN_ACTIVE) {
MML_COUNTER.increaseCounter(this.threadNmbr);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

public class MMLCounter {

private Map<Integer,Integer> MMLCounter = new HashMap<Integer, Integer>();
private boolean MMLCounterLocked = false;

public Integer get(Integer key) {
return this.MMLCounter.get(key);
}

public Integer put(Integer key, Integer value) {
while (this.MMLCounterLocked) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return this.MMLCounter.put(key, value);
}

public void increaseCounter(Integer key) {
while (this.MMLCounterLocked) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
this.MMLCounter.put(key,this.MMLCounter.get(key).intValue() + 1);
}

public int getMMLRate(int measurementTime) {
this.MMLCounterLocked = true;
int MMLCounterSum = 0;
for (Integer counterID : this.MMLCounter.keySet()) {
int counter = this.MMLCounter.get(counterID);
MMLCounterSum += counter;
this.MMLCounter.put(counterID, 0);
}
this.MMLCounterLocked = false;
return MMLCounterSum/measurementTime;
}


}




AFTER MODIFICATION



Thank you everybody for the help.
I have just read the ReentrantReadWriteLock description and that really is what I need. Below is the modified code.

However, I still have two questions:

1) Why do I also need to use ConcurrentHashMap instead of HashMap if I protected the critical parts of the code with ReentrantReadWriteLock?

2) This usage of ReentrantReadWriteLock only replaces the flag from my previous implementation, which I now see was not done correctly. However, I still have the problem of value objects in HashMap not being volatile, so will different threads each have their own locally cached copy of a value which is not in sync with the locally cached copies of other threads?

public static void main(String[] args) throws ProtocolException {

int NUMBER_OF_THREADS = 400;

List<Future<?>> futureList = new ArrayList<Future<?>>();
ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS+1);
futureList.add(executor.submit(new Runnable() {
@Override
public void run() {
int measureInterval = 10000;
try {
Thread.sleep(measureInterval);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("--> MML rate is : " + MMLGenerator.counter.getMMLRate(measureInterval/1000) + " MML per second.");
}
}));

//create and start sending threads.
for (int threadNmbr = 0; threadNmbr < NUMBER_OF_THREADS; threadNmbr++) {
futureList.add(executor.submit(new Thread(new MMLGenerator(threadNmbr))));
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

//wait for threads to finish.
for (Future<?> future : futureList) {
try {
future.get();
} catch (InterruptedException e) {
}
catch (ExecutionException e) {
throw (RuntimeException) e.getCause();
}
}
executor.shutdown();
}

class MMLGenerator implements Runnable {

public static MMLCounter counter = new MMLCounter();
private int threadNmbr = 0;

public MMLGenerator(int threadNmbr) {
this.threadNmbr = threadNmbr;
MMLCounter.counter.put(this.threadNmbr, 0);

}

@Override
public void run() {
while(RUN_ACTIVE) {
MMLCounter.counter.increaseCounter(this.threadNmbr);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

public class MMLCounter {

private Map<Integer,Integer> counter = new HashMap<Integer, Integer>();
public static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

public Integer put(Integer key, Integer value) {
lock.readLock().lock();
Integer oldValue = this.counter.put(key, value);
lock.readLock().unlock();
return oldValue;
}

public void increaseCounter(Integer key) {
lock.readLock().lock();
this.counter.put(key,this.counter.get(key).intValue() + 1);
lock.readLock().unlock();
}

public int getMMLRate(int measurementTime) {
lock.writeLock().lock();
int counterSum = 0;
for (Integer counterID : this.counter.keySet()) {
counterSum += this.counter.get(counterID);
this.counter.put(counterID, 0);
}
lock.writeLock().unlock();
return counterSum/measurementTime;
}
}





AFTER 2nd MODIFICATION



I have now figured out that the logic I need to implement requires me to manipulate several counters, not just one, from multiple threads, and each thread can change any counter at any time.
Below is my implementation, but I'm not certain whether I am going about it in a good way with regard to performance and data consistency.

I expect to have an arbitrary number of counters (the number will be known at the start of execution), each identified by a String value. Each Counter must count two values: the first value always increases and the second only sometimes, but when both increase they need to increase at the same time. When I need the sum of each Counter, I need to fetch both Counter values in one atomic operation. Also, from the time I fetch the first Counter until the time I fetch the last Counter, none of the Counters may be changed by other threads.

For demonstration purposes, I took the String value of the counter's ordinal number as the Counter identification (the key in the HashMap). To determine which Counter is increased in each iteration of each thread, and whether just one or both values of that Counter increase, I used a Random generator.

public static void main(String[] args) {

int NUMBER_OF_THREADS = 400;


MMLGenerator.counterNmbr(2);
List<Future<?>> futureList = new ArrayList<Future<?>>();
ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS+1);

futureList.add(executor.submit(new Runnable() {
@Override
public void run() {
while(true)
{
int measureInterval = 10;
try {
TimeUnit.SECONDS.sleep(measureInterval);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
MMLGenerator.lock.writeLock().lock();
for (String counterId : MMLGenerator.counter.keySet()) {
MMLCounterSimple counter = MMLGenerator.counter.get(counterId).getCountAndReset();
System.out.println("--> For counter " + counterId + " total is : " + counter.getTotal() + ", and failed is : " + counter.getFailed());
}
MMLGenerator.lock.writeLock().unlock();
}
}
}));

//create and start sending threads.
for (int threadNmbr = 0; threadNmbr < NUMBER_OF_THREADS; threadNmbr++) {
futureList.add(executor.submit(new Thread(new MMLGenerator())));
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

//wait for threads to finish.
for (Future<?> future : futureList) {
try {
future.get();
} catch (InterruptedException e) {
}
catch (ExecutionException e) {
throw (RuntimeException) e.getCause();
}
}
executor.shutdown();
}



class MMLGenerator implements Runnable {

public static volatile HashMap<String, MMLCounter> counter = new HashMap<String, MMLCounter>();
public static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);


public static void counterNmbr(int counterNmbr) {
lock.writeLock().lock();
for(int i = 0; i < counterNmbr; i++) {
counter.put(new Integer(i).toString(), new MMLCounter());
}
lock.writeLock().unlock();
}

@Override
public void run() {
while(RUN_PROVISIONING) {
lock.readLock().lock();
String counterID = new Integer(new Random().nextInt(counter.size())).toString();
long failedInc = 0;
if (new Random().nextInt(2) == 0) {
failedInc = 1;
}
counter.get(counterID).increaseCounter(failedInc);
lock.readLock().unlock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

}


public class MMLCounter {

private volatile long total = 0;
private volatile long failed = 0;
public static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);


public synchronized void increaseCounter(long failedInc) {
lock.writeLock().lock();
total++;
failed = failed + failedInc;
lock.writeLock().unlock();
}

public synchronized MMLCounterSimple getCountAndReset() {
lock.writeLock().lock();
MMLCounterSimple simpleCounter = new MMLCounterSimple(total, failed);
total = 0;
failed = 0;
lock.writeLock().unlock();
return simpleCounter;
}

}


public class MMLCounterSimple {

private long total = 0;
private long failed = 0;

public MMLCounterSimple(long total, long failed) {
this.total = total;
this.failed = failed;
}

public long getTotal() {
return this.total;
}

public long getFailed() {
return this.failed;
}
}

Answer

As written, this isn't guaranteed to work as you expect. There's no synchronization point between the writes performed by the N threads and the read performed by the X thread. Even the MMLCounterLocked flag could be ignored by writers.

In addition to making your code work correctly, using higher-level concurrency tools like ConcurrentMap will drastically simplify your code.


Since you only require the sum, a LongAccumulator will suffice, and it makes the code very simple and safe.

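  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.LongAccumulator;

  public static void main(String[] args)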
    throws Exception
  {
    int NUMBER_OF_THREADS = 400;
    ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
    LongAccumulator sum = new LongAccumulator(Long::sum, 0);
    for (int i = 0; i < NUMBER_OF_THREADS; ++i) {
      executor.submit(new MMLGenerator(sum));
      TimeUnit.MILLISECONDS.sleep(50); /* Why??? */
    }
    int interval = 10;
    TimeUnit.SECONDS.sleep(interval);
    long rate = sum.getThenReset() / interval;
    System.out.println("--> MML rate is : " + rate + " MML per second.");
    executor.shutdownNow();
  }

  private static final class MMLGenerator
    implements Runnable
  {

    private final LongAccumulator counter;

    MMLGenerator(LongAccumulator counter)
    {
      this.counter = counter;
    }

    @Override
    public void run()
    {
      while (true) {
        counter.accumulate(1);
        try {
          TimeUnit.SECONDS.sleep(1);
        }
        catch (InterruptedException shutdown) {
          break;
        }
      }
    }

  }

As for your two new questions:

  1. "Why do I also need to use ConcurrentHashMap instead of HashMap if I protected the critical parts of the code with ReentrantReadWriteLock?"

You didn't protect the critical part with a read-write lock. You are acquiring the read lock when you write the table, and the write lock when you read it. The read lock is shared, so any number of threads can hold it at once and modify the HashMap concurrently. The behavior of HashMap isn't defined under concurrent modification. You could cause threads that are using the table to hang.

Also, you should use a try-finally construct to ensure that you unlock regardless of any errors that occur.
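
As a minimal sketch of both points (the write lock for anything that mutates the map, and try-finally around every locked region), the counter wrapper could look roughly like this. The class name LockedCounter and its method names are hypothetical, chosen only for illustration; they are not taken from the question's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical rework of the question's wrapper; names are illustrative. */
final class LockedCounter {

    private final Map<Integer, Integer> counts = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Mutates the map, so it takes the write (exclusive) lock. */
    void increaseCounter(int key) {
        lock.writeLock().lock();
        try {
            counts.merge(key, 1, Integer::sum);
        } finally {
            lock.writeLock().unlock(); // runs even if the body throws
        }
    }

    /** Only reads, so the read (shared) lock is enough. */
    int get(int key) {
        lock.readLock().lock();
        try {
            return counts.getOrDefault(key, 0);
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Sums and resets every counter; this also mutates, so it needs the write lock. */
    int sumAndReset() {
        lock.writeLock().lock();
        try {
            int sum = 0;
            for (Map.Entry<Integer, Integer> entry : counts.entrySet()) {
                sum += entry.getValue();
                entry.setValue(0);
            }
            return sum;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

With this arrangement every increment is exclusive, so the N threads no longer update in parallel.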

If you used a ConcurrentMap, threads could update without acquiring a lock on the entire table, which is what you'll be doing when you apply the read-write lock correctly.
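
For comparison, here is a rough sketch of per-key updates on a ConcurrentHashMap, assuming Java 8 or later. The class ConcurrentCounts and its demo method are made up for this illustration. ConcurrentHashMap.merge performs the read-modify-write for one key atomically, so no table-wide lock is needed.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-key counters backed by ConcurrentHashMap. */
final class ConcurrentCounts {

    private final ConcurrentHashMap<Integer, Integer> counts = new ConcurrentHashMap<>();

    /** Atomically adds one to the counter for this key, creating it if absent. */
    void increaseCounter(int key) {
        counts.merge(key, 1, Integer::sum);
    }

    int get(int key) {
        return counts.getOrDefault(key, 0);
    }

    /** Starts several threads that all hammer the same key, then returns its final count. */
    static int demo(int threads, int incrementsPerThread) throws InterruptedException {
        ConcurrentCounts shared = new ConcurrentCounts();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    shared.increaseCounter(0);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return shared.get(0);
    }
}
```

Note that iterating over such a map while writers are active does not give an atomic snapshot of all keys; if thread X needs one, some exclusive lock around the read is still required.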

  2. "This usage of ReentrantReadWriteLock only replaces the flag from my previous implementation, which I now see was not done correctly. However, I still have the problem of value objects in HashMap not being volatile, so will different threads each have their own locally cached copy of a value which is not in sync with the locally cached copies of other threads?"

No. Lock acquisition does synchronize with other threads: changes that another thread made before releasing a lock are visible to any thread that subsequently acquires the same lock. If you fixed your locking, the HashMap would work correctly, but it would do so by locking the entire table during updates.
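
To illustrate the visibility point, here is a small sketch of a two-value counter whose fields are plain (not volatile) and are only ever touched while one lock is held. The class PairCounter and its demo method are hypothetical names for this example. Because every access goes through the same lock, one mechanism is enough; volatile and synchronized do not need to be stacked on top of it.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical two-value counter; both fields are guarded by a single lock. */
final class PairCounter {

    private final ReentrantLock lock = new ReentrantLock();
    private long total;   // plain field, only accessed with the lock held
    private long failed;  // plain field, only accessed with the lock held

    /** Increments total, and failed by failedInc, as one atomic step. */
    void increase(long failedInc) {
        lock.lock();
        try {
            total++;
            failed += failedInc;
        } finally {
            lock.unlock();
        }
    }

    /** Returns {total, failed} and resets both, as one atomic step. */
    long[] getAndReset() {
        lock.lock();
        try {
            long[] snapshot = { total, failed };
            total = 0;
            failed = 0;
            return snapshot;
        } finally {
            lock.unlock();
        }
    }

    /** Runs several writer threads, waits for them, then reads the counts. */
    static long[] demo(int threads, int perThread) throws InterruptedException {
        PairCounter counter = new PairCounter();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.increase(i % 2 == 0 ? 1 : 0); // "failed" on even iterations
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return counter.getAndReset();
    }
}
```

The reader always sees total and failed from the same moment, since an increment can never be half applied while the lock is held.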