Parallel For Loop in C#: Part 1

This is the first of two posts showing how to build a parallel for loop in C#.

In this part we will start building a rudimentary ThreadPoolExecutor that will be the core of the parallel execution. It will have two methods:

  • AddTask(Action a) – queues up a task to be executed by one of the threads in the pool.
  • Finish() – signals the executor to stop accepting new tasks and waits until all tasks are completed.

The implementation is based on the BlockingQueue implemented in the previous post. It uses 10 worker threads that constantly take tasks from the queue and execute them. Since it is a blocking queue, the workers will block until someone puts a task in the queue. The queue is thread safe, so each task will be processed by exactly one worker.
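
For reference, here is a minimal sketch of what such a blocking queue might look like (a Monitor-based guess; the actual class from the previous post may differ in its details):

    using System;
    using System.Collections.Generic;
    using System.Threading;

    // Hypothetical sketch of the BlockingQueue from the previous post.
    public class BlockingQueue {
        private readonly Queue<Action> queue = new Queue<Action>();

        // Adds an item and wakes up one worker waiting in Get().
        public void Enqueue(Action item) {
            lock (this) {
                queue.Enqueue(item);
                Monitor.Pulse(this);
            }
        }

        // Blocks until an item is available, then removes and returns it.
        public Action Get() {
            lock (this) {
                while (queue.Count == 0) {
                    Monitor.Wait(this);
                }
                return queue.Dequeue();
            }
        }
    }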

We will start by implementing the logic that queues up the tasks and the worker threads that execute them; we will think about termination later.

    public class ThreadPoolExecutor {
        private readonly BlockingQueue tasks = new BlockingQueue();
        private readonly Thread[] workers;

        public ThreadPoolExecutor() {
            // start 10 worker threads; each runs the run() loop below
            workers = new Thread[10];
            for (int i = 0; i < workers.Length; i++) {
                workers[i] = new Thread(run);
                workers[i].Start();
            }
        }

        public void AddTask(Action task) {
            tasks.Enqueue(task);
        }

        private void run() {
            // take the next task from the queue (blocking if it is empty) and execute it
            while (true) {
                tasks.Get().Invoke();
            }
        }
    }
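
With just these two pieces in place we can already push work to the pool. For example (a purely illustrative snippet; the Console.WriteLine body simply stands in for real work):

    var executor = new ThreadPoolExecutor();
    for (int i = 0; i < 100; i++) {
        int n = i; // capture a copy so each closure sees its own value
        executor.AddTask(() => Console.WriteLine("Processing item " + n));
    }
    // Note: there is no way yet to wait for these tasks or to shut the pool down.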

Now we need to add the termination condition. Let’s start by just not accepting new tasks, and later we will think about waiting for the queued tasks to be processed. We will use a boolean that indicates whether the ThreadPoolExecutor is open or closed. While it is open, it accepts new tasks. Once the Finish() method gets called, the ThreadPoolExecutor becomes closed and doesn’t accept any new tasks.

    
    public class ThreadPoolExecutor {
        private bool open = true; // open to accept new tasks
        private readonly BlockingQueue tasks = new BlockingQueue();
        private readonly Thread[] workers;

        ...

        public void AddTask(Action task) {
            if (open) {
                tasks.Enqueue(task);
            } else {
                throw new Exception("Queue is closed");
            }
        }

        public void Finish() {
            open = false;
        }

        private void run() {
            while (true) {
                tasks.Get().Invoke();
            }
        }
    }

The problem with this implementation is that the Finish() method doesn’t wait for all tasks to be processed, but returns immediately. The tasks still to be processed include the ones waiting in the queue as well as the ones currently being executed by the workers. We will introduce a counter to keep track of this number, and the Finish() method will block until that counter reaches 0. For blocking we will use System.Threading.Monitor and synchronize on “this”. Finally, once all the tasks have been processed, we’ll interrupt the worker threads that are still blocked on the queue.

using System;
using System.Collections.Generic;
using System.Threading;

namespace Concurrent {
    public class ThreadPoolExecutor {
        private bool open = true; // open to accept new tasks
        private readonly BlockingQueue tasks = new BlockingQueue();
        private readonly Thread[] workers;
        private int pendingTasks; // tasks that are queued or currently being executed

        public ThreadPoolExecutor() {
            workers = new Thread[10];
            for (int i = 0; i < workers.Length; i++) {
                workers[i] = new Thread(run);
                workers[i].Start();
            }
        }

        public void AddTask(Action task) {
            lock (this) {
                if (open) {
                    tasks.Enqueue(task);
                    pendingTasks++;
                } else {
                    throw new Exception("Queue is closed");
                }
            }
        }

        public void Finish() {
            lock(this) {
                open = false;

                // wait till all tasks are finished
                while (pendingTasks > 0) {
                    Monitor.Wait(this);
                }

                // kill the workers waiting for more tasks
                foreach (Thread worker in workers) {
                    worker.Interrupt();
                }
            }
        }

        private void run() {
            try {
                while (true) {
                    lock (this) {
                        if (!open && pendingTasks == 0)
                            break; // if we're not open anymore and there are no tasks in queue, break the loop
                    }

                    // don't block other threads while processing the task
                    tasks.Get().Invoke();

                    lock (this) {
                        pendingTasks--;
                        Monitor.PulseAll(this); // wake up Finish() so it can re-check the counter
                    }
                }
            } catch(ThreadInterruptedException) {}
        }
    }
}

In order to parallelize your program, divide it into concurrently executable tasks and wrap each task in an Action object; for convenience you can also use closures. Add all the tasks to the ThreadPoolExecutor and wait for them to finish by calling the Finish() method.
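
For example, filling an array with computed values could be parallelized like this (an illustrative sketch; the names and the task body are arbitrary, and each task writes to a distinct index, so no extra locking is needed):

    var executor = new ThreadPoolExecutor();
    int[] squares = new int[1000];
    for (int i = 0; i < squares.Length; i++) {
        int index = i; // capture a copy for the closure
        executor.AddTask(() => squares[index] = index * index);
    }
    executor.Finish(); // returns only after every queued task has been executed

Note the copy of the loop variable into a local; without it, every closure would capture the same variable and most tasks would see its final value.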

In the next post we will be adding some syntactic sugar to implement a parallel for construct.
