LINQ Multicasting in .NET 4.0

by Jack on July 12, 2010

in Software Development

Why Multicast?

SQL Server Integration Services (SSIS) includes a very useful data transformation called “Multicast”. If you’re not familiar with SSIS, it allows you to build (among other things) Data Flow Tasks in which data rows “flow” from one or more sources, through one or more transformations, to one or more destinations. For example, you could copy rows from one source table to two destination tables by building a Data Flow Task that looks something like this:

spacer

The Multicast Transform creates copies of each row it receives from the upstream component and distributes one copy to each downstream component. Multicasting is very important in SSIS because it allows us to avoid executing expensive upstream components multiple times in order to feed the same rows to multiple downstream components.

What if we have a similar situation in .NET? Say we have an IEnumerable<float> and we want to compute the minimum, maximum, and average of all values in the sequence. We could use the Min, Max, and Average standard query operators like this:

var min = sequence.Min();
var max = sequence.Max();
var average = sequence.Average();

But this code will make three separate passes through the sequence to compute the three aggregates. If enumerating the sequence were expensive, then we might not want to do it three times. Wouldn’t it be great if we had some way to get the same results with only one pass?

Jon Skeet and Marc Gravell have developed one solution that they call “Push” LINQ. In “Push” LINQ, you configure multiple aggregators for use with a single DataProducer object and then tell the DataProducer to “push” the elements of the sequence to all of the aggregators. Jon Skeet has also figured out a different solution using the Reactive Extensions for .NET. These are fine solutions, but they require custom-built query operators; you can’t just use standard query operators.

Building Multicast in .NET 4.0

It would be nice to have an IEnumerable<T> extension method that applies multiple aggregators but enumerates the sequence only once. Maybe we could pass the aggregators to this method using lambda expressions and get the results back in a Tuple. Then we could write code like this:

var tuple = sequence.Multicast(a => a.Min(), b => b.Max(), c => c.Average());
var min = tuple.Item1;
var max = tuple.Item2;
var average = tuple.Item3;

The syntax is a bit clumsy, but it’s workable given the potential performance benefits. Let’s try to create this extension method for just two aggregators:

public static Tuple<T1, T2> Multicast<T, T1, T2>(
    this IEnumerable<T> sequence,
    Func<IEnumerable<T>, T1> aggregator1,
    Func<IEnumerable<T>, T2> aggregator2)

First we have to figure out how the two aggregators can pull from the IEnumerable<T> sequence at the same time. How about running each aggregator in its own thread? We don’t want to create a lot of threads, but one for each aggregator doesn’t seem too bad. We’ll spin up a separate thread for each aggregator and then use a System.Threading.CountdownEvent object to block the main thread until the aggregator threads end.

public static Tuple<T1, T2> Multicast<T, T1, T2>(
    this IEnumerable<T> sequence,
    Func<IEnumerable<T>, T1> aggregator1,
    Func<IEnumerable<T>, T2> aggregator2)
{
    // Assign default values for the results.
    var result1 = default(T1);
    var result2 = default(T2);

    // Invoke each of the aggregators.
    InvokeAll(new Action[] { () => result1 = aggregator1.Invoke(sequence),
                             () => result2 = aggregator2.Invoke(sequence)
                           });

    // Return the aggregator results in a Tuple.
    return new Tuple<T1, T2>(result1, result2);
}

private static void InvokeAll(Action[] actions)
{
    var actionCount = actions.Length;

    // Create the CountdownEvent that will let us wait until all of the
    // action threads end.
    var countdownEvent = new CountdownEvent(actionCount);

    // Start the action threads.
    for (var i = 0; i < actionCount; i++)
    {
        // Grab a reference to the current action so we're not accessing a
        // modified closure in the thread delegate below.
        var action = actions[i];

        new Thread(() =>
            {
                // Invoke the action.
                action.Invoke();
                // Signal the CountdownEvent that we're done.
                countdownEvent.Signal();
            }).Start();
    }

    // Wait for the actions threads to end.
    countdownEvent.Wait();
}

Within its own thread, each aggregator will call GetEnumerator() on the the IEnumerable<T> sequence and will receive its own instance of the enumerator. Because each aggregator has its own enumerator instance, we’re still going to be traversing the sequence twice, but now we’re doing so in parallel. Hmmm, that’s not much of an improvement. In fact, it’s could be worse than enumerating the sequence in series. We can take another step in the right direction if both aggregators were to receive the same enumerator instance when they call GetEnumerator(). We can use a custom IEnumerable<T> that wraps the original sequence and have it always give out the same enumerator singleton. Easy enough.

