Maximum Zeal ~ Emphatic prose on indulged fascinations

Concurrency Pattern: The Producer Consumer Pattern in Java

Introduction

Recently, after a very long time, I once again encountered the producer consumer pattern in Java. In our day-to-day lives as developers we rarely have to implement such a pattern in code. We usually deal with producers and consumers conceptually, at far higher levels of abstraction such as inter-process messaging over MQ or 29West; in-process, a normal method invocation might suffice. Or we may only have to implement one half, either the producer or the consumer, at our end while the other half is written by a downstream system that we are integrating with. I myself can’t remember the last time I had to write a literal in-process producer/consumer implementation for production.

One could argue that any message passing, through queues or otherwise, is a manifestation of this pattern, but if you continue along that route then the message passing paradigm as a whole, or even object orientation, could be considered to represent it. Okay, I’ll stop the introduction here as my philosophical side is taking over. Anyway, having come across this recently, I thought it an interesting exercise to explore and document the various ways of implementing such a pattern in Java. Employers also tend to have this as a favourite interview question.

Problem Statement

The problem explored here is to implement the producer consumer pattern in Java to perform single message exchange without polling or spinning, as these can be wasteful of resources. If you disagree that these are wasteful, please provide an appropriately performant implementation in the comments!
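For contrast, here is a rough sketch of the kind of spinning hand-off this post sets out to avoid. SpinningSlot and its demo method are hypothetical names of my own, it assumes Java 9 or later for Thread.onSpinWait(), and it treats null as "empty" so it cannot carry null messages.

```java
import java.util.concurrent.atomic.AtomicReference;

// Counter-example: a single-slot hand-off that busy-waits instead of blocking.
// SpinningSlot is a hypothetical name, not part of this post's code.
class SpinningSlot<T> {

    // null means the slot is empty, so null messages are not supported
    private final AtomicReference<T> slot = new AtomicReference<T>();

    void put(T message) {
        // keep retrying until the slot is empty and we manage to fill it
        while (!slot.compareAndSet(null, message)) {
            Thread.onSpinWait(); // Java 9+; the thread stays runnable and burns CPU
        }
    }

    T take() {
        T message;
        // keep retrying until a message appears, clearing the slot as we read it
        while ((message = slot.getAndSet(null)) == null) {
            Thread.onSpinWait();
        }
        return message;
    }

    // Hands 0..count-1 from a producer thread to the caller and returns their sum.
    static int demo(final int count) {
        final SpinningSlot<Integer> slot = new SpinningSlot<Integer>();
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < count; i++) {
                    slot.put(i);
                }
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < count; i++) {
            sum += slot.take();
        }
        return sum;
    }
}
```

It works, but whichever side is waiting occupies a core for the whole wait, which is exactly the cost the blocking brokers in this post avoid.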

Solution

In any producer consumer scenario there are three entities: producer, consumer and broker. Though the broker is optional in certain real-life topologies such as 29West multicast, for the purposes of this post the broker is central to exploring the different ways of implementing this pattern. So let’s lay down the prerequisites.

Bootstrap

The bootstrap main method simply chooses a broker implementation, makes it available to the producer and consumer and starts the latter two as threads of their own. The broker is an interface to allow us to substitute in different implementations.

package name.dhruba.kb.concurrency.pc;

import name.dhruba.kb.concurrency.pc.broker.Broker;
import name.dhruba.kb.concurrency.pc.broker.WaitNotifyBroker;

public class ProducerConsumerBootstrap {

    public static void main(String[] args) {
        Broker<Integer> broker = new WaitNotifyBroker<Integer>();
        new Thread(new Producer(broker)).start();
        new Thread(new Consumer(broker)).start();
    }

}

Running the bootstrap should produce the following output.

produced: 0
consumed: 0
produced: 1
consumed: 1
produced: 2
consumed: 2
produced: 3
consumed: 3
produced: 4
consumed: 4
produced termination signal
received termination signal

Producer

The producer simply produces five integers starting at zero in sequence followed by a termination signal (-1).

package name.dhruba.kb.concurrency.pc;

import name.dhruba.kb.concurrency.pc.broker.Broker;

class Producer implements Runnable {

    final Broker<Integer> broker;

    Producer(Broker<Integer> broker) {
        this.broker = broker;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                broker.put(i);
                System.out.format("produced: %s%n", i);
                Thread.sleep(1000);
            }
            broker.put(-1);
            System.out.println("produced termination signal");
        } catch (InterruptedException e) {
            // restore the interrupt flag rather than swallowing the interruption
            Thread.currentThread().interrupt();
        }
    }

}

Consumer

The consumer, in turn, receives the five integers and terminates consumption upon receiving the termination signal.

package name.dhruba.kb.concurrency.pc;

import name.dhruba.kb.concurrency.pc.broker.Broker;

class Consumer implements Runnable {

    final Broker<Integer> broker;

    Consumer(Broker<Integer> broker) {
        this.broker = broker;
    }

    @Override
    public void run() {
        try {
            for (Integer message = broker.take(); message != -1; message = broker.take()) {
                System.out.format("consumed: %s%n", message);
                Thread.sleep(1000);
            }
            System.out.println("received termination signal");
        } catch (InterruptedException e) {
            // restore the interrupt flag rather than swallowing the interruption
            Thread.currentThread().interrupt();
        }
    }

}

Broker

The broker is the focus of this post. It is an interface as follows and its implementations are intended to support exchange of a single message at a time for simplicity of illustration.

package name.dhruba.kb.concurrency.pc.broker;

public interface Broker<T> {

    T take() throws InterruptedException;

    void put(T message) throws InterruptedException;

}

Broker implementations

Wait notify broker

A wait-notify broker is possibly the simplest solution to the problem, but it is also the one at the lowest level of abstraction and a legacy approach. I cannot see any reason to use such primitives in this day and age. Joshua Bloch might say the same. Nevertheless, the implementation would be as follows.

package name.dhruba.kb.concurrency.pc.broker;

public class WaitNotifyBroker<T> implements Broker<T> {

    private T message;
    private boolean empty = true;

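    @Override
    public synchronized T take() throws InterruptedException {
        // loop rather than test once: wait() can return spuriously
        while (empty) {
            wait();
        }
        empty = true;
        // wake all waiting threads, including a producer blocked in put()
        notifyAll();
        return message;
    }

    @Override
    public synchronized void put(T message) throws InterruptedException {
        while (!empty) {
            wait();
        }
        empty = false;
        this.message = message;
        // wake all waiting threads, including a consumer blocked in take()
        notifyAll();
    }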

}

Synchronous Queue Broker

Having explored the rather boring wait-notify option we can now move on to more interesting implementations. The next thing that comes to mind is a SynchronousQueue (SQ). Why? Because the problem here is to facilitate single message exchange, and for this an SQ is perfect. Synchronous queues are effectively zero-capacity queues and only pass a message across threads when a consuming thread is ready to take the handover.

package name.dhruba.kb.concurrency.pc.broker;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class SyncQueueBroker<T> implements Broker<T> {

    private final BlockingQueue<T> queue = new SynchronousQueue<T>();

    @Override
    public T take() throws InterruptedException {
        return queue.take();
    }

    @Override
    public void put(T message) throws InterruptedException {
        queue.put(message);
    }

}

In this example we could have used any blocking queue implementation, but I used an SQ for lightweight, immediate thread handovers without buffering.
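To make the "without buffering" point concrete, here is a minimal sketch of what swapping in a different blocking queue might look like. ArrayQueueBroker and its helper methods are hypothetical names of my own; the class has the same take/put shape as Broker, so declaring it as implements Broker<T> should be all that is needed to try it with the bootstrap.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical variant of SyncQueueBroker backed by a one-slot buffer.
// Declare it as "implements Broker<T>" to plug it into the bootstrap.
class ArrayQueueBroker<T> {

    // capacity 1: put() only blocks while the previous message is still unconsumed
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(1);

    public T take() throws InterruptedException {
        return queue.take();
    }

    public void put(T message) throws InterruptedException {
        queue.put(message);
    }

    // Non-blocking offer() with no consumer waiting:
    // refused by the synchronous queue, accepted by the one-slot buffer.
    static boolean[] offerWithoutConsumer() {
        boolean synchronous = new SynchronousQueue<Integer>().offer(1);
        boolean buffered = new ArrayBlockingQueue<Integer>(1).offer(1);
        return new boolean[] { synchronous, buffered };
    }

    // With a buffer, one thread can put and then take; with an SQ the put would never return.
    static int roundTrip(int value) {
        ArrayQueueBroker<Integer> broker = new ArrayQueueBroker<Integer>();
        try {
            broker.put(value);
            return broker.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The trade-off is that the producer can run one message ahead of the consumer, whereas with the SQ every put() is a rendezvous with a take().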

Exchanger Broker

So far we’ve tackled wait-notify and queues, but how else can we achieve the same effect? The next solution uses an Exchanger. The Exchanger is a little-known but tremendously interesting and powerful utility for swapping messages between threads in a really simple way. Its usage is as follows.

package name.dhruba.kb.concurrency.pc.broker;

import java.util.concurrent.Exchanger;

public class ExchangerBroker<T> implements Broker<T> {

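    private final Exchanger<T> exchanger = new Exchanger<T>();

    @Override
    public void put(T message) throws InterruptedException {
        // blocks until a consumer arrives; the consumer's null offering is discarded
        exchanger.exchange(message);
    }

    @Override
    public T take() throws InterruptedException {
        // the consumer has nothing to give back, so it offers null in exchange
        return exchanger.exchange(null);
    }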

}

I have to admit that I’ve never actually needed or found a legitimate use for an exchanger in production code and I’m curious to hear if you have.
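For what it’s worth, the usage the Exchanger javadoc itself illustrates is swapping buffers: one thread fills a buffer, another drains one, and they trade when the filler is done. The following is only a rough sketch of that idea under my own assumptions; BufferSwap, demo and the batch parameters are hypothetical names, not anything from a real code base.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

// Sketch of buffer swapping with an Exchanger; BufferSwap is a hypothetical name.
class BufferSwap {

    // Fills batches on one thread, drains them on the calling thread,
    // and returns the total number of items drained.
    static int demo(final int batches, final int batchSize) {
        final Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();

        Thread filler = new Thread(new Runnable() {
            @Override
            public void run() {
                List<Integer> buffer = new ArrayList<Integer>();
                try {
                    for (int b = 0; b < batches; b++) {
                        for (int i = 0; i < batchSize; i++) {
                            buffer.add(i);
                        }
                        // hand over the full buffer, receive an empty one to refill
                        buffer = exchanger.exchange(buffer);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        filler.start();

        int drained = 0;
        List<Integer> buffer = new ArrayList<Integer>();
        try {
            for (int b = 0; b < batches; b++) {
                // hand over an empty buffer, receive a full one
                buffer = exchanger.exchange(buffer);
                drained += buffer.size();
                buffer.clear();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return drained;
    }
}
```

Unlike the broker above, both sides hand over something useful here, so only two buffers are ever allocated however many batches are exchanged.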

Condition Broker

And, finally, we come to our last implementation using Condition which is again, like the Exchanger, little known and rarely seen out there (at least in my experience). I’ve only seen it in production code once and that too was for a very unusual use case. However conditions have a key advantage over their predecessor – wait/notify. To quote the javadoc on this one: “Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods”.

package name.dhruba.kb.concurrency.pc.broker;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionBroker<T> implements Broker<T> {

    private T message;
    private boolean empty = true;

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition fullState = lock.newCondition();
    private final Condition emptyState = lock.newCondition();

    @Override
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (empty) {
                fullState.await();
            }
            empty = true;
            emptyState.signal();
            return message;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void put(T message) throws InterruptedException {
        lock.lock();
        try {
            while (!empty) {
                emptyState.await();
            }
            this.message = message;
            empty = false;
            fullState.signal();
        } finally {
            lock.unlock();
        }
    }

}

As you can see, this expresses the intent more clearly than the equivalent wait-notify: each side waits on, and signals, a condition named for the state it cares about.

Conclusion

The producer/consumer pattern is a useful scenario within which to explore solutions to bounded buffers, blocking queues and in-process eventing. In practice, though, we rarely need to implement it ourselves, as the higher level synchronisation utilities introduced in Java 5 negate the need. Employers love to ask this question in interviews, so bear in mind that it’s useful to know not only the higher level utilities but also the lower level wait-notify behaviour, even though it’s virtually obsolete at this point. If you can think of any other ways of implementing a single message exchange producer/consumer pattern please comment here. Thanks.
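As one further candidate to get the ball rolling, a pair of semaphores can express the same empty/full hand-off. This is only a sketch and not one of the brokers tested with the bootstrap above; SemaphoreBroker and demo are hypothetical names, and the class would need to be declared as implements Broker<T> to be used with the producer and consumer.

```java
import java.util.concurrent.Semaphore;

// Hypothetical extra variant; declare it as "implements Broker<T>" to use it with the bootstrap.
class SemaphoreBroker<T> {

    private T message;
    private final Semaphore emptySlot = new Semaphore(1); // a permit exists while the slot is empty
    private final Semaphore fullSlot = new Semaphore(0);  // a permit exists while the slot is full

    public T take() throws InterruptedException {
        fullSlot.acquire();   // block until a message has been put
        T taken = message;    // read it before giving the slot back
        emptySlot.release();  // allow the next put
        return taken;
    }

    public void put(T message) throws InterruptedException {
        emptySlot.acquire();  // block until the previous message has been taken
        this.message = message;
        fullSlot.release();   // allow the next take
    }

    // Hands 0..count-1 from a producer thread to the caller and returns their sum.
    static int demo(final int count) {
        final SemaphoreBroker<Integer> broker = new SemaphoreBroker<Integer>();
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < count; i++) {
                        broker.put(i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                sum += broker.take();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }
}
```

The message field needs no lock or volatile here because a release() on a semaphore happens-before the matching acquire() on another thread.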

8 Responses to Concurrency Pattern: The Producer Consumer Pattern in Java

  1. Hi Dhruba,

    Wonderful post, but I think in the broker example above the Producer and Consumer threads share the instance boolean variable empty, and hence it gets stored in the threads’ local caches. If the Producer thread sets empty to false after inserting an element on the first attempt, the change may not be propagated to the empty variable. Hence if the Consumer gets notified it will call wait() again, which may result in deadlock.

    I think empty should be declared as volatile to avoid such scenarios.

  2. Deepak – thanks for stopping by and for your kind words. The visibility of the empty variable is guaranteed by the fact that it is guarded by a lock on this and both take() and put() use the same lock. Volatile won’t do any more for visibility than is already being done. Please see here for an alternative example.

  3. I’m not sure, but correct me if I’m wrong:

    this code in take() has an issue:
    13 empty = true;
    14 notifyAll();
    15 return message;

    As soon as you call notifyAll(), another thread may run between 14 and 15 and replace “message” with a new “message”, and your take() method will return the incorrect “message”.

  4. Alex. First of all sorry for the exceptionally late response. Secondly, to address your question, I’m afraid you’re wrong as all methods are synchronized. Any other thread must obtain a lock to do its work and the lock won’t be released until line 15 above (the return statement) has already run. See here for further explanation.
