Islam Zaoui

Full Stack Web Developer

Producer-Consumer Problem in Java

How to solve the Producer-Consumer Problem in Java using semaphores and a manager thread.

10 months ago

What is the Producer-Consumer Problem?

The Producer-Consumer Problem, also known as the Bounded Buffer Problem, entails multiple threads operating as either producers or consumers, sharing a common buffer (or storage).

  1. The Producer generates an item and adds it to the shared buffer.
  2. The Consumer retrieves an item from the same buffer for consumption.

Accessing the buffer without synchronization can cause race conditions, which occur when multiple threads read or write a shared resource concurrently and the result depends on how their operations interleave. Here, one thread might be halfway through adding an item while another is removing one, so a thread can observe partial or interleaved updates and the buffer ends up corrupted or inconsistent.

In extreme cases, such as several threads modifying the same data at once, the data can become completely nonsensical. This is commonly called data corruption or a data integrity issue.
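To make the lost-update effect concrete, here is a minimal sketch. The class `RaceSketch` and its methods are hypothetical names made up for this illustration and are not part of the implementation below. Two threads increment one counter without synchronization and one with it.

```java
// Hypothetical demo class; all names here are illustrative only.
class RaceSketch {
    private int unsafeCount = 0;
    private int safeCount = 0;

    // count++ is a read-modify-write sequence, so two threads can overwrite each other
    void unsafeIncrement() { unsafeCount++; }

    // synchronized lets only one thread at a time run the increment
    synchronized void safeIncrement() { safeCount++; }

    // Runs two threads that each increment both counters perThread times.
    // Returns {unsafeCount, safeCount}.
    static int[] run(int perThread) {
        RaceSketch r = new RaceSketch();
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) {
                r.unsafeIncrement();
                r.safeIncrement();
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new int[] { r.unsafeCount, r.safeCount };
    }

    public static void main(String[] args) {
        int[] result = run(100_000);
        System.out.println("unsafe: " + result[0] + " (may be below 200000)");
        System.out.println("safe:   " + result[1]);
    }
}
```

The synchronized counter always reaches the full total, while the unsynchronized one may fall short by a different amount on every run.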

To avoid such problems, synchronization mechanisms such as locks, semaphores, or mutexes coordinate access to shared resources like buffers. A mutex (or a semaphore with a single permit) ensures that only one thread touches the buffer at a time, which prevents race conditions and keeps the data consistent.

In our case, we will use semaphores to fix this issue.
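Before the full implementation, here is a minimal sketch of the classic three-semaphore pattern, assuming the built-in `java.util.concurrent.Semaphore`. The class `BoundedBufferSketch` and its `put`, `take`, and `demo` methods are hypothetical names for this illustration; the article's own classes are different.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical minimal bounded buffer; not the article's Stock or Manager classes.
class BoundedBufferSketch {
    private final Queue<Integer> buffer = new ArrayDeque<>();
    private final Semaphore mutex = new Semaphore(1); // guards the queue itself
    private final Semaphore empty;                    // counts free slots
    private final Semaphore full = new Semaphore(0);  // counts stored items

    BoundedBufferSketch(int capacity) {
        this.empty = new Semaphore(capacity);
    }

    void put(int item) throws InterruptedException {
        empty.acquire();          // blocks while the buffer is full
        mutex.acquire();
        try {
            buffer.add(item);
        } finally {
            mutex.release();
        }
        full.release();           // one more item is available
    }

    int take() throws InterruptedException {
        full.acquire();           // blocks while the buffer is empty
        mutex.acquire();
        int item;
        try {
            item = buffer.remove();
        } finally {
            mutex.release();
        }
        empty.release();          // one more slot is free
        return item;
    }

    // Runs one producer and one consumer over the numbers 1..count
    // and returns the sum the consumer received.
    static long demo(int capacity, int count) {
        BoundedBufferSketch b = new BoundedBufferSketch(capacity);
        long[] sum = { 0 };
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) b.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) sum[0] += b.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 100)); // sum of 1..100
    }
}
```

The order of the acquires matters: taking `mutex` before `empty` or `full` could leave a thread holding the lock while it waits for a slot or an item, which would block everyone else.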

My Implementation of Producer-Consumer in Java

I added two twists to the classic problem:

  1. The Producer's generation rate is controlled by a function that shortens the thread's sleep time as the number of consumer threads grows.

  2. A Manager thread controls production: it pauses the Producer thread when the buffer reaches its maximum size and resumes it once the buffer is about to run empty.
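The pause and resume rule in the second point behaves like a thermostat with two thresholds. A minimal sketch of just that decision, assuming the limits of 40 and 10 items used by the Stock class in this article (the class `HysteresisSketch` and the method `next` are hypothetical names):

```java
// Hypothetical helper that isolates the pause/resume decision; names are illustrative.
class HysteresisSketch {
    static final int MAX = 40; // pause at or above this size
    static final int MIN = 10; // resume at or below this size

    // Returns the next "producing" state for the current state and stock size.
    static boolean next(boolean producing, int size) {
        if (producing && size >= MAX) return false; // buffer full: pause
        if (!producing && size <= MIN) return true; // buffer low: resume
        return producing;                           // in between: keep the current state
    }

    public static void main(String[] args) {
        System.out.println(next(true, 40));  // false
        System.out.println(next(false, 25)); // false
        System.out.println(next(false, 10)); // true
        System.out.println(next(true, 25));  // true
    }
}
```

Using two thresholds instead of one keeps the producer from flipping between paused and running on every single item.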

Now let's start coding in Java.

Semaphore class

You can skip this and use Java's built-in java.util.concurrent.Semaphore class instead.

java
public class Semaphore {
    private int count;

    public Semaphore(int count) {
        this.count = count;
    }

    public synchronized void acquire() throws InterruptedException {
        while (count <= 0) {
            wait();
        }
        count--;
    }

    public synchronized void release() {
        count++;
        notify();
    }
}

This class will help us synchronize the threads' access to the critical section, which is the buffer.
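If you take the built-in route, the same counting behaviour looks roughly like this. It is a small sketch of `java.util.concurrent.Semaphore`; the class `BuiltInSemaphoreSketch` and the method `demo` are hypothetical names for this illustration.

```java
import java.util.concurrent.Semaphore;

// Illustrative class name; only java.util.concurrent.Semaphore is a real library class.
class BuiltInSemaphoreSketch {
    // Returns {permits at start, permits when drained, permits at end, third acquire succeeded (1/0)}.
    static int[] demo() {
        Semaphore slots = new Semaphore(2);      // two permits to start with
        int start = slots.availablePermits();
        slots.acquireUninterruptibly();          // takes the first permit
        slots.tryAcquire();                      // takes the last permit
        boolean gotThird = slots.tryAcquire();   // nothing left: returns false without blocking
        int drained = slots.availablePermits();
        slots.release();                         // hand one permit back
        int end = slots.availablePermits();
        return new int[] { start, drained, end, gotThird ? 1 : 0 };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("start=" + r[0] + " drained=" + r[1] + " end=" + r[2]);
    }
}
```

Like the hand-written version, `acquire()` on the built-in class blocks until a permit is free and throws `InterruptedException` if the waiting thread is interrupted.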

Stock class

This is the buffer class. I named it Stock, and it stores the products (just random numbers) generated by the Producer thread.

java
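import java.util.ArrayList;
import java.util.List;

// Not thread-safe on its own: Factory and Client hold Manager.mutex around Store and Retrieve
public class Stock {
    public static final int MAXIMUM_STOCK = 40;
    public static final int MINIMUM_STOCK = 10;
    private final List<Integer> storage = new ArrayList<>();

    public void Store(int item) {
        storage.add(item);
    }

    public int Retrieve() {
        // remove(0) takes the oldest item; List.removeFirst() only exists from Java 21
        return storage.remove(0);
    }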

    public int getSize() {
        return storage.size();
    }

    public boolean isMaximumStock() {
        return storage.size() >= MAXIMUM_STOCK;
    }

    public boolean isMinimumStock() {
        return storage.size() <= MINIMUM_STOCK;
    }
}

Factory Thread

The factory thread generates products, which are stored in the stock.

java
public class Factory implements Runnable {
    // volatile: written by the Manager thread, read by the factory thread
    private volatile boolean productionStatus = true;

    public boolean getProductionStatus() {
        return productionStatus;
    }

    public void setProductionStatus(boolean x) {
        this.productionStatus = x;
    }

    private int Produce() {
        return Utils.random.nextInt(10000);
    }

    @Override
    public void run() {
        // Stop when paused or interrupted; the Manager starts a new thread to resume
        while (this.productionStatus) {
            try {
                // Wait for a free slot in the stock
                Manager.empty.acquire();
                try {
                    Manager.mutex.acquire();
                } catch (InterruptedException e) {
                    // Give the reserved slot back before stopping
                    Manager.empty.release();
                    throw e;
                }

                //Critical Section
                int product = Produce();
                Manager.getStock().Store(product);

                Debug.out("Factory Produced a Product (" + product + ").", Debug.Color.GREEN);
                Debug.out("stock: " + Manager.getStock().getSize(), Debug.Color.YELLOW);

                Manager.mutex.release();
                Manager.full.release();

                // Produce faster when there are more clients
                Thread.sleep(Utils.TimingCalculator(Client.getCount()));
            } catch (InterruptedException e) {
                // The Manager interrupted this thread to pause production
                return;
            }
        }
    }
}

And don't forget the Utils class, which holds the function that controls the producer's generation rate.

java
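import java.util.Random;

public class Utils {
    public static Random random = new Random();

    // Sleep time in ms between two products: starts at 500 ms for one client,
    // is divided by 1.5 for every extra client, and never drops below 25 ms
    public static int TimingCalculator(int input) {
        int initialMaxTime = 500;
        int minimalTime = 25;
        double factor = 1.5;
        return Math.max((int) (initialMaxTime / Math.pow(factor, input - 1)), minimalTime);
    }
}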
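To see how quickly the delay shrinks, this sketch copies the same formula into a standalone class and prints it for one to ten clients. The class `TimingSketch` and the lowercase method name are hypothetical and used only for this illustration.

```java
// Standalone copy of the timing formula; class and method names are illustrative.
class TimingSketch {
    static int timingCalculator(int clients) {
        int initialMaxTime = 500;
        int minimalTime = 25;
        double factor = 1.5;
        return Math.max((int) (initialMaxTime / Math.pow(factor, clients - 1)), minimalTime);
    }

    public static void main(String[] args) {
        // Prints: 500, 333, 222, 148, 98, 65, 43, 29, 25, 25
        for (int clients = 1; clients <= 10; clients++) {
            System.out.println(clients + " client(s): " + timingCalculator(clients) + "ms");
        }
    }
}
```

With these constants the 25 ms floor is reached at nine clients, so adding more consumers beyond that no longer speeds up the factory.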

Client Thread

The Client threads will consume the product from the stock.

java
public class Client implements Runnable {
    private static int count = 0;
    private final int id;

    public Client() {
        this.id = ++count;
    }

    public static int getCount() {
        return Client.count;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Manager.full.acquire();
                Manager.mutex.acquire();

                //Critical Section
                Debug.out("Client-" + id + " consumed a Product (" + Manager.getStock().Retrieve() + ").", Debug.Color.RED);
                Debug.out("Stock: " + Manager.getStock().getSize(), Debug.Color.YELLOW);

                Manager.mutex.release();
                Manager.empty.release();

                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Now for the most important thread in my implementation.

Manager Thread

This thread initializes the semaphores used for synchronization, as well as the stock and the factory. It also monitors the stock and the producer thread's status: when the buffer is full it pauses production, and when the buffer is about to be empty it resumes production.

java
public class Manager implements Runnable {
    public static Semaphore mutex = new Semaphore(1);
    public static Semaphore empty = new Semaphore(Stock.MAXIMUM_STOCK);
    public static Semaphore full = new Semaphore(0);

    private static final Stock stock = new Stock();
    private static final Factory factory = new Factory();
    private static Thread factoryThread;

    public Manager() {
        Manager.factoryThread = new Thread(factory);
    }

    public static Stock getStock() {
        return stock;
    }

    public static Thread getFactoryThread() {
        return factoryThread;
    }

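    @Override
    public void run() {
        while (true) {
            try {
                // Read the stock level under the mutex so the value is not stale
                mutex.acquire();
                boolean atMaximum = stock.isMaximumStock();
                boolean atMinimum = stock.isMinimumStock();
                mutex.release();

                if (factory.getProductionStatus()) {
                    if (atMaximum) {
                        // Clear the flag first, then interrupt the waiting or sleeping factory thread
                        factory.setProductionStatus(false);
                        factoryThread.interrupt();
                        Debug.out("Factory Paused⏸️ Production", Debug.Color.CYAN);
                    }
                } else if (atMinimum) {
                    factory.setProductionStatus(true);
                    factoryThread = new Thread(factory);
                    factoryThread.start();
                    Debug.out("Factory Resumed▶️ Production", Debug.Color.CYAN);
                }

                // Poll every few milliseconds instead of spinning with sleep(0)
                Thread.sleep(5);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }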
}

Finally, everything is set.

Main Class

Once every class and thread is implemented, run the main function.

java
import java.util.Scanner;

public class Main {
    public static Scanner read = new Scanner(System.in);

    public static void main(String[] args) throws InterruptedException {
        //Create a Manager object for maintaining
        Manager manager = new Manager();

        //Get the number of clients
        Debug.out("Enter number of clients: ");
        int numClients = read.nextInt();

        //Display Speed/Wait Time of the Factory
        Debug.out("Factory Timing: " + Utils.TimingCalculator(numClients) + "ms", Debug.Color.CYAN);

        //Create client objects and threads
        Client[] clients = new Client[numClients];
        Thread[] clientThreads = new Thread[numClients];
        for (int i = 0; i < numClients; i++) {
            clients[i] = new Client();
            clientThreads[i] = new Thread(clients[i]);
        }

        //Get Factory thread from Manager and create Manager thread
        Thread factoryThread = Manager.getFactoryThread();
        Thread managerThread = new Thread(manager);

        //Start Threads
        managerThread.start();
        factoryThread.start();
        for (Thread clientThread : clientThreads) {
            clientThread.start();
        }

        //wait for Threads to complete
        managerThread.join();
        factoryThread.join();
        for (Thread clientThread : clientThreads) {
            clientThread.join();
        }
    }
}
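The listings above call a `Debug` helper that is not shown in this article; the real one lives in the repo. If you want to compile the listings on their own, a hypothetical stand-in could look like the sketch below. It assumes only what the calls reveal (`Debug.out(String)`, `Debug.out(String, Debug.Color)`, and the colors RED, GREEN, YELLOW, and CYAN) and uses standard ANSI escape codes. The `format` method is an extra helper invented for this sketch.

```java
// Hypothetical stand-in for the Debug helper used in the listings; the real class may differ.
class Debug {
    enum Color {
        RED("\u001B[31m"), GREEN("\u001B[32m"), YELLOW("\u001B[33m"), CYAN("\u001B[36m");

        final String code; // ANSI escape sequence that switches the text color

        Color(String code) {
            this.code = code;
        }
    }

    private static final String RESET = "\u001B[0m"; // back to the terminal default

    // Wraps the message in the color code and a reset (helper invented for this sketch)
    static String format(String message, Color color) {
        return color.code + message + RESET;
    }

    static void out(String message) {
        System.out.println(message);
    }

    static void out(String message, Color color) {
        System.out.println(format(message, color));
    }

    public static void main(String[] args) {
        out("Factory Timing: 98ms", Color.CYAN);
    }
}
```

Terminals without ANSI support will show the raw escape characters, in which case `format` can simply return the message unchanged.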

The output should look like this:

bash
Enter number of clients:
5
Factory Timing: 98ms
Factory Produced a Product (5810).
stock: 1
Client-1 consumed a Product (5810).
Stock: 0
Factory Produced a Product (1132).

When the stock fills up:

bash
Factory Produced a Product (7226).
stock: 39
Factory Produced a Product (5621).
stock: 40
Factory Paused⏸️ Production
Client-1 consumed a Product (9236).
Stock: 39
Client-3 consumed a Product (8003).
Stock: 38
Client-2 consumed a Product (4937).
Stock: 37

When the stock is about to run out (10 items or fewer, the MINIMUM_STOCK value), the Manager logs "Factory Resumed▶️ Production" and the factory starts producing again.

The full Java implementation is available in my repo, Producer-Consumer-Java.