Parallel For Loop in C#: Part 2

In the previous part we implemented a ThreadPoolExecutor that accepts tasks and executes them in parallel. In this part we will implement a parallel for loop.
The parallel for needs to iterate over a collection of items and invoke the specified closure on every item. It has to perform these invocations in parallel, and the next instruction should be executed only after all the elements have been processed. When used, it should look something like this:

ForEach(distributionList, (destination) => {
   Message msg = new Message(destination.Address);
   msg.Content = Encrypt(Content, destination.PublicKey);
   msg.Send();
   Console.WriteLine("sent message to " + destination.Name);
});

So the signature of the method should be:

public static void ForEach<T>(ICollection<T> collection, Action<T> parameterizedAction)

We know that we need to use the ThreadPoolExecutor, enqueue all the actions and execute them and then wait for the executor to finish. But we have a mismatch. The ForEach() method takes a parameterized action, while the thread executor takes an action without any arguments. What we need to do is construct a set of parameterless actions by adding each element of the collection as the parameter of the parameterized action.

action(x) -> action1, action2, action3… actionN

We can do it with one simple line of code:

Action parameterlessAction = (() => parameterizedAction.Invoke(element));

The idea goes back to lambda calculus; the result is called a closure because the function gets closed over its free variable. Hence the (slightly misnamed) term closure that the rest of us programmers use. Many programmers would describe this operation as currying, although strictly speaking fixing an argument in this way is partial application.
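The binding step can also be tried in isolation. The following is only a minimal sketch: the Bind helper and the BindDemo class are hypothetical names introduced for illustration and are not part of the ForEach implementation.

```csharp
using System;
using System.Collections.Generic;

public static class BindDemo {
    // Hypothetical helper: fixes the argument of a one-parameter action and
    // returns a parameterless action that remembers that argument.
    public static Action Bind<T>(Action<T> parameterizedAction, T argument) {
        return () => parameterizedAction.Invoke(argument);
    }

    public static List<string> Run() {
        var log = new List<string>();
        Action<string> greet = name => log.Add("hello " + name);

        // Two parameterless actions built from the same parameterized action.
        Action greetAlice = Bind(greet, "Alice");
        Action greetBob = Bind(greet, "Bob");

        // Each action still knows its own argument, whatever the call order.
        greetBob();
        greetAlice();
        return log; // "hello Bob", "hello Alice"
    }
}
```

Each call to Bind produces an action that can be handed to anything expecting a plain Action, such as the AddTask() method of the executor.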

Our code would take the following form:

public static void ForEach<T>(ICollection<T> collection, Action<T> parameterizedAction) {
   ThreadPoolExecutor executor = new ThreadPoolExecutor();

   foreach (T element in collection) {
      Action parameterlessAction = (() => parameterizedAction.Invoke(element));
      executor.AddTask(parameterlessAction);
   }

   executor.Finish();
}

However, this code has a well-hidden and nasty bug. The bug is caused by a certain property of closures, which makes them dangerous in the wrong hands. This is also one of the major reasons why many Java developers don’t want closures. Don’t you see it? Hmmm… Are you ready for using closures? :-)
Closures require that their free variables have an extent at least as long as the lifetime of the closure itself. They also capture the variable itself, not the value it held when the closure was created.
In our scenario we refer to element inside parameterlessAction, but the value of element changes with each iteration. In C# versions before 5.0 the foreach loop variable is a single variable shared by all iterations, so every closure captures that same variable, and by the time a worker invokes a closure it may see a later element than the one it was created for. Fortunately enough, in our case, Visual Studio was able to detect this situation through static analysis and shyly display a warning in the form of a blue squiggly underline. The solution is to copy element into a temporary variable declared inside the loop body. Every iteration then gets a fresh variable of its own, and the compiler is smart enough to allocate it on the heap instead of the stack, so that it lives as long as the closure that uses it. (Since C# 5.0 the foreach variable is fresh in every iteration, so the copy is only needed for older compilers and for ordinary for loops.)
So here’s the final version of the code:

public static void ForEach<T>(ICollection<T> collection, Action<T> parameterizedAction) {
   ThreadPoolExecutor executor = new ThreadPoolExecutor();

   foreach (T element in collection) {
      // copy the loop variable so that each closure captures its own value
      T parameter = element;
      Action parameterlessAction = (() => parameterizedAction.Invoke(parameter));
      executor.AddTask(parameterlessAction);
   }

   executor.Finish();
}
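
The capture problem can be reproduced without any threads. The sketch below uses an ordinary for loop, whose loop variable is shared by all iterations in every version of C#; the class and method names are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;

public static class CaptureDemo {
    // Every closure captures the one shared loop variable i.
    public static List<int> SharedVariable() {
        var actions = new List<Action>();
        var seen = new List<int>();
        for (int i = 0; i < 3; i++) {
            actions.Add(() => seen.Add(i));
        }
        // The loop has finished, so i is 3 for every closure.
        foreach (Action a in actions) a();
        return seen; // 3, 3, 3
    }

    // Every closure captures its own copy, declared inside the loop body.
    public static List<int> FreshCopy() {
        var actions = new List<Action>();
        var seen = new List<int>();
        for (int i = 0; i < 3; i++) {
            int copy = i;
            actions.Add(() => seen.Add(copy));
        }
        foreach (Action a in actions) a();
        return seen; // 0, 1, 2
    }
}
```

The second method is the same trick as the temporary variable in ForEach(): the copy is a new variable in each iteration, so later iterations cannot change what an earlier closure sees.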

Parallel For Loop in C#: Part 1

This is the first of two posts that show you how to build a parallel for loop in C#.

In this part we will start building a rudimentary ThreadPoolExecutor that will be the core of the parallel execution. It will have two methods:

  • AddTask(Action a) – queues up a task to be executed by one of the threads in the pool.
  • Finish() – signals the executor to stop accepting any other tasks and wait till all tasks are completed.

The implementation is based on the BlockingQueue implemented in the previous post. It uses 10 worker threads that constantly get tasks from the queue and execute them. Since it is a blocking queue, the workers will block until someone puts a task in the queue. The queue is thread safe, so every task will be processed by only one worker.

We will start by implementing the logic that queues up the tasks and the worker threads that execute them. Later we will think about termination.

    public class ThreadPoolExecutor {
        private readonly BlockingQueue<Action> tasks = new BlockingQueue<Action>();
        private readonly Thread[] workers;

        public ThreadPoolExecutor() {
            workers = new Thread[10];
            for (int i = 0; i < workers.Length; i++) {
                workers[i] = new Thread(run);
                workers[i].Start();
            }
        }

        public void AddTask(Action task) {
            tasks.Enqueue(task);
        }

        private void run() {
             while (true) {
                 tasks.Get().Invoke();
             }
        }
    }

Now we need to add the termination condition. Let’s start by just not accepting new tasks, and later we will think about waiting for the queued tasks to be processed. We will use a boolean that indicates whether the ThreadPoolExecutor is open or closed. If it is open, it can accept new tasks. Once the Finish() method gets called, the ThreadPoolExecutor becomes closed and doesn’t accept any new task.

    public class ThreadPoolExecutor {
        private bool open = true; // open to accept new tasks
        private readonly BlockingQueue<Action> tasks = new BlockingQueue<Action>();
        private readonly Thread[] workers;

        ...

        public void AddTask(Action task) {
             if (open) {
                 tasks.Enqueue(task);
             } else {
                 throw new Exception("Queue is closed");
             }
        }

        public void Finish() {
             open = false;
        }

        private void run() {
             while (true) {
                 tasks.Get().Invoke();
             }
        }
    }

The problem with this implementation is that the Finish() method doesn’t wait for all tasks to be processed, but returns immediately. The tasks to be processed include the ones in the queue along with the ones being currently processed by the workers. We will introduce a counter to keep track of this number. The Finish() method will block until that counter reaches 0. For blocking we will use System.Threading.Monitor and we will synchronize on “this”. Moreover, once all the tasks have been processed, we’ll interrupt all the worker threads that are blocked on the queue.

using System;
using System.Collections.Generic;
using System.Threading;

namespace Concurrent {
    public class ThreadPoolExecutor {
        private bool open = true; // open to accept new tasks
        private readonly BlockingQueue<Action> tasks = new BlockingQueue<Action>();
        private readonly Thread[] workers;
        private int pendingTasks; // the number of tasks that are queued or currently running

        public ThreadPoolExecutor() {
            workers = new Thread[10];
            for (int i = 0; i < workers.Length; i++) {
                workers[i] = new Thread(run);
                workers[i].Start();
            }
        }

        public void AddTask(Action task) {
            lock (this) {
                if (open) {
                    tasks.Enqueue(task);
                    pendingTasks++;
                } else {
                    throw new Exception("Queue is closed");
                }
            }
        }

        public void Finish() {
            lock(this) {
                open = false;

                // wait till all tasks are finished
                while (pendingTasks > 0) {
                    Monitor.Wait(this);
                }

                // kill the workers waiting for more tasks
                foreach (Thread worker in workers) {
                    worker.Interrupt();
                }
            }
        }

        private void run() {
            try {
                while (true) {
                    lock (this) {
                        if (!open && pendingTasks == 0)
                            break; // if we're not open anymore and there are no tasks in queue, break the loop
                    }

                    // don't block other threads while processing the task
                    tasks.Get().Invoke();

                    lock (this) {
                        pendingTasks--;
                        Monitor.PulseAll(this); // wake up Finish() so it can re-check the pending count
                    }
                }
            } catch(ThreadInterruptedException) {}
        }
    }
}
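
The waiting logic in Finish() can be looked at on its own. The following sketch, under the assumption that a private lock object is acceptable instead of “this”, shows just the counter with Monitor.Wait and Monitor.PulseAll; CountdownDemo and its members are illustrative names, not part of the executor.

```csharp
using System;
using System.Threading;

public class CountdownDemo {
    private readonly object sync = new object();
    private int pending;

    public CountdownDemo(int count) { pending = count; }

    // Called by a worker when its task is done.
    public void Signal() {
        lock (sync) {
            pending--;
            Monitor.PulseAll(sync); // wake waiters so they re-check the counter
        }
    }

    // Blocks until the counter reaches zero.
    public void WaitForZero() {
        lock (sync) {
            while (pending > 0) {
                Monitor.Wait(sync); // releases the lock while waiting
            }
        }
    }

    public static int Run() {
        var demo = new CountdownDemo(3);
        int done = 0;
        for (int i = 0; i < 3; i++) {
            new Thread(() => {
                Thread.Sleep(10); // simulate some work
                Interlocked.Increment(ref done);
                demo.Signal();
            }).Start();
        }
        demo.WaitForZero();
        return done; // 3: every worker finished before the wait returned
    }
}
```

The while loop around Monitor.Wait matters: a waiter is woken on every PulseAll, so it has to check the counter again before deciding that all the work is done.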

In order to parallelize your program, divide it into concurrently executable tasks and wrap every task in an Action object. For convenience you can also use closures. Add all the tasks to the ThreadPoolExecutor and wait for them to finish by calling the Finish() method.

In the next post we will be adding some syntactic sugar to implement a parallel for construct.

Blocking Queue in C#

Here’s a simple implementation of a blocking queue in C#. It can be used for producer-consumer scenarios, buffered messaging, worker pools, etc.

using System.Collections.Generic;
using System.Threading;

namespace BlockingQueue {
    public class BlockingQueue<T> {
        private readonly Queue<T> q = new Queue<T>();

        public void Enqueue(T element) {
            lock (q) {
                // modify the queue only while holding the lock,
                // then wake up one waiting consumer
                q.Enqueue(element);
                Monitor.Pulse(q);
            }
        }

        public T Get() {
            lock (q) {
                // re-check the condition after every wake-up
                while (q.Count == 0) {
                    Monitor.Wait(q);
                }
                return q.Dequeue();
            }
        }
    }
}
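
As an illustration of the producer-consumer case, here is a minimal sketch with one producer and one consumer. It repeats a copy of the queue so that it compiles on its own; ProducerConsumerDemo is a made-up name for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Copy of the queue from this post, repeated so the sketch is self-contained.
public class BlockingQueue<T> {
    private readonly Queue<T> q = new Queue<T>();

    public void Enqueue(T element) {
        lock (q) {
            q.Enqueue(element);
            Monitor.Pulse(q);
        }
    }

    public T Get() {
        lock (q) {
            while (q.Count == 0) {
                Monitor.Wait(q);
            }
            return q.Dequeue();
        }
    }
}

public static class ProducerConsumerDemo {
    public static int Run() {
        var queue = new BlockingQueue<int>();
        int sum = 0;

        // The consumer blocks in Get() whenever the queue is empty.
        var consumer = new Thread(() => {
            for (int i = 0; i < 5; i++) {
                sum += queue.Get();
            }
        });
        consumer.Start();

        // The producer (this thread) hands over the numbers 1 to 5.
        for (int i = 1; i <= 5; i++) {
            queue.Enqueue(i);
        }

        consumer.Join(); // wait until the consumer has taken all five items
        return sum; // 15
    }
}
```

The consumer never polls or sleeps: it simply calls Get() and the queue takes care of blocking it until an element is available.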