Tuesday, August 20, 2019

Producer Consumer Example In Java

Producer Consumer Example In Java
                                 OR
Producer Consumer Problem with Wait and Notify - Thread Example

Example-1: Without BlockingQueue. 

import java.util.Vector;

/**
 * Producer half of the wait/notify example: offers the integers 0 through 6, in order, to a
 * shared {@link Vector} that is used as a bounded queue.
 *
 * <p>Every access to the queue is made while holding the queue's own monitor, which is also the
 * object used for {@code wait()} and {@code notifyAll()}.
 */
class Producer implements Runnable {

 private final Vector<Integer> queue;
 // Number of elements at which the queue counts as full and the producer must wait.
 private final int capacity;

 /**
  * @param queue shared queue; also serves as the lock and wait/notify object
  * @param size  number of queued elements at which the producer blocks
  */
 public Producer(Vector<Integer> queue, int size) {
  this.queue = queue;
  this.capacity = size;
 }

 /**
  * Produces the values 0..6. If the thread is interrupted while waiting for free space, the
  * interrupt status is restored and the method returns without producing the remaining values.
  */
 @Override
 public void run() {
  for (int i = 0; i < 7; i++) {
   // The message is printed before the value is actually enqueued, so it can appear
   // just ahead of a "Queue is full" message for the same value.
   System.out.println("Produced: " + i);
   try {
    produce(i);
   } catch (InterruptedException e) {
    // An interrupt is a request to stop. Swallowing it and looping again would clear the
    // flag and could leave this thread blocked forever in the next wait().
    Thread.currentThread().interrupt();
    return;
   }
  }
 }

 /**
  * Appends {@code i} to the queue, blocking while the queue holds {@code capacity} elements.
  *
  * @param i value to enqueue
  * @throws InterruptedException if the thread is interrupted while waiting for free space
  */
 private void produce(int i) throws InterruptedException {
  synchronized (queue) {
   // Re-check the condition in a loop: a wake-up does not guarantee that space is available.
   while (queue.size() == capacity) {
    System.out.println("Queue is full " + Thread.currentThread().getName()
      + " is waiting for consumed elements from queue , size: " + queue.size());
    queue.wait();
   }
   queue.add(i);
   // Wake every thread waiting on the queue so a waiting consumer can re-check for data.
   queue.notifyAll();
  }
 }
}

/**
 * Consumer half of the wait/notify example: repeatedly removes the element at the head of a
 * shared {@link Vector} and prints it, pausing five seconds between removals.
 *
 * <p>Every access to the queue is made while holding the queue's own monitor, which is also the
 * object used for {@code wait()} and {@code notifyAll()}.
 */
class Consumer implements Runnable {

 private final Vector<Integer> queue;

 /**
  * @param queue shared queue; also serves as the lock and wait/notify object
  * @param size  accepted so existing callers keep compiling; the consumer does not use it
  */
 public Consumer(Vector<Integer> queue, int size) {
  this.queue = queue;
 }

 /**
  * Consumes elements until the thread is interrupted. On interruption the interrupt status is
  * restored and the method returns.
  */
 @Override
 public void run() {
  while (true) {
   try {
    consume();
    // The pause happens outside the synchronized block, so the queue stays available
    // to other threads while this one sleeps.
    Thread.sleep(5000);
   } catch (InterruptedException e) {
    // An interrupt is the only way to stop this otherwise endless loop; swallowing it
    // would clear the flag and make the thread impossible to shut down.
    Thread.currentThread().interrupt();
    return;
   }
  }
 }

 /**
  * Removes and prints the first element of the queue, blocking while the queue is empty.
  *
  * @throws InterruptedException if the thread is interrupted while waiting for an element
  */
 private void consume() throws InterruptedException {
  synchronized (queue) {
   // Re-check the condition in a loop: a wake-up does not guarantee that data is available.
   while (queue.isEmpty()) {
    System.out.println("Queue is empty " + Thread.currentThread().getName()
      + " is waiting for add elements in queue , size: " + queue.size());
    queue.wait();
   }
   Integer head = queue.remove(0);
   System.out.println("Consumed: " + head);
   // Wake every thread waiting on the queue so a waiting producer can re-check for space.
   queue.notifyAll();
  }
 }
}

public class ProducerConsumerExample {

 public static void main(String args[]) {

  Vector<Integer> queue = new Vector<Integer>();
  int size = 4;

  Thread prodThread = new Thread(new Producer(queue, size), "Producer");
  Thread consThread = new Thread(new Consumer(queue, size), "Consumer");

  prodThread.start();
  consThread.start();
 }
} 
Output of the above program:

Produced: 0
Queue is empty Consumer is waiting for add elements in queue , size: 0
Produced: 1
Consumed: 0
Produced: 2
Produced: 3
Produced: 4
Produced: 5
Queue is full Producer is waiting for consumed elements from queue , size: 4
Consumed: 1
Produced: 6
Queue is full Producer is waiting for consumed elements from queue , size: 4
Consumed: 2
Consumed: 3
Consumed: 4
Consumed: 5
Consumed: 6
Queue is empty Consumer is waiting for add elements in queue , size: 0
Example-2: Without BlockingQueue.

import java.util.LinkedList;
import java.util.Queue;

/**
 * Producer for Example-2: adds an ever-increasing counter (0, 1, 2, ...) to a shared queue,
 * waiting whenever the queue already holds {@code size} or more elements.
 */
class ProducerOne {
 private final Queue<Integer> queue;
 private final int size;

 /**
  * @param queue shared queue; also serves as the lock and wait/notify object
  * @param size  number of queued elements at which the producer waits
  */
 public ProducerOne(Queue<Integer> queue, int size) {
  this.queue = queue;
  this.size = size;
 }

 /**
  * Produces values in an endless loop; the method only exits by throwing.
  *
  * @throws InterruptedException if the thread is interrupted while waiting or sleeping
  */
 public void produce() throws InterruptedException {
  for (int next = 0;; next++) {
   synchronized (queue) {
    // Block until there is room; the condition is re-checked after every wake-up.
    while (queue.size() >= size) {
     queue.wait();
    }
    queue.add(next);
    System.out.println("Produced " + next);
    // Let a waiting consumer know that an element is available.
    queue.notifyAll();
    // The one-second pause is taken while still holding the queue's monitor, so a thread
    // synchronizing on the same queue cannot enter its block until the pause is over.
    Thread.sleep(1000);
   }
  }
 }
}

/**
 * Consumer for Example-2: takes elements from the head of a shared queue and prints them,
 * waiting whenever the queue is empty.
 */
class ConsumerOne {
 private final Queue<Integer> queue;
 // Stored from the constructor but not read anywhere in this class.
 private final int size;

 /**
  * @param queue shared queue; also serves as the lock and wait/notify object
  * @param size  queue size limit passed in by the caller; not consulted by the consumer
  */
 public ConsumerOne(Queue<Integer> queue, int size) {
  this.queue = queue;
  this.size = size;
 }

 /**
  * Consumes values in an endless loop; the method only exits by throwing.
  *
  * @throws InterruptedException if the thread is interrupted while waiting or sleeping
  */
 public void consume() throws InterruptedException {
  for (;;) {
   synchronized (queue) {
    // Block until there is data; the condition is re-checked after every wake-up.
    while (queue.isEmpty()) {
     queue.wait();
    }
    int taken = queue.poll();
    System.out.println("Consume " + taken);
    // Let a waiting producer know that space has been freed.
    queue.notifyAll();
    // The one-second pause is taken while still holding the queue's monitor, so a thread
    // synchronizing on the same queue cannot enter its block until the pause is over.
    Thread.sleep(1000);
   }
  }
 }
}

/**
 * Entry point for Example-2: runs one {@code ProducerOne} and one {@code ConsumerOne} on
 * separate threads over a shared {@link LinkedList} with a size limit of 2, then waits for both
 * threads to finish.
 */
public class ProducerConsumerExample2 {

 public static void main(String[] args) throws InterruptedException {

  final int capacity = 2;
  final Queue<Integer> sharedQueue = new LinkedList<>();
  final ProducerOne producer = new ProducerOne(sharedQueue, capacity);
  final ConsumerOne consumer = new ConsumerOne(sharedQueue, capacity);

  // produce() and consume() declare InterruptedException, which Runnable.run() cannot
  // propagate, so each task reports it itself.
  Thread producerThread = new Thread(() -> {
   try {
    producer.produce();
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  });
  Thread consumerThread = new Thread(() -> {
   try {
    consumer.consume();
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  });

  producerThread.start();
  consumerThread.start();

  // Keep main alive until both worker threads have ended.
  producerThread.join();
  consumerThread.join();
 }
}

Output of the above program:

Produced 0
Consume 0
Produced 1
Produced 2
Consume 1
Consume 2
Produced 3
Produced 4
Consume 3
Consume 4
Produced 5
Produced 6
Consume 5
Consume 6
Produced 7
Produced 8
Consume 7
Consume 8
Produced 9
Produced 10
Consume 9
Consume 10
Produced 11



No comments:

Post a Comment