Now suppose that both aggregators get the same enumerator instance when they call GetEnumerator(). Then they’re going to start iterating the sequence using MoveNext(). We could have some concurrency problems. In one case, the aggregators make alternating calls to MoveNext() and the get_Current() accessor. As a result, each aggregator processes only a subset of the elements of the sequence:

spacer

In another case, both aggregators call MoveNext() and then both call get_Current(). The two aggregators will receive the same sequence element, but another element will have been skipped entirely:

spacer

We need to ensure that the aggregators each process all of the elements in the sequence, and we can do that by having them iterate the sequence in lockstep. That is, we wait until both aggregators call MoveNext() before actually moving to the next element of the sequence. To accomplish this, we’ll need a custom IEnumerator<T>, and we’ll need to somehow make the threads wait until all of the aggregators have called MoveNext(). This is where the new System.Threading.Barrier class in .NET 4.0 comes in.

A Barrier object allows multiple threads to process an algorithm concurrently, but in phases. First we tell Barrier how many threads we have. When each thread completes processing for the current phase, it calls Barrier.SignalAndWait() which blocks the thread until all of the threads have called SignalAndWait(). In our situation, a phase will consist of each aggregate calling MoveNext(), then calling get_Current(), and then processing the item.

We will build an enumerator, LockstepEnumerator<T>, that will use a Barrier object internally. When LockstepEnumerator<T>.MoveNext() is called, it will call Barrier.SignalAndWait(). The Barrier object will wait until all enumerator threads have signaled and then invoke a post-phase delegate which calls MoveNext() on the underlying enumerator. Then LockstepEnumerator<T>.MoveNext() will return the result of the underlying MoveNext() call.

public class LockstepEnumerator<T> : IEnumerator<T>
{
    private readonly IEnumerator<T> _Enumerator;
    private readonly Barrier _Barrier;
    private bool _MoveNextResult;

    public LockstepEnumerator(IEnumerator<T> enumerator, int degree)
    {
        // Check the arguments.
        if (enumerator == null)
            throw new ArgumentNullException("enumerator");
        if (degree <= 0)
            throw new ArgumentOutOfRangeException("degree", "Degree must be a positive integer.");

        _Enumerator = enumerator;

        // Create a Barrier with the given degree as the participant count
        // and give it a post-phase action that will call MoveNext() on
        // the enumerator.
        _Barrier = new Barrier(degree, x => _MoveNextResult = _Enumerator.MoveNext());
    }

    public T Current
    {
        get { return _Enumerator.Current; }
    }

    object IEnumerator.Current
    {
        get { return Current; }
    }

    public void Dispose()
    {
        _Barrier.Dispose();
        _Enumerator.Dispose();
    }

    public bool MoveNext()
    {
        // Signal the Barrier and block the current thread.
        _Barrier.SignalAndWait();

        // The Barrier's post-phase action will have executed before
        // SignalAndWait unblocks, so return the result of that action
        // calling MoveNext() on the underlying sequence.
        return _MoveNextResult;
    }

    public void Reset()
    {
        // Doesn't make sense to allow reset with multiple threads
        // using this enumerator.
        throw new NotSupportedException();
    }
}

The entire sequence for one phase looks like this:

spacer

We still need a custom IEnumerable<T> that will hand out a singleton instance of LockstepEnumerator<T> when GetEnumerator() is called. We’ll call it LockstepEnumerable<T>, and have it wrap the original IEnumerable<T>.

public class LockstepEnumerable<T> : IEnumerable<T>
{
    // Lazy that will create the enumerator when requested.
    private readonly Lazy<IEnumerator<T>> _Enumerator;

    public LockstepEnumerable(IEnumerable<T> sequence, int degree)
    {
        // Check the arguments.
        if (sequence == null)
            throw new ArgumentNullException("sequence");
        if (degree <= 0)
            throw new ArgumentOutOfRangeException("degree", "Degree must be a positive integer");

        // Create the Lazy that will create the LockstepEnumerator wrapping
        // the sequence enumerator.
        _Enumerator = new Lazy<IEnumerator<T>>(() => new LockstepEnumerator<T>(sequence.GetEnumerator(), degree));
    }

