Tag Archives: concurrency

OpenCL Cookbook: How to leverage multiple devices in OpenCL

So far, in the OpenCL Cookbook series, we’ve only looked at utilising a single device for computation. But what happens when you install more than one card in your host machine? How do you scale your computation across multiple GPUs? Will your code automatically scale to multiple devices or does it require you to consciously think about how to distribute the load of the computation across all available devices and change your code to apply that strategy? Here I look at answers to these questions.

Decide on how you want to use the host binding to support multiple devices

There are two ways in which a given host binding can support multiple devices.

  • A single context shared across all devices, with one command queue per device.
  • One context and one command queue per device.

Let’s look at these in more detail with skeletal implementations in C++.

Creating a single context across all devices and one command queue per device

In this approach we create only one context and share it across one command queue per device. So if we have, say, two devices we’ll have one context and two command queues, each of which shares that one context.

#include <CL/opencl.h>

int main () {

    cl_int err;
    
    // get first platform
    cl_platform_id platform;
    err = clGetPlatformIDs(1, &platform, NULL);
    
    // get device count
    cl_uint deviceCount;
    err = clGetDeviceIDs(platform, CL_DEVICE_TYPE_GPU, 0, NULL, &deviceCount);
    
    // get all devices
    cl_device_id* devices;
    devices = new cl_device_id[deviceCount];
    err = clGetDeviceIDs(platform, CL_DEVICE_TYPE_GPU, deviceCount, devices, NULL);
    
    // create a single context for all devices
    cl_context context = clCreateContext(NULL, deviceCount, devices, NULL, NULL, &err);
    
    // for each device create a separate queue
    cl_command_queue* queues = new cl_command_queue[deviceCount];
    for (int i = 0; i < deviceCount; i++) {
        queues[i] = clCreateCommandQueue(context, devices[i], 0, &err);
    }

    /*
     * Here you have one context across all devices and one command queue per device.
     * You can choose to send your tasks to any of these queues depending on which
     * device you want to execute the task on.
     */

    // cleanup
    for(int i = 0; i < deviceCount; i++) {
        clReleaseDevice(devices[i]);
        clReleaseCommandQueue(queues[i]);
    }
    
    clReleaseContext(context);

    delete[] devices;
    delete[] queues;
    
    return 0;
    
}

Creating one context and one command queue per device

Here I create one context and one command queue per device, so that each queue has its own context rather than sharing one.

#include <CL/opencl.h>

int main () {

    cl_int err;
    
    // get first platform
    cl_platform_id platform;
    err = clGetPlatformIDs(1, &platform, NULL);
    
    // get device count
    cl_uint deviceCount;
    err = clGetDeviceIDs(platform, CL_DEVICE_TYPE_GPU, 0, NULL, &deviceCount);
    
    // get all devices
    cl_device_id* devices;
    devices = new cl_device_id[deviceCount];
    err = clGetDeviceIDs(platform, CL_DEVICE_TYPE_GPU, deviceCount, devices, NULL);
    
    // for each device create a separate context AND queue
    cl_context* contexts = new cl_context[deviceCount];
    cl_command_queue* queues = new cl_command_queue[deviceCount];
    for (int i = 0; i < deviceCount; i++) {
        contexts[i] = clCreateContext(NULL, 1, &devices[i], NULL, NULL, &err); // one device per context
        queues[i] = clCreateCommandQueue(contexts[i], devices[i], 0, &err);
    }

    /*
     * Here you have one context and one command queue per device.
     * You can choose to send your tasks to any of these queues.
     */

    // cleanup
    for(int i = 0; i < deviceCount; i++) {
        clReleaseDevice(devices[i]);
        clReleaseContext(contexts[i]);
        clReleaseCommandQueue(queues[i]);
    }
    
    delete[] devices;
    delete[] contexts;
    delete[] queues;
    
    return 0;

}

How do you scale your computation across multiple devices?

Sadly, the process of utilising multiple devices for your computation is not done automatically by the binding when new devices are detected, nor is it possible for it to do so. It requires active thought from the host programmer. When using a single device you send all your kernel invocations to the command queue associated with that device. In order to use multiple devices you must have one command queue per device, either sharing a context or each having its own. Then you must decide how to distribute your kernel calls across all available queues. It may be as simple as a round robin strategy across all queues for all your computations or it may be more complex.

Bear in mind that if your computation entails reading back a result synchronously then a round robin strategy across queues won’t work, because each call will block and complete before you send work to the next queue, essentially making the distribution across queues serial. Obviously this defeats the whole purpose of having multiple devices operating in parallel. What you really need is one host thread per device, each sending computations to its own command queue. That way each queue receives and processes computations in parallel with the other queues and you achieve true hardware parallelism.

Which of the two ways should you use?

It depends. I would try the single context option first as it’s likely to use less memory and be faster. If you encounter instability or problems I would switch to the multiple context method. That’s the general rule. There is, however, another reason you may opt for the multiple context method. If you are using multiple threads which all require access to a context, it is preferable for each thread to have its own context as the OpenCL host binding is not guaranteed to be thread safe. If you try to access a single context across multiple threads you may get serious system crashes and reboots, so always keep your OpenCL structures thread confined.

Using a single context across multiple host threads

You may want to use one thread per device to send tasks to the command queue associated with each device. In this case you will have multiple host threads, and here you have to be careful. In my experience it has not been safe to use a single context across multiple host threads. The last time I tried this was in C# using the Cloo host binding: using a single context across multiple host threads resulted in a Windows 7 blue screen, Windows dumping memory to a file and then rebooting, after which Windows failed to come back up until physically rebooted once more from the machine. The solution is the multiple context option outlined above. Keep your OpenCL resources thread confined and you’ll be fine.

Concurrent producer consumer pattern using C# 4.0, BlockingCollection & Tasks

Here is a very simple, annotated producer consumer example using C#, .NET 4.0, BlockingCollection and Tasks. The example sets up one producer thread on which it produces 100 integers, and two consumer threads which each read that data off a common concurrent blocking queue. Although I use a BlockingCollection here, it is backed internally by a concurrent queue.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace test
{

    class Program
    {

        static void Main(string[] args)
        {

            Console.WriteLine();

            // declare blocking collection backed by concurrent queue
            BlockingCollection<int> b = new BlockingCollection<int>();

            // start consumer 1 which waits for data until available
            var consumer1 = Task.Factory.StartNew(() =>
            {
                foreach (int data in b.GetConsumingEnumerable())
                {
                    Console.Write("c1=" + data + ", ");
                }
            });

            // start consumer 2 which waits for data until available
            var consumer2 = Task.Factory.StartNew(() =>
            {
                foreach (int data in b.GetConsumingEnumerable())
                {
                    Console.Write("c2=" + data + ", ");
                }
            });

            // produce 100 integers on a producer thread
            var producer = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 100; i++)
                {
                    b.Add(i);
                }
                b.CompleteAdding();
            });

            // wait for producer to finish producing
            producer.Wait();
            
            // wait for all consumers to finish consuming
            Task.WaitAll(consumer1, consumer2);

            Console.WriteLine();
            Console.WriteLine();

        }

    }

}

The output from running the program above is as follows. It’s basically consumer 1 and 2 interleaving as they read integers off the queue as they’re being produced.

c1=0, c1=2, c1=3, c1=4, c1=5, c1=6, c1=7, c1=8, c1=9, c1=10, c1=11, c1=12, c1=13, c1=14, c1=15, c1=16, c1=17, c1=18, c1=19, c2=1, c2=21, c2=22, c2=23, c2=24,
c2=25, c2=26, c2=27, c2=28, c2=29, c2=30, c2=31, c2=32, c2=33, c2=34, c2=35, c2=36, c2=37, c2=38, c2=39, c2=40, c2=41, c2=42, c2=43, c2=44, c2=45, c2=46, c2=47,
c2=48, c2=49, c2=50, c2=51, c2=52, c2=53, c2=54, c2=55, c2=56, c2=57, c2=58, c2=59, c2=60, c2=61, c2=62, c2=63, c2=64, c2=65, c2=66, c2=67, c2=68, c2=69, c2=70,
c2=71, c2=72, c2=73, c2=74, c2=75, c2=76, c2=77, c2=78, c2=79, c2=80, c2=81, c2=82, c2=83, c2=84, c2=85, c2=86, c2=87, c2=88, c2=89, c2=90, c2=91, c2=92, c2=93,
c2=94, c2=95, c2=96, c2=97, c2=98, c2=99, c1=20,

It’s somewhat sad and disheartening to see just how far ahead of Java C# is in terms of the expressiveness and richness of the language. I personally really enjoyed using the lambdas above and I hope that with Java 8 the JDK libraries will grow to develop a more fluid and expressive style.

Credit: [1, 2]

Oracle and AMD propose GPU support in Java

A very exciting development has come to my attention, albeit four days late. This news is, without exaggeration and in my humble opinion, nothing short of groundbreaking, not only because it pushes the boundaries and capabilities of Java even further in terms of hardware support and performance, but also because, as I’m gradually beginning to realise, the future of high performance computing is in the GPU.

John Coomes (OpenJDK HotSpot Group Lead) and Gary Frost (AMD) have proposed a new OpenJDK project to implement GPU support in Java with a native JVM (full thread).

This project intends to enable Java applications to seamlessly take advantage of a GPU–whether it is a discrete device or integrated with a CPU–with the objective to improve the application performance.

Their focus will be on code generation, garbage collection and runtimes.

We propose to use the Hotspot JVM, and will concentrate on code generation, garbage collection, and runtimes. Performance will be improved, while preserving compile time, memory consumption and code generation quality.

The project will also leverage the new Java 8 lambda language and library features and may eventually sprout further platform enhancements.

We will start exploring leveraging the new Java 8 Lambda language and library features. As this project progress, we may identify challenges with the Java API and constructs which may lead to new language, JVM and library extensions that will need standardization under the JCP process.

John Coomes (Oracle) will lead the project and Gary Frost (AMD) has gladly offered resources from AMD. As the mail thread got underway, two existing related projects were brought to the list’s attention: rootbeer (PDF, slashdot) and Aparapi (AMD page). Rootbeer is a particularly interesting project as it performs static analysis of Java code and generates CUDA code automatically – quite different from compile time OpenCL bindings from Java. The developer of rootbeer has also shown interest in joining the OpenJDK project. John Rose, Oracle HotSpot developer, also posted a talk he recently gave on Arrays 2.0 which I’m yet to watch.

The missing link in deriving value from GPUs, from what I understand of a domain that I’m still fairly new to, is that GPU hardware and programming need to be made accessible to the mainstream, and that’s the purpose I hope this project will serve. I hope also that, once underway, it raises wider awareness of how we must make a mental shift from concurrency to parallelism and from data parallelism to task parallelism, and of how the next generational leap in parallelism will come not from CPU cores or concurrency frameworks but from harnessing the mighty GPU.

Hacker news has some discussion. What do you think? Let me know.

JDK8: StampedLock: A possible SequenceLock replacement

Quite some time back I wrote about SequenceLock – a new kind of lock, due for release in JDK8, where each lock acquisition or release advances a sequence number. Two days ago, however, Doug Lea reported that, for a variety of reasons, this API may be “less useful than anticipated”. Instead he has proposed a replacement API called StampedLock and requested feedback on it.

Here I reproduce example usage of both APIs for easy comparison by the reader.

SequenceLock sample usage

class Point {
   private volatile double x, y;
   private final SequenceLock sl = new SequenceLock();
   static final int RETRIES_BEFORE_LOCKING = 8;

   // A read-only method
   double distanceFromOriginV1() {
     double currentX, currentY;
     long seq;
     do {
       seq = sl.awaitAvailability();
       currentX = x;
       currentY = y;
     } while (sl.getSequence() != seq); // retry if sequence changed
     return Math.sqrt(currentX * currentX + currentY * currentY);
   }

   // an exclusively locked method
   void move(double deltaX, double deltaY) {
     sl.lock();
     try {
       x += deltaX;
       y += deltaY;
     } finally {
       sl.unlock();
     }
   }

   // Uses bounded retries before locking
   double distanceFromOriginV2() {
     double currentX, currentY;
     long seq;
     int retries = RETRIES_BEFORE_LOCKING; // for example 8
     try {
       do {
         if (--retries < 0)
           sl.lock();
         seq = sl.awaitAvailability();
         currentX = x;
         currentY = y;
       } while (sl.getSequence() != seq);
     } finally {
       if (retries < 0)
         sl.unlock();
     }
     return Math.sqrt(currentX * currentX + currentY * currentY);
   }
 }

StampedLock sample usage

class Point {
    private int x, y;
    private final StampedLock lock = new StampedLock();

    public int magnitude() { // a read-only method
        long stamp = lock.beginObserving();
        int currentX = x;
        int currentY = y;
        if (!lock.validate(stamp)) { // a write intervened; fall back to a read lock
            stamp = lock.lockForReading();
            try {
                currentX = x;
                currentY = y;
            } finally {
                lock.unlock(stamp);
            }
        }
        return currentX * currentX + currentY * currentY;
    }

    public void move(int deltaX, int deltaY) { // a write-locked method
        long stamp = lock.lockForWriting();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            lock.unlock(stamp);
        }
    }
}

There are two primary differences that I can see. First, in SequenceLock the read-only method has an element of indefinite retry without lock acquisition, whereas in StampedLock the retry element is replaced with lock acquisition, which performs better under a lot of writes. Second, the single undifferentiated lock in SequenceLock is replaced with differentiated read and write locks. The latter feature makes this class another alternative to the existing ReentrantReadWriteLock, one that Doug Lea describes as “cheaper but more restricted”. It will be fascinating to watch the progression of this API over time.

JDK8 ConcurrentHashMap gets huge map support

Doug Lea announces huge map support in the latest incarnation of ConcurrentHashMap to more effectively support more than a billion elements.

Finally acting on an old idea, I committed an update to ConcurrentHashMap (currently only the one in our jdk8 preview package, as jsr166e.ConcurrentHashMap8) that much more gracefully handles maps that are huge or have many keys with colliding hash codes. Internally, it uses tree-map-like structures to maintain bins containing more nodes than would be expected under ideal random key distributions over ideal numbers of bins.

It’s nice to see the huge map consideration get some real effort put into it. Mark Reinhold, I believe, talked about 64-bit support for huge maps as a possible feature from JDK8 onwards in one of the videos hosted by Adam Messinger. I don’t think Doug Lea’s efforts are related to that, though; rather, they stem from his own ideas.

jsr166e: Upcoming java.util.concurrent APIs for Java 8

Jsr166e is to Java 8 what jsr166y was to Java 7. Jsr166y introduced the fork join framework and Phaser to Java 7, both of which are worthy of blog posts of their own. The fork join framework enables a fine grained inversion of concurrency whereby we can code logic without really needing to think about, or implement, how that logic will perform on arbitrary hardware.
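
As a quick illustration of that inversion, here is a minimal fork join sketch using the Java 7 API. The code only describes how to split and combine the work; the pool decides how the pieces map onto whatever hardware is available.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {

    private final long[] values;
    private final int from, to;

    public SumTask(long[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        // small enough: just do the work directly
        if (to - from <= 1000) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += values[i];
            }
            return sum;
        }
        // otherwise split in half and let the pool schedule the pieces
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(values, from, mid);
        left.fork(); // left half runs asynchronously
        long rightSum = new SumTask(values, mid, to).compute();
        return rightSum + left.join();
    }

    public static void main(String[] args) {
        long[] values = new long[1000000];
        java.util.Arrays.fill(values, 1L);
        System.out.println(new ForkJoinPool().invoke(new SumTask(values, 0, values.length)));
    }

}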

Now that Java 7 has been released, jsr166e has emerged as a repository of utilities intended for inclusion in Java 8 next year. Having followed the discussion on the concurrency mailing list I’ve become absolutely fascinated by the work going on in jsr166e, for the simple reason that it caters for use cases that have been directly relevant to my recent work. So, without further delay, here is an abridged guide to the current state of jsr166e.

Collections

  • ConcurrentHashMapV8: A candidate replacement for java.util.concurrent.ConcurrentHashMap with a lower memory footprint. I’ve yet to explore the exact improvements over the old implementation. Here’s the mail thread announcement and discussion.
  • ConcurrentHashMapV8.MappingFunction: A (well overdue) mechanism for automatically computing a value for a key that doesn’t already have one; see the sketch just after this list. I’ve waited a long time for this and in my opinion it is the most basic requirement of a concurrent map: without it you always end up locking to create a new mapping.
  • LongAdderTable: A concurrent counting map where a key is associated with an efficient concurrent primitive counter. This provides significantly improved performance over AtomicLong under high contention as it utilises striping across multiple values. I’ve desperately needed this in my job and I am overjoyed that it has finally been written by the expert group. I’ve been exploring and coding up various implementations of my own of such a class recently but I’d rather have this provided by the JDK. Again, a very basic requirement and a class that’s well overdue.
  • ReadMostlyVector: The same as Vector but with reduced contention and better throughput for concurrent reads. I’m a little surprised by this one. Does anyone even use Vector anymore? Why not just replace the underlying implementations of Hashtable and Vector with more performant ones? Is there a backward compatibility constraint restricting this?
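
To illustrate the idiom, here is a minimal sketch of such a computing map. Note that it uses the computeIfAbsent method that this work eventually shipped as on ConcurrentHashMap in Java 8 rather than the jsr166e MappingFunction API itself, so treat the exact signature as illustrative.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ComputingCache<K, V> {

    private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<K, V>();

    // computes and stores the value on the first request for a key; callers
    // need no external locking and no check-then-act sequence of their own
    public V get(K key) {
        return cache.computeIfAbsent(key, k -> expensiveValueFor(k));
    }

    private V expensiveValueFor(K key) {
        return null; // placeholder for the real, expensive computation
    }

}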

Adders

The following adders are essentially high performance concurrent primitive counters that dynamically adapt to growing contention to reduce it. The key value add is achieved by striping writes across multiple values and acting across all the stripes for reads.

Again, high performance primitive counters are something I’ve desperately needed in my work lately. Imagine you are implementing client server protocols. You may need message sequence numbers to ensure you can discard out of order or older messages. You might also need request/response id correlation, for which id generation is necessary. For any such id generation I wanted to use primitive longs for efficiency, and as a result needed a high performance primitive long counter. Now I have one!

Important: it’s worth noting one limitation of these counting APIs. There are no compound methods like incrementAndGet() or addAndGet(), which significantly reduces their utility. I can see why this is the case: although writes can stripe across values, a read must act across all striped values and as a result is quite expensive. I therefore need to think about how much this compromises their use as an efficient id generator. The sketch after the list below shows the resulting usage pattern.

  • DoubleAdder: A high performance concurrent primitive double counter.
  • LongAdder: A high performance concurrent primitive long counter.
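
Here is a minimal usage sketch. It is shown under the java.util.concurrent.atomic package name these classes eventually shipped under in Java 8; in jsr166e the package is different.

import java.util.concurrent.atomic.LongAdder;

public class RequestStats {

    private final LongAdder requestCount = new LongAdder();

    // writes stripe across internal cells under contention, so many threads
    // can increment concurrently with minimal cache line contention
    public void onRequest() {
        requestCount.increment();
    }

    // a read sweeps all the stripes to compute the total; note the absence
    // of a compound incrementAndGet(): the sum is only a moment-in-time view
    public long totalRequests() {
        return requestCount.sum();
    }

}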

MaxUpdaters

The following classes exhibit similar performance characteristics to the adders above, but instead of maintaining a count or sum they maintain a maximum value. These also stripe values for writes and read across the stripes to compute aggregate values.

  • DoubleMaxUpdater: A high performance primitive double maximum value maintainer.
  • LongMaxUpdater: A high performance primitive long maximum value maintainer.

Synchronizers

  • SequenceLock: Finally, jsr166e adds an additional synchronisation utility. This is an interesting class which took me two or three reviews of the javadoc example to understand. Essentially it offers a more accommodating conversation between you and the lock provider: not only can you choose not to lock and still retain consistent visibility, but you can also detect when other threads have been active simultaneously with your logic, allowing you to retry your behaviour until your read of any state is completely consistent at that moment in time. I can see what value this adds and how to use it but I need to think about real world use cases for this utility.

What is still missing?

Sadly, despite the above, Java shows no signs of addressing a number of other real world use cases of mine.

  • Concurrent primitive key maps
  • Concurrent primitive value maps
  • Concurrent primitive key value maps
  • Externalised (inverted) striping utilities that allow you to hash an incoming key to a particular lock across a distribution of locks. This means that you no longer have to lock entire collections but only the lock relevant to the input you are working with. This is absolutely fundamental and essential in my opinion, and has already been written by EhCache for their own use, but it should ideally be provided as a building block by the JDK.
  • There’s also been a lot of talk about core striping as opposed to lock striping, which I suppose is an interesting need. In other words, instead of the distribution of contention being across lock instances, it is across representations (IDs) of physical processor cores. Check the mailing list for details.

Summary

I’m very excited indeed by the incorporations of jsr166e, not only because they directly address a number of my real world use cases but also because they give an early peek at what’s to come in Java 8. The additional support for primitives is welcome as it will eliminate reliance on ghastly autoboxing and the gc churn of primitive wrappers. I’ll certainly be using these utilities for my own purposes. Keep up the great work! However, I’d love to hear why the use cases under ‘What is still missing?’ above still haven’t seen any activity in Java.

Concurrency pattern: Concurrent multimaps in Java

Preamble

Maintaining state is one of the most important ways in which we can increase the performance of our code. I don’t mean using a database or some goliath system somewhere. I mean local memory caches. Often they can be more than adequate to allow us to maintain state and provide a dramatic improvement in performance.

Before I start with the content of this post, let me just state what should be obvious if it isn’t already: the whole domain of caching is an incredibly difficult and inexhaustible area of study and work, which is why dedicated distributed cache providers have sprung up all over the place and why companies normally resort to them in favour of in-memory caches.

However, a combination of in-memory and distributed caches is often in use, and this post focuses on one aspect of in-memory caches: concurrent multimaps in Java. This post is the resource I wish I had had when tackling this problem numerous times in the past. It focuses exclusively on copy on write multimap implementations, as that approach allows the read operation to be lock free, which can be a significant advantage depending on what you want to do on read.

Singular value caches

When an in-memory cache is desired one always resorts to some kind of map structure. If you’re storing singular key value pairs then creating a cache can be as easy as picking a map implementation, though you still have the check-then-act operation of checking whether a value exists, returning it if so, and otherwise populating and then returning it, which can result in some blocking. Nonetheless these problems have already been solved a thousand times over.

For example, Google Guava’s MapMaker provides an excellent implementation of the memoization pattern for a cache, as follows. This is probably the most involved case of a simple singular key value pair cache.

package name.dhruba.kb.concurrency.mapmaker;

import java.util.concurrent.ConcurrentMap;

import com.google.common.base.Function;
import com.google.common.collect.MapMaker;

public class CacheWithExpensiveValues<K, V> {

    private final ConcurrentMap<K, V> cache = new MapMaker().makeComputingMap(new Function<K, V>() {
        @Override
        public V apply(K input) {
            return acquireExpensiveCacheValue();
        }
    });

    public V get(K key) { return cache.get(key); }
    private V acquireExpensiveCacheValue() { return null; }

}

This implementation, similar in concept to the memoization pattern put forward by Brian Goetz in Java Concurrency In Practice, guarantees that the value for a given key is acquired/resolved only once during the lifetime of the cache, which can be very useful if creating or computing the value is an expensive call. Threads which request an uncomputed value while it is being computed wait until the computation already in progress has finished.

This can be said to be strongly consistent in its guarantees. If you are willing to compromise on the guarantees that a cache makes, making it a weakly consistent cache, you may be able to achieve faster performance in some cases by relying solely on atomic CAS operations.
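
As a minimal sketch of that weaker approach, assuming the value computation is cheap enough to risk occasionally running more than once under a race:

package name.dhruba.kb.concurrency.mapmaker;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class WeaklyConsistentCache<K, V> {

    private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<K, V>();

    public V get(K key) {
        V value = cache.get(key);
        if (value == null) {
            // two racing threads may both compute here; the CAS backed
            // putIfAbsent ensures only one value wins and is ever visible
            V computed = acquireExpensiveCacheValue();
            V raced = cache.putIfAbsent(key, computed);
            value = raced != null ? raced : computed;
        }
        return value;
    }

    private V acquireExpensiveCacheValue() { return null; }

}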

Multi-value caches

A standard singular key value pair cache is fairly straightforward these days. But what happens if you suddenly realise that you actually need multiple values per key? There’s a little more to it than first meets the eye. If you are well informed about what’s out there then, as a knee jerk reaction, you might immediately think of Google Guava multimaps and create something similar to the example below.

package name.dhruba.kb.concurrency.guava;

import java.util.List;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;

public class MultiMapCacheExample<K, V> {

    private final ListMultimap<K, V> cache = Multimaps.synchronizedListMultimap(ArrayListMultimap
            .<K, V> create());

    public List<V> get(K k) {
        return cache.get(k);
    }

    public List<V> remove(K k) {
        return cache.removeAll(k);
    }

    public void put(K k, V v) {
        cache.put(k, v);
    }

    public boolean remove(K k, V v) {
        return cache.remove(k, v);
    }

}

However, the astute programmer soon realises the inadequacies of such a solution. The synchronised wrapper pattern here is very similar to that utilised in the JDK in that it synchronises all the calls in the interface. It also synchronises the entirety of every method, meaning that all paths through any given method must contend for and acquire a lock. To put it another way, no path of execution through any method is non-blocking.

As a result, this implementation is likely to perform very poorly under heavy concurrent load. There will be a lot of contention and the cache will only be able to serve one operation at a time. So where do we go from here? Googling didn’t bring much success on the subject of concurrent multimaps in Java when I looked for what was out there already, so I decided to explore this area from first principles. Below I present the process of iteratively developing an efficient concurrent multimap over the course of a few implementations, eventually making it as non-blocking as possible.

It’s interesting to read why Google Guava has not and will not implement a concurrent multimap, though I’m not sure I agree. I think a couple of general purpose concurrent multimaps, or at the very least a copy on write multimap, would be of value to the public as I’ve seen this pattern quite a lot over the years. Admittedly it wouldn’t be just one implementation; it would need to support a range of backing collections.

Concurrent multimaps

In the following example implementations I present only the most important mutative calls in the map interface, as they are the most challenging and the best calls for illustration. Bear in mind the following design considerations when reading through the implementations.

  • Mutative versus an immutable copy on write approach
  • Size of critical sections and thereby the degree of blocking
  • Strongly consistent or weakly consistent in mutual exclusion guarantees
  • When removing the last value for a key should the multimap remove the key and associated empty collection?

Weakly consistent implementations are very common in the industry but are prone to interleaving. For example, if a put() is in progress and someone calls remove() on the key altogether then, after the remove has completed, the put() may put the key value association back in, which may not be desirable at all. Or the put may add to a value collection that is no longer referenced because the key has been removed. These methods should ideally be mutually exclusive, and the final implementation achieves this quality. Bear in mind, though, that for certain use cases weakly consistent guarantees are acceptable; it is for you to say what is and isn’t acceptable for your use case.

Fully blocking multimap

The fully blocking implementation is equivalent to the synchronised wrapper approach because it synchronises the entirety of every method. This is without doubt the poorest performing implementation, though on the plus side it has minimal allocation, unlike the copy on write implementations that follow.

Advantages
  • Strongly consistent.
  • Doesn’t allocate any more than it needs to (unlike the copy on write pattern).
Disadvantages
  • Very poor performance.
  • Uses a HashMap, which isn’t thread safe; all thread safety and visibility guarantees rest on the synchronised methods.
  • All calls – reads/writes are blocking.
  • All paths through the blocking calls are blocking.
package name.dhruba.kb.concurrency.multimap;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FullyBlockingMutativeArrayListMultiMap<K, V> {

    private final Map<K, List<V>> cache = new HashMap<K, List<V>>();

    public synchronized List<V> get(K k) {
        return cache.get(k);
    }

    public synchronized List<V> remove(K k) {
        return cache.remove(k);
    }

    public synchronized void put(K k, V v) {
        List<V> list = cache.get(k);
        if (list == null) {
            list = new ArrayList<V>();
            cache.put(k, list);
        }
        list.add(v);
    }

    public synchronized boolean remove(K k, V v) {
        List<V> list = cache.get(k);
        if (list == null) {
            return false;
        }
        if (list.isEmpty()) {
            cache.remove(k);
            return false;
        }
        boolean removed = list.remove(v);
        if (removed && list.isEmpty()) {
            cache.remove(k);
        }
        return removed;
    }

}

Copy on write multimap using synchronisation

This is an initial implementation of a copy on write approach, but without using the JDK copy on write collections. It is strongly consistent but it still synchronises too much on writes.

Advantages
  • Strongly consistent.
  • Uses concurrent hash map so we can have non-blocking read.
Disadvantages
  • The synchronisation lock blocks on the entire cache.
  • The blocking calls are entirely blocking so all paths through them will block.
  • Concurrent hash map is blocking itself although at a fine grained level using stripes.
package name.dhruba.kb.concurrency.multimap;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CopyOnWriteArrayListMultiMap<K, V> {

	private final ConcurrentMap<K, List<V>> cache = new ConcurrentHashMap<K, List<V>>();

	public List<V> get(K k) {
		return cache.get(k);
	}

	public synchronized List<V> remove(K k) {
		return cache.remove(k);
	}

	public synchronized void put(K k, V v) {
		List<V> list = cache.get(k);
		if (list == null || list.isEmpty()) {
			list = new ArrayList<V>();
		} else {
			list = new ArrayList<V>(list);
		}
		list.add(v);
		cache.put(k, list);
	}

	public synchronized boolean remove(K k, V v) {
		List<V> list = cache.get(k);
		if (list == null) {
			return false;
		}
		if (list.isEmpty()) {
			cache.remove(k);
			return false;
		}
		// copy before mutating so lock free readers never observe an in place removal
		list = new ArrayList<V>(list);
		boolean removed = list.remove(v);
		if (removed) {
			if (list.isEmpty()) {
				cache.remove(k);
			} else {
				cache.put(k, list);
			}
		}
		return removed;
	}

}

Copy on write multimap but using the JDK CopyOnWriteArrayList

Here we opt to use the copy on write array list from the JDK. There is no synchronisation in the class itself (only within the backing structures) but it is dangerously prone to interleaving and therefore weakly consistent. Personally I wouldn’t be happy about put() and remove() not being mutually exclusive and interleaving through each other; that to me would be unacceptable. Amazingly, I’ve seen this implementation all too often at work.

Advantages
  • Uses {@link ConcurrentHashMap} for thread safety and visibility.
  • Uses {@link CopyOnWriteArrayList} for list thread safety and visibility.
  • No blocking in class itself. Instead the backing jdk classes handle blocking for us.
  • Blocking has been reduced to key level granularity instead of being at the cache level.
Disadvantages
  • Prone to interleaving. It is weakly consistent and does not guarantee mutually exclusive and atomic calls. The {@link #remove(K)} call can interleave through the lines of the put method and key value pairs can potentially be added back in if a {@link #remove(K)} is called part way through the {@link #put(K,V)} call. To be strongly consistent the {@link #remove(K)} and {@link #put(K,V)} calls need to be mutually exclusive.
package name.dhruba.kb.concurrency.multimap;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class JdkCopyOnWriteArrayListMultiMap<K, V> {

    private final ConcurrentMap<K, List<V>> cache = new ConcurrentHashMap<K, List<V>>();

    public List<V> get(K k) {
        return cache.get(k);
    }

    public List<V> remove(K k) {
        return cache.remove(k);
    }

    public void put(K k, V v) {
        List<V> list = cache.get(k);
        if (list == null) {
            list = new CopyOnWriteArrayList<V>();
            List<V> oldList = cache.putIfAbsent(k, list);
            if (oldList != null) {
                list = oldList;
            }
        }
        list.add(v);
    }

    public boolean remove(K k, V v) {
        List<V> list = cache.get(k);
        if (list == null) {
            return false;
        }
        if (list.isEmpty()) {
            cache.remove(k);
            return false;
        }
        boolean removed = list.remove(v);
        if (removed && list.isEmpty()) {
            cache.remove(k);
        }
        return removed;
    }

}

Partially blocking copy on write multimap

With this implementation we return to strong consistency, but this time we block only on certain paths through the put() and remove() methods, at the cost of a little additional allocation. However, the lock used is still a global one, which means that operations on different keys become sequential; that is obviously not desirable.

Advantages
  • Strongly consistent.
  • Use of {@link ConcurrentHashMap} for thread safety and visibility guarantees.
  • The {@link #get(Object)} call doesn’t block at all in this class and the {@link #remove(Object)} call only blocks briefly on the global lock while removing.
  • The {@link #put(Object, Object)} and {@link #remove(Object, Object)} methods do block but only for certain paths. There are paths through these methods which won’t block at all. The {@link #put(Object, Object)} method only blocks if the {@link ConcurrentHashMap#putIfAbsent(Object, Object)} fails and the {@link #remove(Object, Object)} only blocks if there is something there to remove.
Disadvantages
  • We allocate a list initially in the {@link #put(Object, Object)} which may not be needed.
  • {@link ConcurrentHashMap} still blocks although at a finer level using stripes.
  • The blocking synchronisation we are using is still blocking the entire cache. What we really want is to block only the keys that hash to the value bucket that we are currently working with. A more fine grained blocking strategy is called for which we’ll see in the next implementation.
package name.dhruba.kb.concurrency.multimap;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PartiallyBlockingCopyOnWriteArrayListMultiMap<K, V> {

    private final ConcurrentMap<K, List<V>> cache = new ConcurrentHashMap<K, List<V>>();

    public List<V> get(K k) {
        return cache.get(k);
    }

    public List<V> remove(K k) {
        synchronized (cache) {
            return cache.remove(k);
        }
    }

    public void put(K k, V v) {
        List<V> list = Collections.singletonList(v);
        List<V> oldList = cache.putIfAbsent(k, list);
        if (oldList != null) {
            synchronized (cache) {
                list = cache.get(k);
                if (list == null || list.isEmpty()) {
                    list = new ArrayList<V>();
                } else {
                    list = new ArrayList<V>(list);
                }
                list.add(v);
                cache.put(k, list);
            }
        }
    }

    public boolean remove(K k, V v) {
        List<V> list = cache.get(k);
        if (list == null) {
            return false;
        }
        synchronized (cache) {
            list = cache.get(k);
            if (list == null) {
                return false;
            }
            if (list.isEmpty()) {
                cache.remove(k);
                return false;
            }
            // copy before mutating: the stored list may be the immutable
            // singleton from put() and readers must never see an in place removal
            list = new ArrayList<V>(list);
            boolean removed = list.remove(v);
            if (removed) {
                if (list.isEmpty()) {
                    cache.remove(k);
                } else {
                    cache.put(k, list);
                }
            }
            return removed;
        }
    }

}

Striped lock copy on write multimap

The final implementation: strongly consistent, with a non-blocking backing structure, fine grained locking at key level and locking only on the paths that need it. This example uses a striped lock provider. Its purpose is to take a key as input and provide a lock as output to lock on. It is consistent in that it always provides the same lock for the same key, guaranteeing the mutual exclusion that is necessary.

It takes the number of locks desired as a constructor input (by default 2048), which means we can decide how many locks to make available in the distribution. It will then consistently hash across that distribution of locks. It also provides a better key distribution than non-concurrent hash maps. The concept behind the striped lock provider and its implementation is a very interesting topic and will form a post of its own in the future. Stay tuned!
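
In the meantime, here is a minimal sketch of what such a provider might look like; the real implementation may well differ in details such as the hash spreading function. The lock count is assumed to be a power of two so that the modulo can be replaced with a cheap bit mask.

package name.dhruba.kb.concurrency.striping;

public class IntrinsicStripedLockProvider {

    private final Object[] locks;

    public IntrinsicStripedLockProvider() {
        this(2048);
    }

    public IntrinsicStripedLockProvider(int numLocks) {
        // numLocks is assumed to be a power of two (see mask below)
        locks = new Object[numLocks];
        for (int i = 0; i < locks.length; i++) {
            locks[i] = new Object();
        }
    }

    public Object getLockForKey(Object key) {
        // spread the bits to defend against poor quality hash codes
        int hash = key.hashCode();
        hash ^= (hash >>> 20) ^ (hash >>> 12);
        hash ^= (hash >>> 7) ^ (hash >>> 4);
        // the same key always maps to the same lock, which is what
        // guarantees mutual exclusion at key level granularity
        return locks[hash & (locks.length - 1)];
    }

}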

Advantages
  • Strongly consistent. Implements correct mutual exclusion of calls.
  • Uses {@link NonBlockingHashMap} instead of {@link ConcurrentHashMap} so the backing cache member does not block at all. Far more efficient and scalable than {@link ConcurrentHashMap}.
  • The read calls are completely non-blocking even at the cache structure level.
  • The {@link #put(Object, Object)} and {@link #remove(Object, Object)} methods do block but only for certain paths. There are paths through these methods which won’t block at all. The {@link #put(Object, Object)} method only blocks if the {@link NonBlockingHashMap#putIfAbsent(Object, Object)} fails and the {@link #remove(Object, Object)} only blocks if there is something there to remove.
  • And to save the best for last – there is no longer any blocking at the cache level. We now apply mutual exclusion only at the key level.

This implementation has the best of all worlds really as long as the copy on write approach is acceptable to you.

Disadvantages
  • Fundamentally being a copy on write approach it does more allocation than a mutative approach.
package name.dhruba.kb.concurrency.multimap;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;

import name.dhruba.kb.concurrency.striping.IntrinsicStripedLockProvider;

import org.cliffc.high_scale_lib.NonBlockingHashMap;

public class StripedLockArrayListMultiMap<K, V> {

    private final IntrinsicStripedLockProvider stripedLockProvider = new IntrinsicStripedLockProvider();
    private final ConcurrentMap<K, List<V>> cache = new NonBlockingHashMap<K, List<V>>();

    public List<V> get(K k) {
        return cache.get(k);
    }

    public List<V> remove(K k) {
        Object lock = stripedLockProvider.getLockForKey(k);
        synchronized (lock) {
            return cache.remove(k);
        }
    }

    public void put(K k, V v) {
        List<V> list = Collections.singletonList(v);
        List<V> oldList = cache.putIfAbsent(k, list);
        if (oldList != null) {
            Object lock = stripedLockProvider.getLockForKey(k);
            synchronized (lock) {
                list = cache.get(k);
                if (list == null || list.isEmpty()) {
                    list = new ArrayList<V>();
                } else {
                    list = new ArrayList<V>(list);
                }
                list.add(v);
                cache.put(k, list);
            }
        }
    }

    public boolean remove(K k, V v) {
        List<V> list = cache.get(k);
        if (list == null) {
            return false;
        }
        Object lock = stripedLockProvider.getLockForKey(k);
        synchronized (lock) {
            list = cache.get(k);
            if (list == null) {
                return false;
            }
            if (list.isEmpty()) {
                cache.remove(k);
                return false;
            }
            // copy before mutating: the stored list may be the immutable
            // singleton from put() and readers must never see an in place removal
            list = new ArrayList<V>(list);
            boolean removed = list.remove(v);
            if (removed) {
                if (list.isEmpty()) {
                    cache.remove(k);
                } else {
                    cache.put(k, list);
                }
            }
            return removed;
        }
    }

}

Conclusion

When designing concurrent structures it is important not to resort blindly to what’s out there, and for custom concurrent data structures there will be no ready made solution. For those cases concurrent patterns such as this are invaluable, and best practices such as reducing critical sections, reducing blocking to only the paths that need it, reducing the granularity of the locks being used and selecting the right backing structures are absolutely key to an efficient concurrent data structure. If you have any feedback on how to do better than the above, or if I’ve made any mistakes, please do let me know. Enjoy and thanks for reading.

Links

FYI – it seems Joe Kearney has done an alternative implementation that does not rely on copy on write.

Update [05/07/2011]: Code updated with bug fixes for edge cases.
Update [17/04/2013]: After a long wait I finally fixed the bug that Charlie reported below. Thanks Charlie!

Concurrency Pattern: The Producer Consumer Pattern in Java

Introduction

Recently, after a very long time, I encountered once again the topic of the producer consumer pattern in Java. In our day to day lives as developers we rarely have to implement such a pattern in code. We usually deal with producers and consumers in concept form at a far higher level of abstraction, such as at the inter-process level over MQ or 29 West, or, if it must be in-process, normal method invocation might suffice. Or it may be that we only have to implement one half, either the producer or the consumer, at our end while the other half is written by a downstream system that we are integrating with. I myself can’t remember the last time I had to write a literal in-process producer/consumer implementation for production.

One could argue that any message passing, through queues or otherwise, is a manifestation of this pattern, but if you continue along this route then the message passing paradigm, or even object orientation itself, could be considered to represent it. Okay, I’ll stop here with the introduction as my philosophical side is taking over. As I came across this recently, I thought it an interesting exercise to explore and document the various ways of implementing such a pattern in Java. Employers also tend to have this as a favourite interview question.

Problem Statement

The problem explored here is to implement the producer consumer pattern in Java to perform single message exchange without polling or spinning as these can be wasteful of resources. Though if you disagree that these can be wasteful please provide an appropriate performant implementation in the comments!

Solution

In any producer consumer scenario there are three entities: producer, consumer and broker. Though the broker is optional in certain real life topologies, such as 29West multicast, for the purposes of this post the broker is central to exploring the different ways of implementing this pattern. So let’s lay down the prerequisites.

Bootstrap

The bootstrap main method simply chooses a broker implementation, makes it available to the producer and consumer and starts the latter two as threads of their own. The broker is an interface to allow us to substitute in different implementations.

package name.dhruba.kb.concurrency.pc;

import name.dhruba.kb.concurrency.pc.broker.Broker;
import name.dhruba.kb.concurrency.pc.broker.WaitNotifyBroker;

public class ProducerConsumerBootstrap {

    public static void main(String[] args) {
        Broker<Integer> broker = new WaitNotifyBroker<Integer>();
        new Thread(new Producer(broker)).start();
        new Thread(new Consumer(broker)).start();
    }

}

Running the bootstrap should produce the following output.

produced: 0
consumed: 0
produced: 1
consumed: 1
produced: 2
consumed: 2
produced: 3
consumed: 3
produced: 4
consumed: 4
produced termination signal
received termination signal

Producer

The producer simply produces five integers starting at zero in sequence followed by a termination signal (-1).

package name.dhruba.kb.concurrency.pc;

import name.dhruba.kb.concurrency.pc.broker.Broker;

class Producer implements Runnable {

    final Broker<Integer> broker;

    Producer(Broker<Integer> broker) {
        this.broker = broker;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                broker.put(i);
                System.out.format("produced: %s%n", i);
                Thread.sleep(1000);
            }
            broker.put(-1);
            System.out.println("produced termination signal");
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
    }

}

Consumer

The consumer, in turn, receives the five integers and terminates consumption upon receiving the termination signal.

package name.dhruba.kb.concurrency.pc;

import name.dhruba.kb.concurrency.pc.broker.Broker;

class Consumer implements Runnable {

    final Broker<Integer> broker;

    Consumer(Broker<Integer> broker) {
        this.broker = broker;
    }

    @Override
    public void run() {
        try {
            for (Integer message = broker.take(); message != -1; message = broker.take()) {
                System.out.format("consumed: %s%n", message);
                Thread.sleep(1000);
            }
            System.out.println("received termination signal");
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
    }

}

Broker

The broker is the focus of this post. It is an interface as follows and its implementations are intended to support exchange of a single message at a time for simplicity of illustration.

package name.dhruba.kb.concurrency.pc.broker;

public interface Broker<T> {

    T take() throws InterruptedException;

    void put(T message) throws InterruptedException;

}

Broker implementations

Wait notify broker

A wait notify broker is possibly the simplest solution to the problem, but also one at the lowest level of abstraction and a legacy approach. I cannot see any reason to use such primitives in the modern day and age. Joshua Bloch might say the same. Nevertheless, the implementation would be as follows.

package name.dhruba.kb.concurrency.pc.broker;

public class WaitNotifyBroker<T> implements Broker<T> {

    private T message;
    private boolean empty = true;

    @Override
    public synchronized T take() throws InterruptedException {
        while (empty) {
            wait();
        }
        empty = true;
        notifyAll();
        return message;
    }

    @Override
    public synchronized void put(T message) throws InterruptedException {
        while (!empty) {
            wait();
        }
        empty = false;
        this.message = message;
        notifyAll();
    }

}

Synchronous Queue Broker

Having explored the rather boring wait-notify option we can now move on to more interesting implementations. The next thing that comes to mind is a SynchronousQueue. Why? Because the problem here is to facilitate single message exchange, and for this a SynchronousQueue is perfect. Synchronous queues are effectively zero capacity queues that pass a message across threads only when a consuming thread is ready to take the handover.

package name.dhruba.kb.concurrency.pc.broker;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class SyncQueueBroker<T> implements Broker<T> {

    private final BlockingQueue<T> queue = new SynchronousQueue<T>();

    @Override
    public T take() throws InterruptedException {
        return queue.take();
    }

    @Override
    public void put(T message) throws InterruptedException {
        queue.put(message);
    }

}

In this example we could have used any blocking queue implementation, but I chose a SynchronousQueue for lightweight, immediate thread handovers without buffering.

Exchanger Broker

So far we’ve tackled wait-notify and queues, but how else can we achieve the same effect? The next solution uses an Exchanger. The Exchanger is a little known but tremendously interesting and powerful utility for swapping messages between threads in a really simple way. Its usage is as follows.

package name.dhruba.kb.concurrency.pc.broker;

import java.util.concurrent.Exchanger;

public class ExchangerBroker<T> implements Broker<T> {

    private final Exchanger<T> exchanger = new Exchanger<T>();

    @Override
    public void put(T message) throws InterruptedException {
        // hand the message to the consumer; discard the null offered back
        exchanger.exchange(message);
    }

    @Override
    public T take() throws InterruptedException {
        // offer nothing and receive the producer's message in exchange
        return exchanger.exchange(null);
    }

}

I have to admit that I’ve never actually needed or found a legitimate use for an exchanger in production code and I’m curious to hear if you have.

Condition Broker

And, finally, we come to our last implementation, using Condition, which, like the Exchanger, is little known and rarely seen out there (at least in my experience). I’ve only seen it in production code once and that too was for a very unusual use case. However, conditions have a key advantage over their predecessor, wait/notify. To quote the javadoc: “Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods”.

package name.dhruba.kb.concurrency.pc.broker;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionBroker<T> implements Broker<T> {

    private T message;
    private boolean empty = true;

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition fullState = lock.newCondition();
    private final Condition emptyState = lock.newCondition();

    @Override
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (empty) {
                fullState.await();
            }
            empty = true;
            emptyState.signal();
            return message;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void put(T message) throws InterruptedException {
        lock.lock();
        try {
            while (!empty) {
                emptyState.await();
            }
            this.message = message;
            empty = false;
            fullState.signal();
        } finally {
            lock.unlock();
        }
    }

}

As you can see this is a much more effective semantic representation of the solution than the equivalent wait-notify.

Conclusion

The producer/consumer pattern is a useful scenario within which to explore solutions to bounded buffers, blocking queues and in-process eventing, though it is usually of little practical use now that the higher level synchronisation utilities introduced in Java 5 negate the need to implement any such thing ourselves. Employers love to ask this question in interviews, so bear in mind that it’s useful to know not only the higher level utilities but also the lower level wait notify behaviour, even though the latter is virtually obsolete at this point. If you can think of any other ways of implementing a single message exchange producer/consumer pattern please comment here. Thanks.