Async producer/consumer the easy way

This article was originally published at tech.blinemedical.com

In .net 4, a new class called BlockingCollection was introduced, which let you have a threadsafe producer/consumer queue. Anyone consuming a BlockingCollection blocks automatically until new items are added. This lets you easily add items to the collection in one thread and use another synchronized thread to consume items. This class is great since before this existed, you had to do all this work with mutexes and it was a lot of extra work (and more error prone). In general, a good time to use a decoupled producer consumer pattern is when you have a slow consuming function and a producer thread that is time sensitive.

Even though BlockingCollection effectively synchronizes your producer/consumer, you still have to create boilerplate to manage the producer thread and the consumer thread. Also if you wanted to add extra exception handling or a cancellation token, you’d have to add all that yourself too. I wrapped this all up in a BlockingCollectionWrapper class that handles all this for you.

An example

Here is an example where the consumer takes one second each time it consumes an item.

private readonly ManualResetEvent _testMutex = new ManualResetEvent(false);

[Test]
public void TestCollection()
{
    // create the wrapper
    var asyncCollection = new BlockingCollectionWrapper<string>();

    asyncCollection.FinishedEvent += FinishedEventHandler;

    // make sure we dispose of it. this will stop the internal thread
    using (asyncCollection)
    {
        // register a consuming action
        asyncCollection.QueueConsumingAction = (producedItem) =>
        {
            Thread.Sleep(TimeSpan.FromSeconds(1));
            Console.WriteLine(DateTime.Now + ": Consuming item: " + producedItem);
        };

        // start consuming
        asyncCollection.Start();

        // start producing
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine(DateTime.Now + ": Produced item " + i);
            asyncCollection.AddItem(i.ToString());
        }
    }

    // wait for the finished handler to pulse this
    _testMutex.WaitOne();

    Assert.True(asyncCollection.Finished);
}

private void FinishedEventHandler(object sender, BlockingCollectionEventArgs e)
{
    _testMutex.Set();
}

This prints out

9/17/2012 6:22:43 PM: Produced item 0
9/17/2012 6:22:43 PM: Produced item 1
9/17/2012 6:22:43 PM: Produced item 2
9/17/2012 6:22:43 PM: Produced item 3
9/17/2012 6:22:43 PM: Produced item 4
9/17/2012 6:22:43 PM: Produced item 5
9/17/2012 6:22:43 PM: Produced item 6
9/17/2012 6:22:43 PM: Produced item 7
9/17/2012 6:22:43 PM: Produced item 8
9/17/2012 6:22:43 PM: Produced item 9
9/17/2012 6:22:44 PM: Consuming item: 0
9/17/2012 6:22:45 PM: Consuming item: 1
9/17/2012 6:22:46 PM: Consuming item: 2
9/17/2012 6:22:47 PM: Consuming item: 3
9/17/2012 6:22:48 PM: Consuming item: 4
9/17/2012 6:22:49 PM: Consuming item: 5
9/17/2012 6:22:50 PM: Consuming item: 6
9/17/2012 6:22:51 PM: Consuming item: 7
9/17/2012 6:22:52 PM: Consuming item: 8
9/17/2012 6:22:53 PM: Consuming item: 9

First, I created the blocking collection wrapper and made sure to put it in a using block since it’s disposable (the thread waiting on the blocking collection will need to be cleaned up). Then I registered a function to be executed each time an item is consumed. Calling Start() begins consuming. Once I’m done – even after the using block disposes of the wrapper – the separate consumer thread could still be running (processing whatever is left), but it is no longer blocking on additions and will complete consuming any pending items.

The wrapper

When you call .Start() we start our independent consumer thread.

/// <summary>
/// Start the consumer
/// </summary>
public void Start()
{
    _cancellationTokenSource = new CancellationTokenSource();
    _thread = new Thread(QueueConsumer) {Name = "BlockingConsumer"};
    _thread.Start();
}

This is the queue consumer that runs in the separate thread that executes the registered consumer action. The consuming action is locked to make changing the consuming action threadsafe.

/// <summary>
/// The actual consumer queue that runs in a seperate thread
/// </summary>
private void QueueConsumer()
{
    try
    {
        // Block on _queue.GetConsumerEnumerable 
        // When an item is added to the _queue it will unblock and let us consume
        foreach (var item in _queue.GetConsumingEnumerable(_cancellationTokenSource.Token))
        {
            // get a synchronized snapshot of the action
            Action<T> consumerAction = QueueConsumingAction;
                
            // execute our registered consuming action
            if (consumerAction != null)
            {
                consumerAction(item);
            }
        }

        // dispose of the token source
        if (_cancellationTokenSource != null)
        {
            _cancellationTokenSource.Dispose();
        }

        //Log.Debug(this, "Done with queue consumer");

        Finished = true;

        if (FinishedEvent != null)
        {
            FinishedEvent(this, new BlockingCollectionEventArgs());
        }
    }
    catch(OperationCanceledException)
    {
        //Log.Debug(this, "Blocking collection<{0}> cancelled", typeof(T));
    }
    catch (Exception ex)
    {
        //Log.Error(this, ex, "Error consuming from queue of type {0}", typeof(T));
    }
}

And when the wrapper is disposed, we set CompleteAdding on the blocking collection which tells the collection to stop waiting for new additions and finish out whatever is left in the queue.

protected void Dispose(bool disposing)
{
    if(disposing)
    {
        if (_queue !=null && !_queue.IsAddingCompleted)
        {
            // mark the queue as complete
            // the BlockingConsumer thread will now
            // just process the remaining items
            _queue.CompleteAdding();
        }
    }
}

public void Dispose()
{
    Dispose(true);
}

The remaining properties and functions on the wrapper let you

  • Force abort the consumer thread
  • Register a Finished event handler; disposing of the wrapper doesn’t mean that no more work is being done. It means that you are no longer adding items and the queue is effectively “closed”. Depending on your consumer function though, this could take some time to complete. This is why it’s good to hook into the finished event so you can be sure that all your processing is complete.
  • Manually mark the queue as AddedComplete (so the thread stops blocking)
  • Manually cancel the queue
  • Check if the queue is ended by looking at the Finished property

So to reiterate, the basic idea here is

  • Create a separate thread that has appropriate exception handling to be blocked while consuming the queued items
  • Handle cancellation gracefully
  • Be able to properly end our spawned thread so we don’t have anything leftover

It should be noted that even though this wrapper is built for a single consumer/single producer design, since we are leveraging GetConsumingEnumerable we could modify the wrapper to allow for multiple threads acting as consumers on the same enumerable. This could give us a single producer/multiple synchronized consumer pattern where only one consumer thread gets the particular item but multiple consumer threads exist and can do work.

Full source and tests provided at our github.

4 comments

  1. Pingback: Single producer many consumer | Onoffswitch.net
  2. Steve Hayles

    Hi,

    Nice idea and it works fairly well but I think the code in QueueConsumer needs some attention

    Calling cancel() on the _cancellationTokenSource will throw an OperationCancelledException which you catch but it will effectively bypass the code setting Finished to true and firing any attached Finished eventhandler.

    This should probably go in a finally section after the exception handlers.

Post a comment

You may use the following HTML:
<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>