Dmytro Titov - 7 months ago
Java Question

Weird Hazelcast IMap#put() behaviour

My Hazelcast-based program can work in two modes: submitter and worker.

Submitter puts some POJO to the distributed map by some key, e.g.:

hazelcastInstance.getMap(MAP_NAME).put(key, value);


Worker has an infinite loop (with
Thread.sleep(1000L);
inside as a delay) which must process entries from the map. For now I'm just printing the map size in this loop.

Now here's the problem. I start the worker app. Then I start four submitters simultaneously (each adds an entry to the map and then terminates). But after all submitter apps are done, the worker app prints an arbitrary size: sometimes it detects that only one entry was added, sometimes two, sometimes three (in fact it never sees all four entries).

What is the problem with this simple flow? I've read in the Hazelcast docs that the
put()
method is synchronous, so it guarantees that after it returns, the entry is placed into the distributed map and replicated. But that doesn't seem to be the case in my experiment.

UPD (code)

Submitter:

public void submit(String key) {
    Object mySerializableObject = ...
    IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
    map.putIfAbsent(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS);
}


Worker:

public void process() {
    while (true) {
        IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
        System.out.println(map.size());

        // Optional<Map.Entry<String, Object>> objectToProcess = getObjectToProcess();
        // objectToProcess.ifPresent(entry -> processObject(entry.getKey(), entry.getValue()));
        try {
            Thread.sleep(PAUSE);
        } catch (InterruptedException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}


I commented out the "processing" part itself, because for now I'm just trying to get a consistent state of the map. The code above prints different results each time, e.g.: "4, 3, 1, 1, 1, 1, 1..." (so it can even see all 4 submitted tasks for a moment, but then they... disappear).

UPD (log)

Worker:

...
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 1
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
...


Submitter 1:

Before: tasksMap.size() = 0
After: tasksMap.size() = 1


Submitter 2:

Before: tasksMap.size() = 1
After: tasksMap.size() = 4


Submitter 3:

Before: tasksMap.size() = 1
After: tasksMap.size() = 2


Submitter 4:

Before: tasksMap.size() = 3
After: tasksMap.size() = 4

Answer

Well, I guess I've figured out the problem. As far as I understand, the distributed IMap returned by hazelcastInstance.getMap doesn't replicate all of its data to every node in the cluster: it partitions the data, so each entry is owned by one node (with a configurable number of backups on other nodes). That's why in my example some of the submitted tasks were stored not on the worker node (which runs perpetually) but on other submitter nodes, which terminated right after submission. Those entries were lost when the submitters exited before their partitions could migrate or be backed up elsewhere.
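As far as I can tell, another way to avoid losing entries with a plain IMap is to shut the submitter down gracefully, so Hazelcast can hand off the partitions it owns before the JVM exits. A minimal sketch (the map name "tasksMap" and the payload are illustrative; import paths assume Hazelcast 4/5, in 3.x IMap lives in com.hazelcast.core):

```java
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import java.util.concurrent.TimeUnit;

public class GracefulSubmitter {
    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IMap<String, String> map = hz.getMap("tasksMap");
        map.putIfAbsent("task-1", "payload", 1, TimeUnit.HOURS);

        // Graceful shutdown: the member waits for its partition data to be
        // migrated to the remaining members before leaving the cluster.
        // (Killing the JVM or calling getLifecycleService().terminate()
        // would skip this hand-off and can lose unbacked-up entries.)
        hz.shutdown();
    }
}
```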

I solved this issue by replacing hazelcastInstance.getMap with hazelcastInstance.getReplicatedMap. This method returns a ReplicatedMap, which, AFAIK, guarantees that entries placed into it are replicated to all nodes of the cluster. So now everything works fine in my system.
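For reference, a minimal sketch of the ReplicatedMap variant (the map name and payload are illustrative; import paths assume Hazelcast 4/5, in 3.x ReplicatedMap lives in com.hazelcast.core):

```java
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.replicatedmap.ReplicatedMap;
import java.util.concurrent.TimeUnit;

public class ReplicatedSubmitter {
    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();

        // Every entry in a ReplicatedMap is stored on every member, so the
        // worker keeps a full copy even after this submitter leaves.
        ReplicatedMap<String, String> map = hz.getReplicatedMap("tasksMap");
        map.put("task-1", "payload", 1, TimeUnit.HOURS);

        hz.shutdown();
    }
}
```

Note the trade-off: ReplicatedMap is eventually consistent and costs memory on every member, so it fits small task maps better than large data sets.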