
Using the Task Parallel Library (TPL) for Events

The Task Parallel Library (TPL) was introduced with .NET Framework 4.0 to simplify parallelism and concurrency. The API is straightforward and usually involves passing in an Action to execute. Things get more interesting when you are dealing with asynchronous models such as events.

The TPL has explicit wrappers for the asynchronous programming model (APM), the pattern built on Begin/End method pairs (see TPL APM Wrappers), but there is no equivalent wrapper for events.
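For comparison, here is a minimal sketch of what the APM wrapper looks like: TaskFactory.FromAsync turns the real Stream.BeginRead/EndRead pair into a Task<int>. The class and method names (ApmWrapperSketch, ReadAsTask, ReadFirstBytes) are hypothetical, and a MemoryStream stands in for a real I/O source. An event has no Begin/End pair to hand to FromAsync, which is why events need a different approach.

```csharp
using System.IO;
using System.Threading.Tasks;

static class ApmWrapperSketch
{
    // Wraps the Begin/End (APM) pair exposed by Stream.BeginRead/EndRead
    // in a Task<int> whose Result is the number of bytes read.
    public static Task<int> ReadAsTask(Stream stream, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead,
            buffer, 0, buffer.Length,
            null); // no state object is needed here
    }

    // Hypothetical demo: reads 3 bytes from a 5-byte in-memory stream.
    public static int ReadFirstBytes()
    {
        var source = new MemoryStream(new byte[] { 1, 2, 3, 4, 5 });
        var buffer = new byte[3];

        // Result blocks until EndRead has been called by the wrapper.
        return ReadAsTask(source, buffer).Result;
    }
}
```

ReadFirstBytes() returns 3, the number of bytes that fit in the buffer.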

I usually hide the “muck” of subscribing to an event and waiting for it to fire behind a callback. For example, the following method generates a random number. It uses a delay to simulate a service call and a separate task to make the callback asynchronous: you call into the method and provide a delegate that is called once the information is available.

private static void _GenerateRandomNumber(Action<int> callback)
{
    var random = _random.Next(0, 2000) + 10;
    Console.WriteLine("Generated {0}", random);
    Task.Factory.StartNew(() =>
                                {
                                    Thread.Sleep(1000);
                                    callback(random);
                                }, TaskCreationOptions.None);
}

Now consider an algorithm that needs the results of three separate calls as its inputs. The calls are independent, so they can run in parallel. The TPL supports “parent” tasks that wait for their attached child tasks to complete, so a first pass might look like this:

private static void _Incorrect()
{
            
    var start = DateTime.Now;

    int x = 0, y = 0, z = 0;

    Task.Factory.StartNew(
        () =>
            {
                Task.Factory.StartNew(() => _GenerateRandomNumber(result => x = result),
                                        TaskCreationOptions.AttachedToParent);
                Task.Factory.StartNew(() => _GenerateRandomNumber(result => y = result),
                                        TaskCreationOptions.AttachedToParent);
                Task.Factory.StartNew(() => _GenerateRandomNumber(result => z = result),
                                        TaskCreationOptions.AttachedToParent);
            }).ContinueWith(t =>
                                {
                                    var finish = DateTime.Now;
                                    Console.WriteLine("Bad Parallel: {0}+{1}+{2}={3} [{4}]",
                                        x, y, z,
                                        x+y+z,
                                        finish - start);
                                    _Parallel();                                            
                                });          
}

The code attaches three child tasks to a parent. The parent waits for its children to finish, and the continuation then computes the elapsed time and shows the result. While the code executes extremely fast, the result is not what you want. Take a look:

Press ENTER to begin (and again to end)

Generated 593
Generated 1931
Generated 362
Bad Parallel: 0+0+0=0 [00:00:00.0190011]

Three numbers were generated, but none of them made it into the sum. As far as the TPL is concerned, a task ends when the delegate it runs returns. The TPL has no way to know that the callback was handed off to an asynchronous process (or event), so it considers each child task complete as soon as _GenerateRandomNumber returns. (The inner task that eventually fires the callback is started with TaskCreationOptions.None, so it is not attached to the child.) The parent therefore completes, and the continuation computes the sum, before any callback fires and updates the values.
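For contrast, the following sketch keeps the same parent and attached children but has each child do its simulated work synchronously. Because each child's delegate does not return until its value is written, the continuation sees all three values. The class name, the 100 ms delay, and the fixed values 1, 2, and 3 are illustrative assumptions, not part of the original program.

```csharp
using System.Threading;
using System.Threading.Tasks;

static class AttachedChildSketch
{
    // Returns the sum observed by the continuation after the parent completes.
    public static int SumWithSynchronousChildren()
    {
        int x = 0, y = 0, z = 0;

        var parent = Task.Factory.StartNew(() =>
        {
            // Each child does its work synchronously, so the child task
            // does not finish until its value has been written.
            Task.Factory.StartNew(() => { Thread.Sleep(100); x = 1; }, TaskCreationOptions.AttachedToParent);
            Task.Factory.StartNew(() => { Thread.Sleep(100); y = 2; }, TaskCreationOptions.AttachedToParent);
            Task.Factory.StartNew(() => { Thread.Sleep(100); z = 3; }, TaskCreationOptions.AttachedToParent);
        });

        // The continuation runs only after the parent and all attached children complete.
        return parent.ContinueWith(t => x + y + z).Result;
    }
}
```

SumWithSynchronousChildren() returns 6. The parent/child mechanism itself works; the problem in _Incorrect is that the real work escapes the child task.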

So how do you manage this and allow the tasks to execute in parallel but still make sure the values are retrieved?

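For this purpose, the TPL provides a special class called TaskCompletionSource<T>. A task completion source is a point of synchronization that you can use to complete an asynchronous or event-based operation and relay its result. Its Task property exposes the underlying task, which does not complete until you call SetResult, SetException, or SetCanceled (or TrySetResult, TrySetException, or TrySetCanceled, which return false instead of throwing if the task has already completed).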
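The examples in this post use a callback, but the same technique applies directly to an event. Here is a minimal sketch, assuming System.Timers.Timer as a stand-in event source: the handler sets the result on the completion source and then unsubscribes itself, so the returned task completes the first time Elapsed fires. EventWrapperSketch, WaitForElapsed, and Demo are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;
using System.Timers;

static class EventWrapperSketch
{
    // Completes the returned task the first time the timer's Elapsed event fires.
    public static Task<DateTime> WaitForElapsed(System.Timers.Timer timer)
    {
        var tcs = new TaskCompletionSource<DateTime>();
        ElapsedEventHandler handler = null;
        handler = (sender, e) =>
        {
            // Unsubscribe first so later firings do not touch the completion source.
            timer.Elapsed -= handler;
            tcs.TrySetResult(e.SignalTime);
        };
        timer.Elapsed += handler;
        timer.Start();
        return tcs.Task;
    }

    // Hypothetical demo: returns true if the event fired within 5 seconds.
    public static bool Demo()
    {
        using (var timer = new System.Timers.Timer(200) { AutoReset = false })
        {
            return WaitForElapsed(timer).Wait(5000);
        }
    }
}
```

If the event source could also report a failure, the handler would call TrySetException instead, and the exception would surface when the task is waited on.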

To see how this is used, let’s take the existing method and fix it using the completion sources:

private static void _Parallel()
{
    var taskCompletions = new[]
                                {
                                    new TaskCompletionSource<int>(), 
                                    new TaskCompletionSource<int>(),
                                    new TaskCompletionSource<int>()
                                };

    var tasks = new[] {taskCompletions[0].Task, taskCompletions[1].Task, taskCompletions[2].Task};

    var start = DateTime.Now;            

    Task.Factory.StartNew(() => _GenerateRandomNumber(result => taskCompletions[0].TrySetResult(result)));
    Task.Factory.StartNew(() => _GenerateRandomNumber(result => taskCompletions[1].TrySetResult(result)));
    Task.Factory.StartNew(() => _GenerateRandomNumber(result => taskCompletions[2].TrySetResult(result)));

    Task.WaitAll(tasks);

    var finish = DateTime.Now;
    Console.WriteLine("Parallel: {0}+{1}+{2}={3} [{4}]", 
        taskCompletions[0].Task.Result,
        taskCompletions[1].Task.Result,
        taskCompletions[2].Task.Result,
        taskCompletions[0].Task.Result + taskCompletions[1].Task.Result + taskCompletions[2].Task.Result, 
        finish - start);            
}

First, I create an array of task completion sources, which makes it easy to reference each result. Next, I create an array of the underlying tasks to pass to Task.WaitAll(), which blocks until every value is available. Instead of writing to local variables, the callbacks now call TrySetResult on a completion source. The tasks don’t complete until their results are set, so all values are available before the final computation is made. Here are the results:
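If you call the callback-based method in several places, it can help to wrap the completion source once so that callers simply receive a Task<int>. This is a sketch, not the post's code: CallbackAdapterSketch, GenerateRandomNumberAsync, and SumOfThree are hypothetical names, and _GenerateRandomNumber is copied from the post (minus the console output) so the block compiles on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class CallbackAdapterSketch
{
    private static readonly Random _random = new Random();

    // Copied from the post (without console output): the callback fires on a separate task after a delay.
    private static void _GenerateRandomNumber(Action<int> callback)
    {
        var random = _random.Next(0, 2000) + 10;
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(1000);
            callback(random);
        });
    }

    // Hypothetical adapter: the completion source stays private, callers only see Task<int>.
    public static Task<int> GenerateRandomNumberAsync()
    {
        var tcs = new TaskCompletionSource<int>();
        _GenerateRandomNumber(result => tcs.TrySetResult(result));
        return tcs.Task;
    }

    // All three calls are started before WaitAll blocks, so their delays overlap.
    public static int SumOfThree()
    {
        var tasks = new[]
        {
            GenerateRandomNumberAsync(),
            GenerateRandomNumberAsync(),
            GenerateRandomNumberAsync()
        };
        Task.WaitAll(tasks);
        return tasks[0].Result + tasks[1].Result + tasks[2].Result;
    }
}
```

Each generated value is between 10 and 2009, so SumOfThree() returns a value between 30 and 6027.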

Generated 279
Generated 618
Generated 1013
Parallel: 618+279+1013=1910 [00:00:01.9981143]

All generated numbers are accounted for and properly added. You can also see that the calls ran in parallel: the whole operation finished in just under 2 seconds, while three sequential calls with a 1-second delay each would have taken at least 3 seconds.

The entire console application can be cut and pasted from the following code. There are other ways to chain the tasks and make the completions fall under a parent, but this should help you get your arms wrapped around tasks that don’t complete when their methods return and instead require a synchronized completion context.

using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    private static readonly Random _random = new Random();

    static void Main(string[] args)
    {
        Console.WriteLine("Press ENTER to begin (and again to end)");
        Console.ReadLine();

        _Incorrect();
            
        Console.ReadLine();                                                            
    }

    private static void _Incorrect()
    {
            
        var start = DateTime.Now;

        int x = 0, y = 0, z = 0;

        Task.Factory.StartNew(
            () =>
                {
                    Task.Factory.StartNew(() => _GenerateRandomNumber(result => x = result),
                                            TaskCreationOptions.AttachedToParent);
                    Task.Factory.StartNew(() => _GenerateRandomNumber(result => y = result),
                                            TaskCreationOptions.AttachedToParent);
                    Task.Factory.StartNew(() => _GenerateRandomNumber(result => z = result),
                                            TaskCreationOptions.AttachedToParent);
                }).ContinueWith(t =>
                                    {
                                        var finish = DateTime.Now;
                                        Console.WriteLine("Bad Parallel: {0}+{1}+{2}={3} [{4}]",
                                            x, y, z,
                                            x+y+z,
                                            finish - start);
                                        _Parallel();                                            
                                    });          
    }

    private static void _Parallel()
    {
        var taskCompletions = new[]
                                    {
                                        new TaskCompletionSource<int>(), 
                                        new TaskCompletionSource<int>(),
                                        new TaskCompletionSource<int>()
                                    };

        var tasks = new[] {taskCompletions[0].Task, taskCompletions[1].Task, taskCompletions[2].Task};

        var start = DateTime.Now;            

        Task.Factory.StartNew(() => _GenerateRandomNumber(result => taskCompletions[0].TrySetResult(result)));
        Task.Factory.StartNew(() => _GenerateRandomNumber(result => taskCompletions[1].TrySetResult(result)));
        Task.Factory.StartNew(() => _GenerateRandomNumber(result => taskCompletions[2].TrySetResult(result)));

        Task.WaitAll(tasks);

        var finish = DateTime.Now;
        Console.WriteLine("Parallel: {0}+{1}+{2}={3} [{4}]", 
            taskCompletions[0].Task.Result,
            taskCompletions[1].Task.Result,
            taskCompletions[2].Task.Result,
            taskCompletions[0].Task.Result + taskCompletions[1].Task.Result + taskCompletions[2].Task.Result, 
            finish - start);            
    }

    private static void _GenerateRandomNumber(Action<int> callback)
    {
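        int random;
        lock (_random) // Random is not thread-safe, and this method is called from several tasks at once
        {
            random = _random.Next(0, 2000) + 10;
        }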
        Console.WriteLine("Generated {0}", random);
        Task.Factory.StartNew(() =>
                                    {
                                        Thread.Sleep(1000);
                                        callback(random);
                                    }, TaskCreationOptions.None);
    }
}
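Task.WaitAll blocks the calling thread. One of the other ways to chain the tasks mentioned before the listing is TaskFactory.ContinueWhenAll, which schedules a continuation once every completion source has a result and returns a task for the sum instead of blocking. This is a sketch under the same assumptions as the program above; ContinueWhenAllSketch and SumAsync are hypothetical names, and _GenerateRandomNumber is copied without its console output.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ContinueWhenAllSketch
{
    private static readonly Random _random = new Random();

    // Copied from the post (without console output): the callback fires on a separate task after a delay.
    private static void _GenerateRandomNumber(Action<int> callback)
    {
        var random = _random.Next(0, 2000) + 10;
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(1000);
            callback(random);
        });
    }

    // Returns a task for the sum instead of blocking the caller with Task.WaitAll.
    public static Task<int> SumAsync()
    {
        var sources = new[]
        {
            new TaskCompletionSource<int>(),
            new TaskCompletionSource<int>(),
            new TaskCompletionSource<int>()
        };

        foreach (var source in sources)
        {
            var tcs = source; // per-iteration copy; C# 4 compilers shared the foreach variable across closures
            _GenerateRandomNumber(result => tcs.TrySetResult(result));
        }

        var tasks = sources.Select(s => s.Task).ToArray();

        // The continuation runs only after all three completion sources have a result.
        return Task.Factory.ContinueWhenAll(tasks, completed => completed.Sum(t => t.Result));
    }
}
```

The caller decides when (or whether) to block: SumAsync().Result waits for the sum, while SumAsync().ContinueWith(...) keeps the whole chain asynchronous.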

Jeremy Likness

Comments

  • I don't understand. I use ContinueWhenAll; why don't you use that?
    It works well enough, I get the results...
    P.S.
    I tried to add the code, but got an MVC exception. You need to use Uri.EscapeDataString or something similar, and then we can add source code too.
