ocramot - 7 months ago
Java Question

Java How to implement lock on ConcurrentHashMap read

TL;DR: in Java I have N threads, each using a shared collection. ConcurrentHashMap allows me to lock on write, but not on read. What I need is to lock a specific item of the collection, read the previous data, do some computation, and update the values. If two threads receive two messages from the same sender, the second thread has to wait for the first one to finish, before doing its stuff.
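For what it's worth, when the "read the previous data, do some computation, and update" step can be expressed as a function, ConcurrentHashMap.compute applies it atomically per key, which behaves like a per-item lock. A minimal sketch (class and method names here are illustrative, not from my code; the caveat is that the function should stay short, since other updates to that key block while it runs):

```java
import java.util.concurrent.ConcurrentHashMap;

public class PerKeyUpdate {
    static final ConcurrentHashMap<String, Integer> totals = new ConcurrentHashMap<>();

    // compute() runs the whole read-modify-write atomically for that key:
    // a second thread updating the same key blocks until this one finishes.
    static int record(String senderId, int delta) {
        return totals.compute(senderId, (id, prev) -> (prev == null ? 0 : prev) + delta);
    }

    public static void main(String[] args) {
        record("sender-1", 5);
        record("sender-1", 3);
        System.out.println(totals.get("sender-1")); // 8
    }
}
```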




Long version:

These threads receive chronologically ordered messages, and they have to update the collection based on a messageSenderID.

My code, simplified, is as follows:

public class Parent {
    private Map<String, MyObject> myObjects;

    ExecutorService executor;
    List<Future<?>> runnables = new ArrayList<Future<?>>();

    public Parent() {
        myObjects = new ConcurrentHashMap<String, MyObject>();

        executor = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            WorkerThread worker = new WorkerThread("worker_" + i);
            Future<?> future = executor.submit(worker);
            runnables.add(future);
        }
    }

    private synchronized JSONObject getMessageFromSender() {
        // Get a message from the common source
    }

    private synchronized MyObject getMyObject(String id) {
        MyObject myObject = myObjects.get(id);
        if (myObject == null) {
            myObject = new MyObject(id);
            myObjects.put(id, myObject);
        }
        return myObject;
    }

    private class WorkerThread implements Runnable {
        private String name;

        public WorkerThread(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            while (!isStopped()) {
                JSONObject message = getMessageFromSender();
                String id = message.getString("id");
                MyObject myObject = getMyObject(id);
                synchronized (myObject) {
                    doLotOfStuff(myObject);
                }
            }
        }
    }
}
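As an aside, the check-then-act inside getMyObject above is only safe because the method is synchronized; ConcurrentHashMap can do the same thing atomically with computeIfAbsent, which lets the helper drop synchronized entirely. A minimal sketch (the wrapping class name is illustrative):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Registry {
    static class MyObject {
        final String id;
        MyObject(String id) { this.id = id; }
    }

    static final Map<String, MyObject> myObjects = new ConcurrentHashMap<>();

    // computeIfAbsent creates the entry at most once, atomically,
    // so no external synchronization is needed for the lookup.
    static MyObject getMyObject(String id) {
        return myObjects.computeIfAbsent(id, MyObject::new);
    }

    public static void main(String[] args) {
        MyObject a = getMyObject("sender-42");
        MyObject b = getMyObject("sender-42");
        System.out.println(a == b); // true: both calls return the same instance
    }
}
```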


So basically I have one producer and N consumers to speed up processing, but the N consumers have to work on a common base of data, and chronological order has to be respected.

I am currently using a ConcurrentHashMap, but I'm willing to change it if needed.

The code seems to work if messages with the same ID arrive far enough apart (> 1 second), but if I get two messages with the same ID within microseconds of each other, I end up with two threads dealing with the same item in the collection.

I GUESS that my desired behavior is:

Thread 1                        Thread 2
--------------------------------------------------------------
read message 1
find ID
lock that ID in collection
do computation and update
                                read message 2
                                find ID
                                lock that ID in collection
                                do computation and update

While I THINK that this is what happens:

Thread 1                        Thread 2
--------------------------------------------------------------
read message 1
                                read message 2
find ID
lock that ID in collection
do computation and update
                                find ID
                                lock that ID in collection
                                do computation and update


I thought about doing something like

JSONObject message = getMessageFromSender();
synchronized (message) {
    String id = message.getString("id");
    MyObject myObject = getMyObject(id);
    synchronized (myObject) {
        doLotOfStuff(myObject);
    } // well, maybe this inner synchronized is superfluous at this point
}


But I think that would defeat the whole purpose of having a multithreaded structure, since I would read one message at a time while the other workers sit idle; it would be as if I were using a synchronized HashMap instead of a ConcurrentHashMap.




For the record, I report here the solution I implemented eventually. I'm not sure it is optimal and I still have to test it for performance, but at least the input is handled properly.

public class Parent implements Runnable {

    private final static int NUM_WORKERS = 10;
    ExecutorService executor;
    List<Future<?>> futures = new ArrayList<Future<?>>();
    List<WorkerThread> workers = new ArrayList<WorkerThread>();

    @Override
    public void run() {
        executor = Executors.newFixedThreadPool(NUM_WORKERS);
        for (int i = 0; i < NUM_WORKERS; i++) {
            WorkerThread worker = new WorkerThread("worker_" + i);
            Future<?> future = executor.submit(worker);
            futures.add(future);
            workers.add(worker);
        }

        while (!isStopped()) {
            byte[] message = getMessageFromSender();
            byte[] id = getId(message);
            int n = Integer.valueOf(Byte.toString(id[id.length - 1])) % NUM_WORKERS;
            if (n >= 0 && n <= (NUM_WORKERS - 1)) {
                workers.get(n).addToQueue(message);
            }
        }
    }

    private class WorkerThread implements Runnable {
        private String name;
        private Map<String, MyObject> myObjects = new HashMap<String, MyObject>();
        private LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();

        public WorkerThread(String name) {
            this.name = name;
        }

        public void addToQueue(byte[] message) {
            queue.add(message);
        }

        @Override
        public void run() {
            while (!isStopped()) {
                byte[] message = queue.poll();
                if (message != null) {
                    String id = new String(getId(message));
                    MyObject myObject = getMyObject(id);
                    doLotOfStuff(myObject);
                }
            }
        }
    }
}
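One nit on the dispatch line above: going through Integer.valueOf(Byte.toString(...)) can yield a negative value (bytes are signed), which the range check then silently drops, so some messages are never assigned a worker. Math.floorMod avoids both the string round-trip and the lost messages; a sketch (the class and method names are mine):

```java
public class Shard {
    // Pick a worker index from the last byte of the id.
    // floorMod keeps the result in [0, numWorkers) even when
    // the byte is negative, so no message is ever dropped.
    static int workerFor(byte[] id, int numWorkers) {
        return Math.floorMod(id[id.length - 1], numWorkers);
    }

    public static void main(String[] args) {
        System.out.println(workerFor(new byte[]{13}, 10)); // 3
        System.out.println(workerFor(new byte[]{-1}, 10)); // 9
    }
}
```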

Answer

Conceptually, this is a kind of routing problem. What you need to do is:

Get your main thread (a single thread) to read messages off the queue and push the data onto a FIFO queue per id. Then get a single thread to consume the messages from each queue.

The locking examples will (probably) not work: beyond the second message, acquisition order is not guaranteed even with fair=true.

From Javadoc: Even when this lock has been set to use a fair ordering policy, a call to tryLock() will immediately acquire the lock if it is available, whether or not other threads are currently waiting for the lock.

One thing for you to decide is whether you want to create a thread per queue (which exits once the queue is empty) or keep the fixed-size thread pool and manage the extra bits of assigning threads to queues.

So, you get a single thread reading from the original queue and writing to the per-id queues, and then you also get one thread per id reading from its individual queue. This ensures task serialization.

In terms of performance, you should see a significant speed-up as long as the incoming messages are nicely distributed (id-wise). If you get mostly same-id messages, the tasks will be serialized anyway, and you also pay the overhead of control-object creation and synchronization.
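The per-id queues plus one consumer each do not have to be hand-rolled: a single-threaded executor per lane gives each id bucket its own FIFO with a dedicated consumer, which is the same serialization guarantee. A sketch under the assumption that ids hash reasonably (the class and method names are mine, not part of the answer):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class IdRouter {
    private final ExecutorService[] lanes;

    // One single-threaded executor per lane: tasks routed to the same
    // lane run strictly in submission order, one at a time.
    public IdRouter(int numLanes) {
        lanes = new ExecutorService[numLanes];
        for (int i = 0; i < numLanes; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // All messages with the same id land on the same lane.
    public void submit(String id, Runnable task) {
        lanes[Math.floorMod(id.hashCode(), lanes.length)].execute(task);
    }

    public void shutdown() {
        for (ExecutorService e : lanes) e.shutdown();
        for (ExecutorService e : lanes) {
            try {
                e.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        IdRouter router = new IdRouter(4);
        List<Integer> seen = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 100; i++) {
            final int n = i;
            router.submit("same-sender", () -> seen.add(n));
        }
        router.shutdown();
        System.out.println(seen.size()); // 100, appended in order 0..99
    }
}
```

Messages for different ids still run in parallel across lanes; only same-lane messages are serialized, which is exactly the chronological guarantee the question asks for.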