Using the Task Parallel Library (TPL) for Events

The Task Parallel Library (TPL) was introduced with .NET Framework 4.0 and is designed to simplify parallel and concurrent programming. The API is straightforward and usually involves passing in an Action to execute. Things get more interesting when you are dealing with asynchronous models such as events.
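To make "passing in an Action" concrete, here is a minimal sketch of the simple case. ActionDemo and RunAction are illustrative names, not part of the original program. A lambda is handed to Task.Factory.StartNew, and Wait() blocks until it has run.

```csharp
using System;
using System.Threading.Tasks;

static class ActionDemo
{
    // Passes an Action (here a lambda) to the TPL, which runs it on the default scheduler.
    public static int RunAction()
    {
        int value = 0;
        Task task = Task.Factory.StartNew(() => { value = 21 * 2; });
        task.Wait(); // blocks until the Action has finished running
        return value;
    }

    static void Main()
    {
        Console.WriteLine(RunAction()); // prints 42
    }
}
```

The task is complete as soon as the lambda returns, which is exactly the assumption that breaks down with events, as the rest of this post shows.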

While the TPL has explicit wrappers for the Asynchronous Programming Model (APM), exposed through Task.Factory.FromAsync (see TPL APM Wrappers), it has no built-in equivalent for events.
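For comparison, this is roughly what an APM wrapper looks like. The sketch assumes a MemoryStream as the data source and uses Task<int>.Factory.FromAsync to turn the BeginRead/EndRead pair of Stream into a Task<int>; ApmDemo and ReadWithFromAsync are illustrative names only.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class ApmDemo
{
    // Wraps the Begin/End method pair of Stream.Read in a Task<int>.
    public static int ReadWithFromAsync(byte[] source)
    {
        using (var stream = new MemoryStream(source))
        {
            var buffer = new byte[source.Length];
            Task<int> readTask = Task<int>.Factory.FromAsync(
                stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
            return readTask.Result; // blocks; the value is the number of bytes read
        }
    }

    static void Main()
    {
        Console.WriteLine(ReadWithFromAsync(new byte[] { 1, 2, 3, 4 })); // prints 4
    }
}
```

Because APM methods follow a fixed Begin/End shape, the TPL can wrap them generically. Events have no such shape, so there is nothing comparable to call.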

I usually hide the “muck” of subscribing to an event and waiting for it to fire behind a callback. For example, the following method generates a random number. It uses a delay to simulate a service call and a separate task to make the callback asynchronous: you call into the method and provide a delegate that is invoked once the value is available.

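private static void _GenerateRandomNumber(Action<int> callback)
{
    int random;
    lock (_random) // System.Random is not thread-safe, and this method is called from several tasks at once
    {
        random = _random.Next(0, 2000) + 10;
    }
    Console.WriteLine("Generated {0}", random);

    // Simulate a slow service call, then invoke the callback from a background task
    Task.Factory.StartNew(() =>
    {
        Thread.Sleep(1000);
        callback(random);
    });
}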
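The method above takes a callback directly. To show the kind of event "muck" such a callback can hide, here is a sketch built around a hypothetical event-based NumberService (not part of the original code). The wrapper subscribes, unsubscribes after the first notification, and forwards the value to the caller's callback.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical event-based service, used only for illustration.
class NumberService
{
    public event Action<int> NumberGenerated;

    // Raises NumberGenerated from a background task after a short delay.
    public void BeginGenerate(int value)
    {
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(100);
            var handler = NumberGenerated;
            if (handler != null) handler(value);
        });
    }
}

static class CallbackWrapper
{
    // Hides the event subscription behind a callback: callers only supply an Action<int>.
    public static void GenerateNumber(NumberService service, int value, Action<int> callback)
    {
        Action<int> handler = null;
        handler = result =>
        {
            service.NumberGenerated -= handler; // unsubscribe so the callback fires only once
            callback(result);
        };
        service.NumberGenerated += handler;
        service.BeginGenerate(value);
    }

    // Blocks until the callback fires; for demonstration only.
    public static int GenerateAndWait(int value)
    {
        int received = 0;
        var done = new ManualResetEventSlim();
        GenerateNumber(new NumberService(), value, r => { received = r; done.Set(); });
        done.Wait();
        return received;
    }

    static void Main()
    {
        Console.WriteLine(GenerateAndWait(7)); // prints 7
    }
}
```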

Now consider an algorithm that needs the results of three separate calls to compute its result. The calls are independent, so they can run in parallel. The TPL supports “parent” tasks that wait for their attached child tasks to complete, so a first pass might look like this:

private static void _Incorrect()
{
    var start = DateTime.Now;

    int x = 0, y = 0, z = 0;

    Task.Factory.StartNew(() =>
    {
        // Each child task finishes as soon as _GenerateRandomNumber returns,
        // long before its callback runs.
        Task.Factory.StartNew(() => _GenerateRandomNumber(result => x = result),
                              TaskCreationOptions.AttachedToParent);
        Task.Factory.StartNew(() => _GenerateRandomNumber(result => y = result),
                              TaskCreationOptions.AttachedToParent);
        Task.Factory.StartNew(() => _GenerateRandomNumber(result => z = result),
                              TaskCreationOptions.AttachedToParent);
    }).ContinueWith(t =>
    {
        // Runs once the parent and its attached children finish,
        // which in practice is before any callback has set x, y, or z.
        var finish = DateTime.Now;
        Console.WriteLine("Bad Parallel: {0}+{1}+{2}={3} [{4}]",
            x, y, z,
            x + y + z,
            finish - start);
        _Parallel();
    });
}

The code attaches three child tasks to a parent. The parent waits for its children to finish, and the continuation then computes the elapsed time and shows the result. While the code executes extremely fast, the result is not what you want. Take a look:

Press ENTER to begin (and again to end)

Generated 593
Generated 1931
Generated 362
Bad Parallel: 0+0+0=0 [00:00:00.0190011]

You can see that three numbers were generated, but none of them made it into the sum. The reason is that, as far as the TPL is concerned, a task ends when its delegate returns. The TPL has no way to know that the callback was handed off to another asynchronous process (or event), so it considers each child task complete as soon as _GenerateRandomNumber returns. The parent therefore completes almost immediately, and the continuation computes the sum before any callback has fired to update x, y, and z.
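For contrast, attached children do hold up their parent when the work happens inside the child's own delegate. In this sketch (AttachedDemo is an illustrative name, and the values are fixed so the result is predictable), each child sleeps and writes its value before returning, so the parent cannot complete until all three writes are done.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class AttachedDemo
{
    // Each child does its simulated work inside its own delegate, so the
    // parent cannot complete until that work, including the write, is done.
    public static int SumWithSynchronousChildren()
    {
        int x = 0, y = 0, z = 0;
        Task parent = Task.Factory.StartNew(() =>
        {
            Task.Factory.StartNew(() => { Thread.Sleep(100); x = 1; }, TaskCreationOptions.AttachedToParent);
            Task.Factory.StartNew(() => { Thread.Sleep(100); y = 2; }, TaskCreationOptions.AttachedToParent);
            Task.Factory.StartNew(() => { Thread.Sleep(100); z = 3; }, TaskCreationOptions.AttachedToParent);
        });
        parent.Wait(); // returns only after all three attached children have finished
        return x + y + z;
    }

    static void Main()
    {
        Console.WriteLine(SumWithSynchronousChildren()); // prints 6
    }
}
```

The only difference from _Incorrect() is where the work ends: here it ends when the delegate returns, which is what the TPL tracks.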

So how do you manage this and allow the tasks to execute in parallel but still make sure the values are retrieved?

For this purpose, the TPL provides a special class called TaskCompletionSource<T>. A task completion source is a point of synchronization that lets you complete an asynchronous or event-based operation and relay its result. Its Task property exposes a task that won’t complete until you call SetResult, SetException, or SetCanceled (or the corresponding TrySetResult, TrySetException, or TrySetCanceled) on the source.
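A minimal sketch of those completion rules in isolation (TcsDemo is an illustrative name): the task stays incomplete until SetResult is called, a second attempt to complete it is rejected, and SetException faults the task so that waiting on it rethrows the exception wrapped in an AggregateException.

```csharp
using System;
using System.Threading.Tasks;

static class TcsDemo
{
    // The task is completed "from the outside" by calling SetResult on its source.
    public static int CompleteWithResult()
    {
        var tcs = new TaskCompletionSource<int>();
        Task<int> task = tcs.Task;
        Console.WriteLine(task.IsCompleted);     // False: nothing has been set yet
        tcs.SetResult(42);                       // moves the task to RanToCompletion
        Console.WriteLine(tcs.TrySetResult(7));  // False: a task completes only once
        return task.Result;                      // 42
    }

    // SetException faults the task; waiting on it rethrows inside an AggregateException.
    public static bool FaultsWithException()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.SetException(new InvalidOperationException("service failed"));
        try
        {
            tcs.Task.Wait();
            return false;
        }
        catch (AggregateException ex)
        {
            return ex.InnerException is InvalidOperationException;
        }
    }

    static void Main()
    {
        Console.WriteLine(CompleteWithResult());  // prints False, False, then 42
        Console.WriteLine(FaultsWithException()); // prints True
    }
}
```

The Try* variants return false instead of throwing when the task is already complete, which is why they suit callbacks that might fire more than once.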

To see how this is used, let’s take the existing method and fix it using the completion sources:

private static void _Parallel()
{
    // One completion source per independent call
    var taskCompletions = new[]
    {
        new TaskCompletionSource<int>(),
        new TaskCompletionSource<int>(),
        new TaskCompletionSource<int>()
    };

    // The tasks controlled by the completion sources
    var tasks = new[] { taskCompletions[0].Task, taskCompletions[1].Task, taskCompletions[2].Task };

    var start = DateTime.Now;

    // Each callback completes its task only when the value actually arrives
    Task.Factory.StartNew(() => _GenerateRandomNumber(result => taskCompletions[0].TrySetResult(result)));
    Task.Factory.StartNew(() => _GenerateRandomNumber(result => taskCompletions[1].TrySetResult(result)));
    Task.Factory.StartNew(() => _GenerateRandomNumber(result => taskCompletions[2].TrySetResult(result)));

    // Blocks until all three results have been set
    Task.WaitAll(tasks);

    var finish = DateTime.Now;
    Console.WriteLine("Parallel: {0}+{1}+{2}={3} [{4}]",
        taskCompletions[0].Task.Result,
        taskCompletions[1].Task.Result,
        taskCompletions[2].Task.Result,
        taskCompletions[0].Task.Result + taskCompletions[1].Task.Result + taskCompletions[2].Task.Result,
        finish - start);
}

First, I create an array of task completion sources, which makes it easy to coordinate the results. Next, I create an array of the underlying tasks. This provides a collection to pass to Task.WaitAll() so that all values are available before the result is computed. Instead of writing to captured variables, each callback now calls TrySetResult on its completion source. Each underlying task won’t complete until its result is set, so all values are returned before the final computation is made. Here are the results:

Generated 279
Generated 618
Generated 1013
Parallel: 618+279+1013=1910 [00:00:01.9981143]

You can see that all generated numbers are accounted for and properly added. You can also see that the tasks ran in parallel: the whole operation finished in just under 2 seconds, while three sequential calls with a 1-second delay each would take at least 3 seconds.
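The three near-identical completion-source setups in _Parallel() can be folded into a small helper. This is a sketch, not the original code: FromCallback is a hypothetical name, and Generate is a stand-in for _GenerateRandomNumber that returns a fixed value so the sum is predictable. The helper creates a TaskCompletionSource<T>, completes it from the callback, and hands back its Task.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class CallbackTasks
{
    // Hypothetical helper: adapts a callback-style call into a Task<T>
    // by completing a TaskCompletionSource<T> when the callback fires.
    public static Task<T> FromCallback<T>(Action<Action<T>> start)
    {
        var tcs = new TaskCompletionSource<T>();
        try
        {
            start(result => tcs.TrySetResult(result));
        }
        catch (Exception ex)
        {
            tcs.TrySetException(ex); // surface synchronous failures through the task
        }
        return tcs.Task;
    }

    // Stand-in for _GenerateRandomNumber that returns a fixed value after a delay.
    static void Generate(int value, Action<int> callback)
    {
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(100);
            callback(value);
        });
    }

    public static int SumInParallel()
    {
        Task<int>[] tasks =
        {
            FromCallback<int>(cb => Generate(1, cb)),
            FromCallback<int>(cb => Generate(2, cb)),
            FromCallback<int>(cb => Generate(3, cb))
        };
        Task.WaitAll(tasks);
        return tasks.Sum(t => t.Result);
    }

    static void Main()
    {
        Console.WriteLine(SumInParallel()); // prints 6
    }
}
```

Since Generate already returns immediately, the extra Task.Factory.StartNew wrappers used in _Parallel() are not needed here.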

The entire console application can be copied and pasted from the following code. There are other ways to chain the tasks and make the completions fall under a parent, but this should help you get your arms around tasks that don’t complete when their methods return and instead require a synchronized completion context.

using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    private static readonly Random _random = new Random();

    static void Main(string[] args)
    {
        Console.WriteLine("Press ENTER to begin (and again to end)");
        Console.ReadLine();

        // _Incorrect() runs _Parallel() from its continuation
        _Incorrect();

        Console.ReadLine();
    }

    private static void _Incorrect()
    {
        var start = DateTime.Now;

        int x = 0, y = 0, z = 0;

        Task.Factory.StartNew(() =>
        {
            Task.Factory.StartNew(() => _GenerateRandomNumber(result => x = result),
                                  TaskCreationOptions.AttachedToParent);
            Task.Factory.StartNew(() => _GenerateRandomNumber(result => y = result),
                                  TaskCreationOptions.AttachedToParent);
            Task.Factory.StartNew(() => _GenerateRandomNumber(result => z = result),
                                  TaskCreationOptions.AttachedToParent);
        }).ContinueWith(t =>
        {
            var finish = DateTime.Now;
            Console.WriteLine("Bad Parallel: {0}+{1}+{2}={3} [{4}]",
                x, y, z,
                x + y + z,
                finish - start);
            _Parallel();
        });
    }

    private static void _Parallel()
    {
        var taskCompletions = new[]
        {
            new TaskCompletionSource<int>(),
            new TaskCompletionSource<int>(),
            new TaskCompletionSource<int>()
        };

        var tasks = new[] { taskCompletions[0].Task, taskCompletions[1].Task, taskCompletions[2].Task };

        var start = DateTime.Now;

        Task.Factory.StartNew(() => _GenerateRandomNumber(result => taskCompletions[0].TrySetResult(result)));
        Task.Factory.StartNew(() => _GenerateRandomNumber(result => taskCompletions[1].TrySetResult(result)));
        Task.Factory.StartNew(() => _GenerateRandomNumber(result => taskCompletions[2].TrySetResult(result)));

        Task.WaitAll(tasks);

        var finish = DateTime.Now;
        Console.WriteLine("Parallel: {0}+{1}+{2}={3} [{4}]",
            taskCompletions[0].Task.Result,
            taskCompletions[1].Task.Result,
            taskCompletions[2].Task.Result,
            taskCompletions[0].Task.Result + taskCompletions[1].Task.Result + taskCompletions[2].Task.Result,
            finish - start);
    }

    private static void _GenerateRandomNumber(Action<int> callback)
    {
        int random;
        lock (_random) // System.Random is not thread-safe
        {
            random = _random.Next(0, 2000) + 10;
        }
        Console.WriteLine("Generated {0}", random);
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(1000);
            callback(random);
        });
    }
}
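The closing paragraph mentions other ways to chain the tasks. One option, sketched here under the assumption that blocking in Task.WaitAll() is undesirable, is Task.Factory.ContinueWhenAll, which schedules the sum as a continuation that runs once all three completion-source tasks finish. ContinueWhenAllDemo, SumAsync, and the fixed-value Generate stand-in are hypothetical names, not part of the program above.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ContinueWhenAllDemo
{
    // Stand-in for _GenerateRandomNumber with a fixed value so the result is predictable.
    static void Generate(int value, Action<int> callback)
    {
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(100);
            callback(value);
        });
    }

    public static Task<int> SumAsync()
    {
        var sources = new[]
        {
            new TaskCompletionSource<int>(),
            new TaskCompletionSource<int>(),
            new TaskCompletionSource<int>()
        };
        for (int i = 0; i < sources.Length; i++)
        {
            var tcs = sources[i]; // per-iteration copy for the closure
            Generate(i + 1, result => tcs.TrySetResult(result));
        }
        Task<int>[] tasks = sources.Select(s => s.Task).ToArray();

        // No blocking here: the sum runs as a continuation once all three tasks complete.
        return Task.Factory.ContinueWhenAll(tasks, completed => completed.Sum(t => t.Result));
    }

    static void Main()
    {
        Console.WriteLine(SumAsync().Result); // prints 6
    }
}
```

The caller receives a Task<int> and decides whether to wait on it or attach further continuations, so no thread sits blocked while the callbacks are pending.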

Jeremy Likness