    public IEnumerator<T> GetEnumerator()
    {
        return _Enumerator.Value;
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

Finally, we have to modify our Multicast extension method so that it wraps the sequence in a LockstepEnumerable<T>:

public static Tuple<T1, T2> Multicast<T, T1, T2>(
    this IEnumerable<T> sequence, Func<IEnumerable<T>, T1> aggregator1,
    Func<IEnumerable<T>, T2> aggregator2)
{
    // Assign default values for the results.
    var result1 = default(T1);
    var result2 = default(T2);

    var multicastSequence = new LockstepEnumerable<T>(sequence, 2);

    InvokeAll(new Action[] {
                () => result1 = aggregator1.Invoke(multicastSequence),
                () => result2 = aggregator2.Invoke(multicastSequence)
            });

    return Tuple.Create(result1, result2);
}

And, voilà! We can multicast.

Multicast Performance

Let’s see our Multicast method in action. To demonstrate its performance characteristics, I started with two sequences: an “expensive” sequence and a “normal” sequence, each containing 1,000 integers. I simulated “expensive” by making the thread sleep for 10ms each time MoveNext() is called. I applied the Min, Max, and Average aggregates to these sequences both with and without Multicast. The mean results for several trials are:

“Expensive” sequence without Multicast: 30,010 ms
“Expensive” sequence with Multicast: 10,036 ms
“Normal” sequence without Multicast: 2 ms
“Normal” sequence with Multicast: 50 ms

The results for the “expensive” sequence clearly show the advantages of our Multicast method when applying multiple aggregators. In this scenario, the time saved by making only one pass through the sequence far outweighs the time spent in the aggregators and the overhead of parallel execution. For the “normal” sequence, the opposite is true; it’s far more efficient to make three passes than to incur the overhead of multicasting.

This Multicast extension method isn’t a silver bullet. It’s really only good for situations where (1) you’re applying aggregate query operators to an expensive sequence, (2) you don’t want to persist the sequence or it would be too expensive to do so, and (3) you don’t mind spinning up threads. For those situations, though, the performance improvement can be very significant.

You can download a VS.NET project containing code for the Multicast extension method and performance tests here. The code includes overloads of the Multicast method supporting up to seven aggregators.

Tagged as: .NET, .NET 4.0, C#, LINQ, Multithreading

{ 6 comments… read them below or add one }

spacer Mark Shiffer July 13, 2010 at 12:27 pm

Very interesting article Jack. I must say, as I was reading along and you started to show your unfinished implementations, I kept saying “that’s not going to work because…”, then you would address the issue. The final implementation of the LockStepEnumerator is an interesting solution.

spacer Drew Marsh July 14, 2010 at 9:52 am

Very cool. Only thing I would change is to use Task Paralell Library to execute the aggregators rather than starting your own threads. Obviously TPL is a 4.0 framework feature, so if you wanted to maintain compatibility with 3.5 then I would still think it would be better to use ThreadPool::QueueUserWorkItem over starting your own threads.

spacer Jack July 14, 2010 at 10:42 am

@Drew, I would have liked to use the TPL here (after all, this solution targets .NET 4.0), but I had to ensure that the aggregators would all execute concurrently because they’re enumerating the sequence in lockstep. The TPL uses the thread pool and has its own heuristic to determine an “optimal” degree of parallelism and there may be other tasks using the thread pool at the same time. So there’s no guarantee that the TPL would execute all of the aggregators concurrently.

spacer Drew Marsh July 14, 2010 at 11:02 am

@Jack

Ah, a very good point that I didn’t consider, but very obvious now that you point it out. You should consider adding a little aside at the bottom of this post about that because it’s a nice litle “gotcha” and I’m sure a lot of other people read it and think the same thing I did and wonder why you had to use your own threads.

spacer Al Tenhundfeld October 18, 2010 at 2:29 pm

This is a great explanation of a fairly complicated topic and an interesting solution too. Nice post!

spacer Ram January 17, 2011 at 10:26 am

Hi Jack,
Did you consider abstracting the persistence and letting the threads finish at their own pace? e.g. You get 2 million Ints from the network in chunks of some buffer size, and want to process them through 3 different algorithms and show the corresponding histograms. You will end up having to persist even if you do it in three calls, but abstracting the Multicast and making the persistence transparent has obvious cognitive benefits. Are there any performance benefits too?

Leave a Comment

Previous post: Adventures in .NET Rounding Part 3: Rounding in Action

Next post: Better Rate Limiting in .NET

gipoco.com is neither affiliated with the authors of this page nor responsible for its contents. This is a safe-cache copy of the original web site.