hao hao - 5 months ago 14
Java Question

Wait and notify in Consumer and Producer Threads

I just started learning multithreading. I have 5 producers and 2 consumers, each running in its own thread. The producers add items to a shared queue and should stop adding once the queue size reaches 100. I would like the consumers to notify the producers once all items have been removed from the queue, so that the producers can start adding again. Currently the producers wait but are never notified by the consumers.

Producer:

public class Producer implements Runnable {

    private BlockingQueue sharedQueue;
    private final int queueSize;
    private Object lock = new Object();

    public Producer(BlockingQueue sharedQueue, int queueSize){
        this.sharedQueue = sharedQueue;
        this.queueSize = queueSize;
    }

    public void run() {
        while(true) {
            if(sharedQueue.size()== queueSize){

                try {
                    synchronized (lock) {
                        sharedQueue.wait();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            try {
                sharedQueue.put("Producer: " + sharedQueue.size());
                Thread.sleep(500);
                System.out.println("Producer: Queue Size " + sharedQueue.size() + " Current Thread " + Thread.currentThread());

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


Consumer:

public class Consumer implements Runnable{

    private BlockingQueue sharedQueue;
    private final int queueSize;
    private final int queueEmpty=0;
    private Object lock = new Object();

    public Consumer(BlockingQueue sharedQueue, int queueSize){
        this.sharedQueue = sharedQueue;
        this.queueSize = queueSize;
    }

    //Notify awaiting thread if the sharedQueue is empty
    public void run() {
        while (true) {
            if(sharedQueue.size()==queueEmpty){
                synchronized (lock) {
                    this.notifyAll();
                }
            }
            try {

                sharedQueue.take();
                Thread.sleep(800);
                System.out.println("Consumer: Queue Size " + sharedQueue.size() + " Current Thread" + Thread.currentThread());

            }catch(InterruptedException e){
                e.printStackTrace();
            }
        }

    }
}


Main class

public class App{

    //A simple program to illustrate how producer and consumer pattern works with blocking queue using executor service
    public static void main( String[] args )
    {
        final BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<String> (100);
        final int queueSize =100;
        final int producerNum = 5;
        final int consumerNum = 2;

        final ExecutorService executorProducer = Executors.newFixedThreadPool(producerNum);
        final ExecutorService executorConsumer = Executors.newFixedThreadPool(consumerNum);

        for(int i=0;i<producerNum;i++){
            Producer producer = new Producer(sharedQueue,queueSize);
            executorProducer.execute(producer);
        }

        for(int j=0;j<consumerNum;j++){
            Consumer consumer = new Consumer(sharedQueue,queueSize);
            executorConsumer.execute(consumer);
        }

    }
}

Answer

From the Oracle documentation page for BlockingQueue:

BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.

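Since you are already using a BlockingQueue, you can get rid of the wait() and notify() calls entirely: put() blocks while the queue is full and take() blocks while it is empty, so the producers resume on their own as soon as a consumer frees a slot.

The calls in your code cannot work in any case. wait() and notifyAll() must be invoked on an object whose monitor the calling thread currently holds. Your producer synchronizes on lock but calls sharedQueue.wait(), and your consumer synchronizes on lock but calls this.notifyAll(), so both calls throw IllegalMonitorStateException. On top of that, every Producer and Consumer instance creates its own lock object, so the threads never share a monitor.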
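To see the blocking behaviour without any threads, here is a minimal sketch. The class name BoundedQueueDemo and the capacity of 2 (instead of your 100) are my own choices for illustration. It uses offer() with a timeout in place of put(), only so that the "queue is full" case can be observed instead of blocking forever:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueDemo {

    // Returns a short trace of what happened so the behaviour is easy to check.
    static String run() {
        // Capacity 2 instead of 100 to keep the demo short.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        StringBuilder trace = new StringBuilder();
        try {
            queue.put("a");
            queue.put("b"); // the queue is now full

            // A plain put("c") would block here until a consumer takes an item.
            // offer() with a timeout gives up instead and returns false.
            boolean accepted = queue.offer("c", 100, TimeUnit.MILLISECONDS);
            trace.append("offer while full: ").append(accepted).append('\n');

            // Taking one item frees a slot ...
            trace.append("took: ").append(queue.take()).append('\n');

            // ... so the same offer now succeeds.
            accepted = queue.offer("c", 100, TimeUnit.MILLISECONDS);
            trace.append("offer after take: ").append(accepted).append('\n');
            trace.append("remaining capacity: ").append(queue.remainingCapacity());
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the failure to the caller.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In your program the take() would happen in a consumer thread while a producer is blocked inside put(); the queue wakes the producer for you, which is exactly the notification you were trying to build by hand.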

Example code for multiple producer and consumers using BlockingQueue:

import java.util.concurrent.*;

public class ProducerConsumerDemo {

    public static void main(String args[]){

     BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

      Thread prodThread1 = new Thread(new Producer(sharedQueue,1));
      Thread prodThread2 = new Thread(new Producer(sharedQueue,2));
      Thread consThread1 = new Thread(new Consumer(sharedQueue,1));
      Thread consThread2 = new Thread(new Consumer(sharedQueue,2));

      prodThread1.start();
      prodThread2.start();
      consThread1.start();
      consThread2.start(); 
   }

}

class Producer implements Runnable {

    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;

    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
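            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                // put() blocks while the queue has no free slot
                sharedQueue.put(number);
            } catch (InterruptedException err) {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
                return;
            }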
        }
    }

}

class Consumer implements Runnable{

    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }

    @Override
    public void run() {
        while(true){
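            try {
                // take() blocks while the queue is empty
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (InterruptedException err) {
                // Restore the interrupt flag and leave the loop
                Thread.currentThread().interrupt();
                return;
            }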
        }
    }   
}

How does it work?

  1. Producer Thread 1 puts the Integers 11 to 15 into the BlockingQueue
  2. Producer Thread 2 puts the Integers 21 to 25 into the BlockingQueue
  3. Either consumer thread (Thread 1 or Thread 2) takes values from the BlockingQueue (Integers in this example)
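The three steps above can also be sketched in a form that terminates and checks its own result. Note that the consumers in the demo loop forever, so that program never exits by itself. The variant below is only a sketch under my own assumptions: the class name ShutdownDemo and the STOP sentinel ("poison pill") are not part of the code above, and STOP must be a value that is never produced as real data.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ShutdownDemo {

    // Hypothetical sentinel telling a consumer to stop; never a real payload.
    private static final int STOP = -1;

    // Runs 2 producers and 2 consumers and returns everything consumed, sorted.
    static List<Integer> runOnce() {
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();
        List<Integer> consumed = Collections.synchronizedList(new ArrayList<>());

        Thread[] producers = new Thread[2];
        for (int t = 1; t <= producers.length; t++) {
            final int threadNo = t;
            producers[t - 1] = new Thread(() -> {
                try {
                    for (int i = 1; i <= 5; i++) {
                        sharedQueue.put(i + 10 * threadNo); // 11..15 and 21..25
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        Thread[] consumers = new Thread[2];
        for (int c = 0; c < consumers.length; c++) {
            consumers[c] = new Thread(() -> {
                try {
                    while (true) {
                        int num = sharedQueue.take(); // blocks while empty
                        if (num == STOP) {
                            return; // this consumer is done
                        }
                        consumed.add(num);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            for (Thread p : producers) p.start();
            for (Thread c : consumers) c.start();
            for (Thread p : producers) p.join();
            // All real values are queued; add one STOP per consumer behind them.
            for (int c = 0; c < consumers.length; c++) sharedQueue.put(STOP);
            for (Thread c : consumers) c.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        List<Integer> sorted = new ArrayList<>(consumed);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

Which consumer receives which value still varies from run to run; only the sorted set of consumed values is stable. Because the queue is FIFO and the STOP markers are added after both producers have finished, no real value can be left behind them.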

Sample output (the interleaving differs from run to run):

Produced:21:by thread:2
Produced:11:by thread:1
Produced:12:by thread:1
Produced:13:by thread:1
Produced:14:by thread:1
Produced:22:by thread:2
Produced:23:by thread:2
Produced:24:by thread:2
Produced:25:by thread:2
Consumed: 21:by thread:1
Consumed: 12:by thread:1
Consumed: 13:by thread:1
Consumed: 14:by thread:1
Consumed: 22:by thread:1
Consumed: 23:by thread:1
Consumed: 24:by thread:1
Consumed: 25:by thread:1
Produced:15:by thread:1
Consumed: 11:by thread:2
Consumed: 15:by thread:1
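
If you want to use wait() and notifyAll() anyway as a learning exercise, the key point is that the waiting and the notifying threads must use the same monitor, and that the condition is re-checked in a loop after waking up. The following is a minimal hand-rolled sketch, not a replacement for BlockingQueue; the class name WaitNotifyBuffer and its methods are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class WaitNotifyBuffer {

    private final Queue<Integer> items = new ArrayDeque<>();
    private final int capacity;

    WaitNotifyBuffer(int capacity) {
        this.capacity = capacity;
    }

    // synchronized methods lock on `this`, so wait() and notifyAll()
    // in put() and take() all operate on the same monitor.
    synchronized void put(int value) throws InterruptedException {
        while (items.size() == capacity) {
            wait(); // releases the monitor until another thread notifies
        }
        items.add(value);
        notifyAll(); // wake consumers waiting for an item
    }

    synchronized int take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        int value = items.remove();
        notifyAll(); // wake producers waiting for a free slot
        return value;
    }

    // One producer pushes 1..10 through a buffer of size 2;
    // the calling thread consumes them and returns their sum.
    static int demo() {
        WaitNotifyBuffer buffer = new WaitNotifyBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < 10; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("Sum of consumed values: " + demo());
    }
}
```

Compare this with the code in the question: there, each object had a private lock, and wait() and notifyAll() were called on objects other than the one being synchronized on.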