In the previous part we have implemented a ThreadPoolExecutor that accepts tasks and executes them in parallel. In this part we will implement a parallel for loop.
The parallel for needs to iterate over a collection of items and invoke the specified closure on every item. It has to perform these invocations in parallel and the next instruction should be executed only after all the elements have been processed. When used, it should look something like this:
ForEach(distributionList, (destination) => {
Message msg = new Message(destination.Address);
msg.Content = Encrypt(Content, destination.PublicKey);
msg.Send();
Console.WriteLine("sent message to" + destination.Name);
}
So the signature of the method should be:
public static void ForEach(ICollection<T> collection, Action<T> action)
We know that we need to use the ThreadPoolExecutor, enqueue all the actions and execute them and then wait for the executor to finish. But we have a mismatch. The ForEach() method takes a parameterized action, while the thread executor takes an action without any arguments. What we need to do is construct a set of parameterless actions by adding each element of the collection as the parameter of the parameterized action.
action(x) -> action1, action2, action3… actionN
We can do it with one simple line of code:
Action parameterlessAction = (() => parameterizedAction.Invoke(element));
This was first invented in lambda calculus, and it was called a closure, because the function gets closed over the free parameter. Hence the (slightly misnamed) term closure that the rest of us programmers use. Most of the programmers would be more familiar with the term currying to define this operation.
Our code would take the following form:
public static void ForEach(ICollection<T> collection, Action<T> action) {
ThreadPoolExecutor executor = new ThreadPoolExecutor();
foreach (T element in collection) {
Action parameterlessAction = (() => parameterizedAction.Invoke(element));
executor.AddTask(parameterlessAction);
}
executor.Finish();
}
executor.Finish();
}
However, this code has a well hidden and nasty bug. The bug is caused by a certain property of closures, which makes them dangerous in the wrong hands. This is also one of the major reasons why many Java developers don’t want closures. Don’t you see it? Hmmm… Are you ready for using closures? ![]()
Closures require that their free variables have an extent at least as long as the lifetime of the closure itself.
In our scenario we are referring to element inside the parameterlessAction, but the value of element changes with each iteration. So by the time we invoke the closure unexpected results can happen. Fortunately enough, in our case, Visual Studio was able to detect this situation through static analysis and shyly display a warning in the form of a blue squiggly underline. The solution is to assign element to a temporary variable, and the compiler will be smart enough to allocate that variable on the heap instead of the stack, so that the variable has a longer lifetime than the method invocation.
So here’s the final version of the code:
public static void ForEach(ICollection<T> collection, Action<T> action) {
ThreadPoolExecutor executor = new ThreadPoolExecutor();
foreach (T element in collection) {
T parameter = element;
Action parameterlessAction = (() => parameterizedAction.Invoke(parameter));
executor.AddTask(parameterlessAction);
}
executor.Finish();
}
executor.Finish();
}
RSS - Posts