What is the Producer-Consumer Problem?
The Producer-Consumer Problem, also known as the Bounded Buffer Problem, entails multiple threads operating as either producers or consumers, sharing a common buffer (or storage).
- The Producer generates an item and adds it to the shared buffer.
- The Consumer retrieves an item from the same buffer for consumption.
Accessing a buffer without synchronization methods can lead to data corruption, race conditions, and undefined behavior. Race conditions occur when multiple threads access shared resources concurrently without proper synchronization, leading to unpredictable outcomes. In the context of buffer access, this means that one thread might be reading from or writing to the buffer while another thread is also attempting to read from or write to it simultaneously. This can result in interleaved or partial data being read or written, leading to corrupted or inconsistent data.
In the worst case, when several threads modify the same data at once, the buffer's contents can end up completely corrupted or nonsensical. This is commonly referred to as "data corruption" or a "data integrity issue."
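To make the race concrete, here is a minimal sketch that is not part of the original project (the class name RaceDemo and the counts are made up for illustration). Several threads increment a plain counter and a lock-protected counter. Because an unprotected increment is a read-modify-write, the plain total may come out lower than expected on some runs, while the protected total is always exact.

```java
class RaceDemo {
    static int unsafeCount = 0;              // no synchronization: updates may be lost
    static int safeCount = 0;                // only modified while holding LOCK
    static final Object LOCK = new Object();

    // Starts the given number of threads; each one bumps both counters.
    static void runWorkers(int threads, int incrementsPerThread) {
        unsafeCount = 0;
        safeCount = 0;
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    unsafeCount++;               // racy read-modify-write
                    synchronized (LOCK) {
                        safeCount++;             // one thread at a time
                    }
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
    }

    public static void main(String[] args) {
        runWorkers(4, 100_000);
        System.out.println("expected: 400000");
        System.out.println("unsafe:   " + unsafeCount); // may be lower than expected
        System.out.println("safe:     " + safeCount);
    }
}
```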
To avoid such problems, synchronization mechanisms such as locks, mutexes, or semaphores are used to coordinate access to shared resources like buffers among multiple threads. A mutex (or a semaphore with a single permit) ensures that only one thread is inside the critical section at a time, while counting semaphores can additionally track how many buffer slots are free or filled. Together they prevent race conditions and maintain data integrity.
In our case, we will use semaphores to fix this issue.
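The classic semaphore solution uses three semaphores: one counting free slots, one counting filled slots, and one acting as a mutex around the buffer. Below is a minimal sketch of that pattern, assuming the JDK's java.util.concurrent.Semaphore and a hypothetical BoundedBuffer class (these are not the classes built later in this post):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

class BoundedBuffer {
    private final Deque<Integer> items = new ArrayDeque<>();
    private final Semaphore empty;                     // counts free slots
    private final Semaphore full = new Semaphore(0);   // counts filled slots
    private final Semaphore mutex = new Semaphore(1);  // guards items

    BoundedBuffer(int capacity) {
        this.empty = new Semaphore(capacity);
    }

    void put(int item) throws InterruptedException {
        empty.acquire();                 // block while the buffer is full
        mutex.acquireUninterruptibly();  // short critical section, so do not give up on interrupt
        try {
            items.addLast(item);
        } finally {
            mutex.release();
        }
        full.release();                  // signal that one more item is available
    }

    int take() throws InterruptedException {
        full.acquire();                  // block while the buffer is empty
        mutex.acquireUninterruptibly();
        int item;
        try {
            item = items.removeFirst();
        } finally {
            mutex.release();
        }
        empty.release();                 // signal that one more slot is free
        return item;
    }

    // Demo helper: a producer thread puts 1..n while the caller takes n items and sums them.
    static long transfer(int capacity, int n) {
        BoundedBuffer buffer = new BoundedBuffer(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during transfer", e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(4, 100)); // prints 5050
    }
}
```

The order of the acquires matters: taking the mutex before waiting on empty or full could leave a thread sleeping while it holds the mutex, which would block everyone else.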
My Implementation of Producer-Consumer in Java
First of all, I added a couple of twists to the classic problem:
- The Producer's generation rate is controlled by a function that shortens the thread's sleep time as the number of consumer threads increases.
- A Manager thread controls production by pausing the Producer thread when the buffer reaches its maximum size and resuming it once the buffer is about to run empty.
Now let's start coding in Java.
Semaphore class
You can skip this and use Java's built-in java.util.concurrent.Semaphore class instead.
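public class Semaphore {
    private int count; // number of available permits

    public Semaphore(int count) {
        this.count = count;
    }

    // Blocks until a permit is available, then takes it
    public synchronized void acquire() throws InterruptedException {
        while (count <= 0) {
            wait();
        }
        count--;
    }

    // Returns a permit and wakes up one waiting thread
    public synchronized void release() {
        count++;
        notify();
    }
}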
This class will help us synchronize the threads' access to the critical section, which is the buffer.
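If you prefer the JDK's own java.util.concurrent.Semaphore over the hand-written class above, it offers acquire() and release() methods with the same meaning, so the rest of the code should only need an extra import. A minimal sketch of how its permits behave (the BuiltInSemaphoreDemo class is just for illustration):

```java
import java.util.concurrent.Semaphore;

class BuiltInSemaphoreDemo {
    // Takes both permits, tries for a third, then gives one back.
    static String demo() {
        Semaphore slots = new Semaphore(2);            // two permits to start with
        StringBuilder trace = new StringBuilder();
        try {
            slots.acquire();
            slots.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        trace.append(slots.availablePermits());             // 0 permits left
        trace.append(' ').append(slots.tryAcquire());       // false: would have to wait
        slots.release();
        trace.append(' ').append(slots.availablePermits()); // 1 permit again
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "0 false 1"
    }
}
```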
Stock class
This is the buffer class. I named it Stock, and it stores the products (just random numbers) generated by the Producer thread.
import java.util.ArrayList;
import java.util.List;

public class Stock {
    public static final int MAXIMUM_STOCK = 40;
    public static final int MINIMUM_STOCK = 10;
    private final List<Integer> storage = new ArrayList<>();

    // Adds a product at the end of the stock
    public void Store(int item) {
        storage.add(item);
    }

    // Removes and returns the oldest product (FIFO order)
    public int Retrieve() {
        return storage.remove(0); // List.removeFirst() only exists since Java 21
    }

    public int getSize() {
        return storage.size();
    }

    public boolean isMaximumStock() {
        return storage.size() >= MAXIMUM_STOCK;
    }

    public boolean isMinimumStock() {
        return storage.size() <= MINIMUM_STOCK;
    }
}
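Since products are added at the back and taken from the front, the stock is effectively a FIFO queue. As an alternative sketch (not what this post's code uses), an ArrayDeque supports removal from the front without shifting the remaining elements. The names QueueStock, store, and retrieve are made up for this illustration:

```java
import java.util.ArrayDeque;
import java.util.Deque;

class QueueStock {
    private final Deque<Integer> storage = new ArrayDeque<>();

    void store(int item) {
        storage.addLast(item);        // newest product goes to the back
    }

    int retrieve() {
        return storage.removeFirst(); // oldest product leaves first; throws NoSuchElementException if empty
    }

    int getSize() {
        return storage.size();
    }

    public static void main(String[] args) {
        QueueStock stock = new QueueStock();
        stock.store(1);
        stock.store(2);
        stock.store(3);
        System.out.println(stock.retrieve()); // prints 1
        System.out.println(stock.getSize());  // prints 2
    }
}
```

Like the Stock class, this is not thread-safe on its own and still needs the semaphores around every call.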
Factory Thread
The Factory thread generates products, which are then stored in the stock.
public class Factory implements Runnable {
    // volatile: written by the Manager thread, read by the factory thread
    private volatile boolean productionStatus = true;

    public boolean getProductionStatus() {
        return productionStatus;
    }

    public void setProductionStatus(boolean x) {
        this.productionStatus = x;
    }

    private int Produce() {
        return Utils.random.nextInt(10000);
    }

    @Override
    public void run() {
        // The Manager starts a new thread on resume, so this thread ends
        // as soon as production is paused or the thread is interrupted.
        while (productionStatus) {
            try {
                Manager.empty.acquire(); // wait for a free slot
            } catch (InterruptedException e) {
                return; // paused while waiting: no slot was taken
            }
            try {
                Manager.mutex.acquire();
            } catch (InterruptedException e) {
                Manager.empty.release(); // hand the reserved slot back
                return;
            }
            //Critical Section
            int product = Produce();
            Manager.getStock().Store(product);
            Debug.out("Factory Produced a Product (" + product + ").", Debug.Color.GREEN);
            Debug.out("stock: " + Manager.getStock().getSize(), Debug.Color.YELLOW);
            Manager.mutex.release();
            Manager.full.release();
            try {
                Thread.sleep(Utils.TimingCalculator(Client.getCount()));
            } catch (InterruptedException e) {
                return; // paused while sleeping
            }
        }
    }
}
And don't forget the Utils class, which holds the function that controls the Producer's generation rate.
import java.util.Random;

public class Utils {
    public static Random random = new Random();

    // Factory sleep time in ms: 500 for one client, divided by 1.5 for each extra client, never below 25
    public static int TimingCalculator(int input) {
        int initialMaxTime = 500;
        int minimalTime = 25;
        double factor = 1.5;
        return Math.max((int) (initialMaxTime / Math.pow(factor, input - 1)), minimalTime);
    }
}
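To get a feel for the formula: the delay is initialMaxTime / factor^(clients - 1), truncated to an integer and floored at minimalTime. The sketch below copies the function into a stand-alone class (TimingDemo is a name made up here) and prints the delay for 1 to 9 clients:

```java
class TimingDemo {
    // Same formula as Utils.TimingCalculator, repeated so this snippet runs on its own
    static int timingCalculator(int clients) {
        int initialMaxTime = 500;
        int minimalTime = 25;
        double factor = 1.5;
        return Math.max((int) (initialMaxTime / Math.pow(factor, clients - 1)), minimalTime);
    }

    public static void main(String[] args) {
        for (int clients = 1; clients <= 9; clients++) {
            System.out.println(clients + " client(s): " + timingCalculator(clients) + "ms");
        }
        // Delays for 1..9 clients: 500, 333, 222, 148, 98, 65, 43, 29, 25
        // From 9 clients on, the 25ms floor applies.
    }
}
```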
Client Thread
The Client threads will consume the product from the stock.
public class Client implements Runnable {
    private static int count = 0;
    private final int id;

    public Client() {
        this.id = ++count;
    }

    public static int getCount() {
        return Client.count;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Manager.full.acquire();
                Manager.mutex.acquire();
                //Critical Section
                Debug.out("Client-" + id + " consumed a Product (" + Manager.getStock().Retrieve() + ").", Debug.Color.RED);
                Debug.out("Stock: " + Manager.getStock().getSize(), Debug.Color.YELLOW);
                Manager.mutex.release();
                Manager.empty.release();
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
Now for the most important thread in my implementation:
Manager Thread
This thread initializes the semaphores used for synchronization, as well as the stock and the factory. It then monitors the Producer thread's status and the stock: when the buffer is full it pauses production, and when the buffer is about to be empty it resumes production.
public class Manager implements Runnable {
    public static final Semaphore mutex = new Semaphore(1);                   // guards the stock
    public static final Semaphore empty = new Semaphore(Stock.MAXIMUM_STOCK); // free slots
    public static final Semaphore full = new Semaphore(0);                    // stored products
    private static final Stock stock = new Stock();
    private static final Factory factory = new Factory();
    private static Thread factoryThread;

    public Manager() {
        Manager.factoryThread = new Thread(factory);
    }

    public static Stock getStock() {
        return stock;
    }

    public static Thread getFactoryThread() {
        return factoryThread;
    }

    @Override
    public void run() {
        while (true) {
            // Note: the stock size is read here without holding mutex, so it is only a rough check
            if (factory.getProductionStatus()) {
                if (stock.isMaximumStock()) {
                    factoryThread.interrupt();
                    factory.setProductionStatus(false);
                    Debug.out("Factory Paused⏸️ Production", Debug.Color.CYAN);
                }
            } else if (stock.isMinimumStock()) {
                factory.setProductionStatus(true);
                factoryThread = new Thread(factory);
                factoryThread.start();
                Debug.out("Factory Resumed▶️ Production", Debug.Color.CYAN);
            }
            try {
                Thread.sleep(10); // brief pause so the polling loop does not hog a CPU core
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
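The pause/resume rule is a form of hysteresis: production stops at the upper threshold and only restarts once the stock has dropped to the lower one, so the factory does not flip on and off around a single value. Here is a thread-free sketch of just that decision logic, using a hypothetical ProductionGate class and the same 40/10 thresholds as Stock:

```java
class ProductionGate {
    private final int max;
    private final int min;
    private boolean producing = true;

    ProductionGate(int max, int min) {
        this.max = max;
        this.min = min;
    }

    // Feeds in the current stock size and returns whether production should be running afterwards.
    boolean update(int stockSize) {
        if (producing && stockSize >= max) {
            producing = false;            // full: pause
        } else if (!producing && stockSize <= min) {
            producing = true;             // nearly empty: resume
        }
        return producing;
    }

    public static void main(String[] args) {
        ProductionGate gate = new ProductionGate(40, 10);
        int[] observed = {0, 39, 40, 25, 11, 10, 30};
        for (int size : observed) {
            System.out.println("stock " + size + " -> producing=" + gate.update(size));
        }
        // producing is true, true, false, false, false, true, true
    }
}
```

Between the two thresholds the answer depends on the previous state, which is why a stock of 25 keeps production paused in the run above.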
Finally, everything is set.
Main Class
After you have implemented every class and thread, the main function ties them together and starts everything.
import java.util.Scanner;

public class Main {
    public static Scanner read = new Scanner(System.in);

    public static void main(String[] args) throws InterruptedException {
        //Create a Manager object; its constructor creates the first factory thread
        Manager manager = new Manager();
        //Get the number of clients
        Debug.out("Enter number of clients: ");
        int numClients = read.nextInt();
        //Display Speed/Wait Time of the Factory
        Debug.out("Factory Timing: " + Utils.TimingCalculator(numClients) + "ms", Debug.Color.CYAN);
        //Create client objects and threads
        Client[] clients = new Client[numClients];
        Thread[] clientThreads = new Thread[numClients];
        for (int i = 0; i < numClients; i++) {
            clients[i] = new Client();
            clientThreads[i] = new Thread(clients[i]);
        }
        //Get Factory thread from Manager and create Manager thread
        Thread factoryThread = Manager.getFactoryThread();
        Thread managerThread = new Thread(manager);
        //Start Threads
        managerThread.start();
        factoryThread.start();
        for (Thread clientThread : clientThreads) {
            clientThread.start();
        }
        //Wait for the threads; the Manager and Clients loop forever, so the program runs until it is stopped
        managerThread.join();
        factoryThread.join();
        for (Thread clientThread : clientThreads) {
            clientThread.join();
        }
    }
}
The output should look like this:
Enter number of clients:
5
Factory Timing: 98ms
Factory Produced a Product (5810).
stock: 1
Client-1 consumed a Product (5810).
Stock: 0
Factory Produced a Product (1132).
When the stock fills up:
Factory Produced a Product (7226).
stock: 39
Factory Produced a Product (5621).
stock: 40
Factory Paused⏸️ Production
Client-1 consumed a Product (9236).
Stock: 39
Client-3 consumed a Product (8003).
Stock: 38
Client-2 consumed a Product (4937).
Stock: 37
When the stock is about to run out (10 products or fewer), the Manager starts the factory again and the log shows "Factory Resumed▶️ Production" before new products appear.
The full Java implementation is available in my repo, Producer-Consumer-Java.