Parallel For Loop in C#: Part 2

In the previous part we implemented a ThreadPoolExecutor that accepts tasks and executes them in parallel. In this part we will implement a parallel for loop.
The parallel for needs to iterate over a collection of items and invoke the specified closure on every item. It has to perform these invocations in parallel and the next instruction should be executed only after all the elements have been processed. When used, it should look something like this:

ForEach(distributionList, (destination) => {
   Message msg = new Message(destination.Address);
   msg.Content = Encrypt(Content, destination.PublicKey);
   msg.Send();
   Console.WriteLine("sent message to " + destination.Name);
});

So the signature of the method should be:

public static void ForEach<T>(ICollection<T> collection, Action<T> action)

We know that we need to use the ThreadPoolExecutor, enqueue all the actions and execute them and then wait for the executor to finish. But we have a mismatch. The ForEach() method takes a parameterized action, while the thread executor takes an action without any arguments. What we need to do is construct a set of parameterless actions by adding each element of the collection as the parameter of the parameterized action.

action(x) -> action1, action2, action3… actionN

We can do it with one simple line of code:

Action parameterlessAction = (() => parameterizedAction.Invoke(element));

The idea comes from lambda calculus. The resulting function is called a closure, because it gets closed over its free variable (here, element). Hence the (slightly misnamed) term closure that the rest of us programmers use. Many programmers would call this operation currying, although fixing an argument in advance like this is more precisely partial application.
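As a minimal sketch of that wrapping step in isolation, the listing below uses two hypothetical helpers, Bind and BindAll (neither is part of the original code), to turn one parameterized action into a list of parameterless ones. The actions are run one after another here; an executor would run them in parallel.

```csharp
using System;
using System.Collections.Generic;

public static class ActionBinder {
    // Hypothetical helper: fixes the single argument of a parameterized action.
    public static Action Bind<T>(Action<T> parameterizedAction, T argument) {
        return () => parameterizedAction(argument);
    }

    // Builds one parameterless action per element: action(x) -> action1 ... actionN.
    public static List<Action> BindAll<T>(IEnumerable<T> items, Action<T> parameterizedAction) {
        var actions = new List<Action>();
        foreach (T item in items) {
            actions.Add(Bind(parameterizedAction, item));
        }
        return actions;
    }

    public static void Main() {
        var seen = new List<string>();
        List<Action> actions = BindAll(new[] { "a", "b", "c" }, s => seen.Add(s));
        foreach (Action a in actions) {
            a(); // sequential here; a thread pool would invoke these concurrently
        }
        Console.WriteLine(string.Join(",", seen)); // prints "a,b,c"
    }
}
```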

Our code would take the following form:

public static void ForEach<T>(ICollection<T> collection, Action<T> parameterizedAction) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor();

    foreach (T element in collection) {
        Action parameterlessAction = (() => parameterizedAction.Invoke(element));
        executor.AddTask(parameterlessAction);
    }

    executor.Finish();
}

However, this code has a well hidden and nasty bug. The bug is caused by a certain property of closures, which makes them dangerous in the wrong hands. This is also one of the major reasons why many Java developers don’t want closures. Don’t you see it? Hmmm… Are you ready for using closures? :-)
Closures require that their free variables have an extent at least as long as the lifetime of the closure itself.
In our scenario we are referring to element inside the parameterlessAction, but there is only one element variable for the whole loop, and its value changes with each iteration. All the closures share that variable, so by the time a worker thread invokes a closure, element may already hold a later item: some items can be processed twice while others are skipped. Fortunately, in our case Visual Studio was able to detect this situation through static analysis and shyly displayed a warning in the form of a blue squiggly underline. The solution is to copy element into a temporary variable declared inside the loop body. Each iteration then gets its own variable, and the compiler is smart enough to allocate it on the heap instead of the stack, so that it outlives the iteration that created it. (Since C# 5.0 the foreach variable is itself fresh on every iteration, so the copy is only needed with older compilers or in for loops.)
So here’s the final version of the code:

public static void ForEach<T>(ICollection<T> collection, Action<T> parameterizedAction) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor();

    foreach (T element in collection) {
        // copy the loop variable so that every closure captures its own value
        T parameter = element;
        Action parameterlessAction = (() => parameterizedAction.Invoke(parameter));
        executor.AddTask(parameterlessAction);
    }

    executor.Finish();
}
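The capture problem can be reproduced without any threads. The sketch below is a hedged illustration, not the author's code: it uses a for loop, because the loop variable of a for loop is still shared by all iterations in every C# version, whereas foreach has declared a fresh variable per iteration since C# 5.0. The class and method names are made up for the demo.

```csharp
using System;
using System.Collections.Generic;

public static class CaptureDemo {
    // Every lambda captures the same variable i, not its value at creation time.
    public static List<int> SharedVariable() {
        var funcs = new List<Func<int>>();
        for (int i = 0; i < 3; i++) {
            funcs.Add(() => i);
        }
        var results = new List<int>();
        foreach (Func<int> f in funcs) {
            results.Add(f()); // i is already 3 when the lambdas finally run
        }
        return results;
    }

    // Copying into a variable declared inside the body gives each lambda its own copy.
    public static List<int> CopiedVariable() {
        var funcs = new List<Func<int>>();
        for (int i = 0; i < 3; i++) {
            int copy = i;
            funcs.Add(() => copy);
        }
        var results = new List<int>();
        foreach (Func<int> f in funcs) {
            results.Add(f());
        }
        return results;
    }

    public static void Main() {
        Console.WriteLine(string.Join(",", SharedVariable())); // prints "3,3,3"
        Console.WriteLine(string.Join(",", CopiedVariable())); // prints "0,1,2"
    }
}
```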

Parallel For Loop in C#: Part 1

This is the first of two posts that show you how to build a parallel for loop in C#.

In this part we will start building a rudimentary ThreadPoolExecutor that will be the core of the parallel execution. It will have two methods:

  • AddTask(Action a) – queues up a task to be executed by one of the threads in the pool.
  • Finish() – signals the executor to stop accepting new tasks and waits until all tasks are completed.

The implementation is based on the BlockingQueue implemented in the previous post. It uses 10 worker threads that constantly get tasks from the queue and execute them. Since it is a blocking queue, the workers will block until someone puts a task in the queue. The queue is thread safe, so every task will be processed by only one worker.

We will start by implementing the logic that queues up the tasks and the worker threads that execute them. Later we will think about termination.

    public class ThreadPoolExecutor {
        private readonly BlockingQueue<Action> tasks = new BlockingQueue<Action>();
        private readonly Thread[] workers;

        public ThreadPoolExecutor() {
            workers = new Thread[10];
            for (int i = 0; i < workers.Length; i++) {
                workers[i] = new Thread(run);
                workers[i].Start();
            }
        }

        public void AddTask(Action task) {
            tasks.Enqueue(task);
        }

        private void run() {
             while (true) {
                 tasks.Get().Invoke();
             }
        }
    }

Now we need to add the termination condition. Let’s start by just not accepting new tasks, and later we will think about waiting for the tasks to be processed. We will use a boolean that indicates whether the ThreadPoolExecutor is open or closed. If it is open, it can accept new tasks. Once the Finish() method gets called, the ThreadPoolExecutor becomes closed and doesn’t accept any new tasks.

    public class ThreadPoolExecutor {
        private bool open = true; // open to accept new tasks
        private readonly BlockingQueue<Action> tasks = new BlockingQueue<Action>();
        private readonly Thread[] workers;

        ...

        public void AddTask(Action task) {
             if (open) {
                 tasks.Enqueue(task);
             } else {
                 throw new Exception("Queue is closed");
             }
        }

        public void Finish() {
             open = false;
        }

        private void run() {
             while (true) {
                 tasks.Get().Invoke();
             }
        }
    }

The problem with this implementation is that the Finish() method doesn’t wait for all tasks to be processed, but returns immediately. The tasks to be processed include the ones in the queue along with the ones being currently processed by the workers. We will introduce a counter to keep track of this number. The Finish() method will block until that counter reaches 0. For blocking we will use System.Threading.Monitor and we will synchronize on “this”. Moreover, once all the tasks have been processed, we’ll interrupt all the worker threads that are blocked on the queue.

using System;
using System.Collections.Generic;
using System.Threading;

namespace Concurrent {
    public class ThreadPoolExecutor {
        private bool open = true; // open to accept new tasks
        private readonly BlockingQueue<Action> tasks = new BlockingQueue<Action>();
        private readonly Thread[] workers;
        private int pendingTasks; // the number of tasks queued or currently being executed

        public ThreadPoolExecutor() {
            workers = new Thread[10];
            for (int i = 0; i < workers.Length; i++) {
                workers[i] = new Thread(run);
                workers[i].Start();
            }
        }

        public void AddTask(Action task) {
            lock (this) {
                if (open) {
                    tasks.Enqueue(task);
                    pendingTasks++;
                } else {
                    throw new Exception("Queue is closed");
                }
            }
        }

        public void Finish() {
            lock(this) {
                open = false;

                // wait till all tasks are finished
                while (pendingTasks > 0) {
                    Monitor.Wait(this);
                }

                // kill the workers waiting for more tasks
                foreach (Thread worker in workers) {
                    worker.Interrupt();
                }
            }
        }

        private void run() {
            try {
                while (true) {
                    lock (this) {
                        if (!open && pendingTasks == 0)
                            break; // if we're not open anymore and there are no tasks in queue, break the loop
                    }

                    // don't block other threads while processing the task
                    tasks.Get().Invoke();

                    lock (this) {
                        pendingTasks--;
                        Monitor.PulseAll(this); // wake up Finish() so it can re-check the counter
                    }
                }
            } catch(ThreadInterruptedException) {}
        }
    }
}
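The waiting logic of Finish() can be looked at on its own. The following is a sketch under stated assumptions: PendingCounter and PendingCounterDemo are hypothetical names, the work runs on the built-in .NET ThreadPool instead of the executor above, and the lock is taken on a private object rather than on this so that outside code cannot interfere with it.

```csharp
using System;
using System.Threading;

// Hypothetical helper that isolates the counter used by Finish().
public class PendingCounter {
    private readonly object gate = new object();
    private int pending;

    public void Increment() {
        lock (gate) {
            pending++;
        }
    }

    public void Decrement() {
        lock (gate) {
            pending--;
            Monitor.PulseAll(gate); // wake waiters so they re-check the counter
        }
    }

    // Blocks the caller until the counter has dropped to zero.
    public void WaitForZero() {
        lock (gate) {
            while (pending > 0) {
                Monitor.Wait(gate); // releases the lock while waiting
            }
        }
    }
}

public static class PendingCounterDemo {
    public static int RunTasks(int taskCount) {
        var counter = new PendingCounter();
        int completed = 0;
        for (int i = 0; i < taskCount; i++) {
            counter.Increment(); // count the task before it is handed over
            ThreadPool.QueueUserWorkItem(_ => {
                try {
                    Thread.Sleep(10); // simulated work
                    Interlocked.Increment(ref completed);
                } finally {
                    counter.Decrement(); // runs even if the work throws
                }
            });
        }
        counter.WaitForZero(); // returns only after every task has finished
        return completed;
    }

    public static void Main() {
        Console.WriteLine(RunTasks(20)); // prints "20"
    }
}
```

The while loop around Monitor.Wait matters: a woken thread has to re-check the condition, because a pulse only says that the counter changed, not that it reached zero.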

In order to parallelize your program, divide it into concurrently executable tasks and wrap every task in an Action object. For convenience you can also use closures. Add all the tasks to the ThreadPoolExecutor and wait for them to finish by calling the Finish() method.

In the next post we will be adding some syntactic sugar to implement a parallel for construct.

Blocking Queue in C#

Here’s a simple implementation of a blocking queue in C#. It can be used for producer-consumer scenarios, buffered messaging, worker pools, etc.

using System.Collections.Generic;
using System.Threading;

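namespace BlockingQueue {
    public class BlockingQueue<T> {
        private readonly Queue<T> q = new Queue<T>();

        public void Enqueue(T element) {
            lock (q) {
                // Queue<T> is not thread safe, so modify it only while holding the lock
                q.Enqueue(element);
                Monitor.Pulse(q); // wake up one consumer blocked in Get()
            }
        }

        public T Get() {
            lock (q) {
                // re-check the condition after every wake-up
                while (q.Count == 0) {
                    Monitor.Wait(q);
                }
                return q.Dequeue();
            }
        }
    }
}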
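A short usage sketch of the queue in a producer-consumer setting follows. It repeats the queue class (without the namespace, and with the enqueue performed while holding the lock) so that it compiles on its own; ProducerConsumerDemo and SumThroughQueue are hypothetical names chosen for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class BlockingQueue<T> {
    private readonly Queue<T> q = new Queue<T>();

    public void Enqueue(T element) {
        lock (q) {
            q.Enqueue(element);
            Monitor.Pulse(q);
        }
    }

    public T Get() {
        lock (q) {
            while (q.Count == 0) {
                Monitor.Wait(q);
            }
            return q.Dequeue();
        }
    }
}

public static class ProducerConsumerDemo {
    // Produces 1..count on the calling thread and sums the values on a consumer thread.
    public static int SumThroughQueue(int count) {
        var queue = new BlockingQueue<int>();
        int sum = 0;

        var consumer = new Thread(() => {
            for (int i = 0; i < count; i++) {
                sum += queue.Get(); // blocks while the queue is empty
            }
        });
        consumer.Start();

        for (int i = 1; i <= count; i++) {
            queue.Enqueue(i);
        }

        consumer.Join(); // wait for the consumer before reading sum
        return sum;
    }

    public static void Main() {
        Console.WriteLine(SumThroughQueue(100)); // prints "5050"
    }
}
```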

Time Tracking Tools

Have you ever asked yourself how much of your day in the office you spend watching funny YouTube videos, reading emails, having coffee or doing some boring administrative task that keeps you from getting your work finished? I’ve actually been asking myself that for what feels like my entire life. I always thought that a well-organized and productive 3 hours could be more valuable than a whole day of distractions. I always wanted to measure that productivity in detail and see what things I’m spending my time on and how much. Now I finally can.

A few months ago I found out on Lifehacker about a piece of software called RescueTime that tracks how you spend your time on a computer. It’s free and it runs in the background. It categorizes your activities into Communication, Development Tools, Reference/Search, etc. If you are using a browser, it is smart enough to distinguish which sites you are accessing, so reading The Server Side and Failblog.org are considered two different activities.

 

[Screenshot: RescueTime]

One downside of RescueTime is that it posts your usage statistics to their servers, so if you don’t want anyone to see how much time you spent… watching porn in the office, you might choose to disable RescueTime for a couple of hours. On the other hand, your usage statistics are accessible only by you, so your boss can’t dock your salary for every 5 minutes you spend checking your private mail.

Another downside is that it doesn’t show you exactly when you started working with a particular program and when you were AFK. So you can’t distinguish between a meeting and a lunch break.

Both of these problems are solved by ManicTime. ManicTime works only locally and shows you what you were doing at any exact moment of the day:

[Screenshot: ManicTime]

 

 

The only downside of ManicTime is that it is too fine-grained, so you don’t get the quick overview that you can get in RescueTime. Currently I’m using both of them until I decide which one is better or one of them implements the features of the other.

Active and Passive Replication in Distributed Systems

In distributed systems research, replication is mainly used to provide fault tolerance. The entity being replicated is a process. Two replication strategies are used in distributed systems: active and passive replication.

[Figure: active and passive replication]

In active replication each client request is processed by all the servers. Active replication was first introduced by Leslie Lamport under the name state machine replication. It requires that the process hosted by the servers is deterministic. Deterministic means that, given the same initial state and the same request sequence, all processes will produce the same response sequence and end up in the same final state. In order to make all the servers receive the same sequence of operations, an atomic broadcast protocol must be used. An atomic broadcast protocol guarantees that either all the servers receive a message or none of them do, and that they all receive messages in the same order. The big disadvantage of active replication is that in practice most real-world servers are non‐deterministic. Still, active replication is the preferable choice when dealing with real-time systems that require a quick response even in the presence of faults, or with systems that must handle Byzantine faults.
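The determinism requirement can be illustrated with a toy example. This is a single-process sketch, not a real protocol: CounterReplica is a hypothetical deterministic service, and the Broadcast method merely stands in for atomic broadcast by handing every request to every replica in the same order.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical deterministic service: a number driven by "add N" and "mul N" requests.
public class CounterReplica {
    public long State { get; private set; }

    public long Apply(string request) {
        string[] parts = request.Split(' ');
        long operand = long.Parse(parts[1]);
        switch (parts[0]) {
            case "add": State += operand; break;
            case "mul": State *= operand; break;
            default: throw new ArgumentException("unknown request: " + request);
        }
        return State;
    }
}

public static class ActiveReplicationDemo {
    // Stand-in for atomic broadcast: every replica sees every request, in the same order.
    public static List<long> Broadcast(IList<CounterReplica> replicas, IEnumerable<string> requests) {
        foreach (string request in requests) {
            foreach (CounterReplica replica in replicas) {
                replica.Apply(request);
            }
        }
        var states = new List<long>();
        foreach (CounterReplica replica in replicas) {
            states.Add(replica.State);
        }
        return states;
    }

    public static void Main() {
        var replicas = new List<CounterReplica> {
            new CounterReplica(), new CounterReplica(), new CounterReplica()
        };
        List<long> states = Broadcast(replicas, new[] { "add 5", "mul 3", "add 1" });
        Console.WriteLine(string.Join(",", states)); // prints "16,16,16"
    }
}
```

If one replica received "mul 3" before "add 5", it would end at 1 instead of 16, which is why the delivery order has to be identical everywhere.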

In passive replication there is only one server (called primary) that processes client requests. After processing a request, the primary server updates the state on the other (backup) servers and sends back the response to the client. If the primary server fails, one of the backup servers takes its place. Passive replication may be used even for non‐deterministic processes. The disadvantage of passive replication compared to active is that in case of failure the response is delayed.
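A toy sketch of the primary-backup scheme follows, again inside a single process and with hypothetical names (ReplicaServer, PassiveGroup). The request is deliberately non-deterministic: only the primary executes it, and the backups receive the resulting state rather than the request, so they stay consistent anyway.

```csharp
using System;
using System.Collections.Generic;

public class ReplicaServer {
    public string Name { get; private set; }
    public int State { get; set; }

    public ReplicaServer(string name) {
        Name = name;
    }
}

public class PassiveGroup {
    private readonly List<ReplicaServer> servers = new List<ReplicaServer>(); // servers[0] is the primary
    private readonly Random random = new Random();

    public PassiveGroup(params string[] names) {
        foreach (string name in names) {
            servers.Add(new ReplicaServer(name));
        }
    }

    public ReplicaServer Primary {
        get { return servers[0]; }
    }

    // Non-deterministic request: executed by the primary only.
    public int Handle() {
        Primary.State += random.Next(1, 100);
        for (int i = 1; i < servers.Count; i++) {
            servers[i].State = Primary.State; // state update sent to each backup
        }
        return Primary.State; // response goes out after the backups are updated
    }

    // Simulates a crash of the primary: the next backup takes its place.
    public void FailPrimary() {
        servers.RemoveAt(0);
    }
}

public static class PassiveReplicationDemo {
    public static void Main() {
        var group = new PassiveGroup("A", "B", "C");
        int response = group.Handle();
        group.FailPrimary();
        Console.WriteLine(group.Primary.Name);              // prints "B"
        Console.WriteLine(group.Primary.State == response); // prints "True"
    }
}
```

A real system would also need failure detection and a way to agree on the new primary, which this sketch leaves out; that detection and takeover time is where the delayed response mentioned above comes from.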