/**
 * Producers and consumers communicating through a protected bounded buffer
 * implemented with an explicit lock and condition variables.
 * The program runs until it is explicitly terminated with CONTROL-C.
 */

import java.util.concurrent.locks.*;

/**
 * Fixed-capacity FIFO buffer of ints guarded by a {@link ReentrantLock}.
 * {@link #put(int)} blocks while the buffer is full and {@link #get()} blocks
 * while it is empty.
 */
class PBBuffer {
    private final int[] buffer;
    private final ReentrantLock mutex = new ReentrantLock();
    /** Signalled when a slot has been freed; producers wait on it. */
    private final Condition notFull = mutex.newCondition();
    /** Signalled when an element has been stored; consumers wait on it. */
    private final Condition notEmpty = mutex.newCondition();
    private int head;  // index of the next slot to write
    private int tail;  // index of the next slot to read
    private int count; // number of elements currently stored

    /**
     * Creates an empty buffer.
     *
     * @param size capacity of the buffer; must be at least 1
     * @throws IllegalArgumentException if {@code size} is not positive
     */
    public PBBuffer(int size) {
        // A zero-capacity buffer would make both put() and get() block forever.
        if (size <= 0) {
            throw new IllegalArgumentException("buffer size must be positive: " + size);
        }
        buffer = new int[size];
    }

    /**
     * Inserts element {@code e} in the buffer, waiting while the buffer is full.
     * If the calling thread is interrupted while waiting, the wait continues
     * and the thread's interrupt status is set again before returning.
     *
     * @param e the value to store
     */
    public void put(int e) {
        boolean interrupted = false;
        mutex.lock();
        try {
            while (count == buffer.length) {
                try {
                    notFull.await();
                } catch (InterruptedException x) {
                    System.out.println("Interrupted put");
                    // Remember the interrupt; re-asserting it here would make
                    // the next await() throw immediately and spin.
                    interrupted = true;
                }
            }
            buffer[head] = e;
            head = (head + 1) % buffer.length;
            count++;
            // Exactly one element was added, so waking one consumer suffices.
            notEmpty.signal();
        } finally {
            mutex.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the oldest element, waiting while the buffer is empty.
     * If the calling thread is interrupted while waiting, the wait continues
     * and the thread's interrupt status is set again before returning.
     *
     * @return the element that was removed
     */
    public int get() {
        boolean interrupted = false;
        int e;
        mutex.lock();
        try {
            while (count == 0) {
                try {
                    notEmpty.await();
                } catch (InterruptedException x) {
                    System.out.println("Interrupted get");
                    interrupted = true;
                }
            }
            e = buffer[tail];
            tail = (tail + 1) % buffer.length;
            count--;
            // Exactly one slot was freed, so waking one producer suffices.
            notFull.signal();
        } finally {
            mutex.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return e;
    }
}

/**
 * Thread that repeatedly inserts its seed value into a shared buffer until
 * it is interrupted.
 */
class Producer extends Thread {
    private final int seed; // This is the value inserted in buffer by this thread
    private final PBBuffer b;

    /**
     * @param b    buffer to insert into
     * @param seed value this producer inserts on every iteration
     */
    public Producer(PBBuffer b, int seed) {
        this.seed = seed;
        this.b = b;
    }

    @Override
    public void run() {
        // put() leaves the interrupt status set, so this check ends the loop
        // once the thread has been interrupted.
        while (!Thread.currentThread().isInterrupted()) {
            b.put(seed);
            // Qualified call: a bare yield() is rejected by Java 14+ compilers.
            Thread.yield(); // Let other producers run
        }
    }
}

/**
 * Thread that repeatedly removes a value from a shared buffer, prints it
 * together with its own identifier, and then sleeps for one second.
 * It stops when interrupted during the sleep.
 */
class Consumer extends Thread {
    // Not synchronized; main() constructs all consumers from a single thread.
    private static int counter;
    private final int self; // Identifier of consumer
    private final PBBuffer b;

    /**
     * @param b buffer to remove values from
     */
    public Consumer(PBBuffer b) {
        this.b = b;
        counter++;
        this.self = counter;
    }

    @Override
    public void run() {
        while (true) {
            int n = b.get();
            System.out.println("producer " + n + ", consumer " + self);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println("Received interrupt");
                // Keep the interrupt visible to whoever inspects this thread.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

/**
 * Entry point: starts the consumers and then the producers, all sharing one
 * bounded buffer.
 */
public class ProducersConsumers1 {
    public static final int BUFFER_SIZE = 11;
    public static final int NUMBER_OF_PRODUCERS = 5;
    public static final int NUMBER_OF_CONSUMERS = 4;

    public static void main(String[] args) {
        PBBuffer b = new PBBuffer(BUFFER_SIZE);
        final int nProducers = NUMBER_OF_PRODUCERS;
        final int nConsumers = NUMBER_OF_CONSUMERS;
        Thread[] producers = new Thread[nProducers];
        Thread[] consumers = new Thread[nConsumers];
        for (int k = 0; k < nConsumers; k++) {
            consumers[k] = new Consumer(b);
            consumers[k].start();
        }
        for (int k = 0; k < nProducers; k++) {
            // Seeds are 1-based so each producer inserts a distinct non-zero value.
            producers[k] = new Producer(b, k + 1);
            producers[k].start();
        }
    }
}