ASD ASD - 4 months ago 40
Java Question

Producer/Consumer using Semaphore; getting deadlock

Following http://en.wikipedia.org/wiki/Producer-consumer_problem, I want to simulate the producer/consumer problem using semaphores. I am getting a deadlock and I don't know what the problem is.

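import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// The enclosing class declaration was missing from the post; the name Main is assumed.
public class Main {
public static void main(String[] args) {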
CustomBlockingQueue blockingQueue = new CustomBlockingQueue();
new Thread(new Producer(blockingQueue)).start();
new Thread(new Consumer(blockingQueue)).start();
}
}

@SuppressWarnings("serial")
class CustomBlockingQueue extends LinkedList<Object> {
private static final int MAX_SIZE = 10;

private Semaphore mutex = new Semaphore(1);
private Semaphore fillCount = new Semaphore(0);
private Semaphore emptyCount = new Semaphore(MAX_SIZE);

@Override
public boolean offer(Object e) {
try {
mutex.acquire();
} catch (InterruptedException e2) {
e2.printStackTrace();
}
boolean result = super.offer(e);
System.out.println("offer " + size());
try {
fillCount.release();
emptyCount.acquire();
mutex.release();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
return result;
}

@Override
public Object poll() {
try {
mutex.acquire();
} catch (InterruptedException e2) {
e2.printStackTrace();
}
Object result = super.poll();
System.out.println("poll " + size());
try {
emptyCount.release();
fillCount.acquire();
mutex.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
}

class Producer implements Runnable {
private CustomBlockingQueue blockingQueue;
private Random random = new Random();

public Producer(CustomBlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
TimeUnit.SECONDS.sleep(random.nextInt(2));
blockingQueue.offer(new Object());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class Consumer implements Runnable {
private CustomBlockingQueue blockingQueue;
private Random random = new Random();

public Consumer(CustomBlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
TimeUnit.SECONDS.sleep(random.nextInt(4));
blockingQueue.poll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}


Using semaphores

Semaphores solve the problem of lost wakeup calls. In the solution below we use two semaphores, fillCount and emptyCount, to solve the problem. fillCount is the number of items to be read in the buffer, and emptyCount is the number of available spaces in the buffer where items could be written. fillCount is incremented and emptyCount decremented when a new item has been put into the buffer. If the producer tries to decrement emptyCount while its value is zero, the producer is put to sleep. The next time an item is consumed, emptyCount is incremented and the producer wakes up. The consumer works analogously.
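The counting behaviour described in that passage can be illustrated with a small single-threaded sketch. The class name SemaphoreCountingDemo and the capacity of 2 are assumptions made for illustration, and tryAcquire() is used only so the demo can show where a producer would block without actually blocking.

```java
import java.util.concurrent.Semaphore;

class SemaphoreCountingDemo {
    static final int CAPACITY = 2; // hypothetical small buffer size

    // Returns {fillCount permits, emptyCount permits, 1 if the extra put would block}.
    static int[] run() {
        Semaphore fillCount = new Semaphore(0);          // items ready to be read
        Semaphore emptyCount = new Semaphore(CAPACITY);  // free slots left

        // "Produce" until the buffer is full: take a free slot, announce an item.
        for (int i = 0; i < CAPACITY; i++) {
            emptyCount.acquireUninterruptibly();
            fillCount.release();
        }

        // With emptyCount at zero, a real producer calling acquire() would sleep here.
        boolean producerWouldBlock = !emptyCount.tryAcquire();

        // "Consume" one item: take an item, hand the slot back to the producer.
        fillCount.acquireUninterruptibly();
        emptyCount.release();

        return new int[] {
            fillCount.availablePermits(),
            emptyCount.availablePermits(),
            producerWouldBlock ? 1 : 0
        };
    }

    public static void main(String[] args) {
        int[] r = run();
        // prints: fillCount=1 emptyCount=1 producerWouldBlock=true
        System.out.println("fillCount=" + r[0] + " emptyCount=" + r[1]
                + " producerWouldBlock=" + (r[2] == 1));
    }
}
```

At every point fillCount plus emptyCount equals the capacity, which is the invariant the two semaphores are meant to maintain.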

Answer

Your locking is in the wrong order.

For offer, it needs to be:

        emptyCount.acquire();
        mutex.acquire();
        doModification();
        mutex.release();
        fillCount.release();

A similar change is needed for poll:

        fillCount.acquire();
        mutex.acquire();
        doModification();
        mutex.release();
        emptyCount.release();

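In your implementation you wait on the counting semaphores while holding the mutex. That deadlocks as soon as one of those waits has to block. For example, when poll runs on an empty queue, the consumer blocks in fillCount.acquire() with the mutex held, and the producer can never get the mutex to call fillCount.release(). The same happens to the producer in offer once emptyCount reaches zero. Your code also modifies the list before waiting on the count, so poll can return null from an empty queue.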
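Putting the corrected ordering together, a minimal sketch could look like the following. It is not the asker's exact class: the name SemaphoreBoundedBuffer, the put/take method names, the use of an internal ArrayDeque instead of extending LinkedList, and the transfer helper are all assumptions made for illustration. The mutex is taken with acquireUninterruptibly() on the assumption that the critical section is short, so only the waits on the counting semaphores are interruptible.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

class SemaphoreBoundedBuffer {
    private static final int MAX_SIZE = 10;

    private final Deque<Object> items = new ArrayDeque<>();
    private final Semaphore mutex = new Semaphore(1);
    private final Semaphore fillCount = new Semaphore(0);
    private final Semaphore emptyCount = new Semaphore(MAX_SIZE);

    void put(Object e) throws InterruptedException {
        emptyCount.acquire();            // wait for a free slot BEFORE taking the mutex
        mutex.acquireUninterruptibly();  // short critical section, never blocks for long
        try {
            items.addLast(e);
        } finally {
            mutex.release();
        }
        fillCount.release();             // announce the new item after leaving the mutex
    }

    Object take() throws InterruptedException {
        fillCount.acquire();             // wait for an item BEFORE taking the mutex
        mutex.acquireUninterruptibly();
        Object result;
        try {
            result = items.removeFirst();
        } finally {
            mutex.release();
        }
        emptyCount.release();            // hand the freed slot back to producers
        return result;
    }

    // Hypothetical helper: one producer thread puts 0..n-1, the caller consumes them.
    static long transfer(int n) {
        SemaphoreBoundedBuffer buffer = new SemaphoreBoundedBuffer();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += (Integer) buffer.take();
            }
            producer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", ex);
        }
        return sum;
    }

    public static void main(String[] args) {
        // 100 items through a 10-slot buffer, so the producer has to wait at times.
        System.out.println("consumed sum = " + transfer(100)); // prints: consumed sum = 4950
    }
}
```

Because neither thread ever sleeps on fillCount or emptyCount while holding the mutex, the other side can always get in to release the permit being waited for. In production code, java.util.concurrent.ArrayBlockingQueue already provides this behaviour.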