TheCoder TheCoder - 26 days ago 11
Java Question

Single Producer Multiple Consumer - queue contains null

Can anyone explain how consumer-2 below is consuming a 'null'? My code should be preventing this.

public class Test {

public static void main(String args[]) throws InterruptedException {

BoundedQueue<Integer> sharedQueue = new BoundedQueue<>(10);

Callable<Integer> producer1 = new Producer(sharedQueue, "producer-1");
//Callable<Integer> producer2 = new Producer(sharedQueue, "producer-2");
Callable<Integer> consumer1 = new Consumer(sharedQueue, "consumer-1");
Callable<Integer> consumer2 = new Consumer(sharedQueue, "consumer-2");

Collection<Callable<Integer>> callables = new HashSet<>();
callables.add(producer1);
//callables.add(producer2);
callables.add(consumer1);
callables.add(consumer2);

ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.invokeAll(callables);
}
}

public class BoundedQueue<T> {

private int capacity;
private int head;
private int tail;
private int currentSizeOfBuffer;
private T[] buffer;

private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

public BoundedQueue(int capacity) {
this.capacity = capacity;
this.buffer = (T[]) new Object[capacity];
}

public void put(T element) throws InterruptedException {

final ReentrantLock lock = this.lock;
lock.lock();

if(isBufferFull()) {
waitOnAvailableSlot();
}

try {
buffer[tail] = element;
tail = getNextAvailableSlot(tail);
currentSizeOfBuffer++;

informConsumerQueueHasElement();

} finally {
lock.unlock();
}
}

private boolean isBufferFull() {
return capacity == currentSizeOfBuffer;
}

private void waitOnAvailableSlot() throws InterruptedException {
notFull.await();
}

private void informConsumerQueueHasElement() {
notEmpty.signal();
}

public T take() throws InterruptedException {

final ReentrantLock lock = this.lock;
lock.lock();

if(isBufferEmpty()) {
waitOnAvailableElement();
}

try {
T element = buffer[head];
head = getNextAvailableSlot(head);
currentSizeOfBuffer--;

informProducerQueueHasSpaceAvailable();

return element;
} finally {
lock.unlock();
}
}

private boolean isBufferEmpty() {
return 0 == currentSizeOfBuffer;
}

private void waitOnAvailableElement() throws InterruptedException {
notEmpty.await();
}

private void informProducerQueueHasSpaceAvailable() {
notFull.signal();
}

private final int getNextAvailableSlot(int currentSlotPosition) {
int nextAvailableSlot = ++currentSlotPosition;
return (nextAvailableSlot == capacity) ? 0 : nextAvailableSlot;
}
}


public class Producer implements Callable<Integer> {

private final BoundedQueue sharedQueue;
private String name;

@Override
public Integer call() throws Exception {

for(int i=0; i<10; i++){
try {
sharedQueue.put(i);
System.out.println(name + " produced: " + i);
} catch (InterruptedException ex) {
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
}
}
return null;
}

public Producer(BoundedQueue sharedQueue, String name) {
this.sharedQueue = sharedQueue;
this.name = name;
}
}

public class Consumer implements Callable<Integer> {

private final BoundedQueue sharedQueue;
private String name;

@Override
public Integer call() throws Exception {

while(true){ //what is happening here?
try {
Integer element = (Integer) sharedQueue.take();
System.out.println(name + " consumed: "+ element);
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}

public Consumer(BoundedQueue sharedQueue, String name) {
this.sharedQueue = sharedQueue;
this.name = name;
}
}


Output:


  • producer-2 produced: 0

  • consumer-2 consumed: null

  • consumer-1 consumed: 0

  • producer-2 produced: 1

  • producer-2 produced: 2

  • consumer-2 consumed: 2

  • consumer-1 consumed: 0

  • producer-1 produced: 0

  • consumer-2 consumed: 3

  • etc



Another run:


  • producer-2 produced: 0

  • consumer-1 consumed: 0

  • consumer-2 consumed: null

  • producer-1 produced: 0

  • roducer-2 produced: 1

  • producer-1 produced: 1

  • consumer-2 consumed: 0

  • consumer-1 consumed: null

  • consumer-2 consumed: 2

  • etc


Answer

You need to use while(isBufferEmpty()) instead of just if (and the same for full). Since all consumers (and producers) get signaled at the same time, you have to recheck to make sure that the other ones haven't already processed the elements added in the queue.