Chapter 23 - Parallel Programming


Calculating prime numbers

// Find the primes between 3 and one million by naive trial division.
// AsParallel() lets PLINQ spread the candidates across every available core.

IEnumerable<int> numbers = Enumerable.Range (3, 1000000 - 3);

// n is prime when no value in Enumerable.Range (2, (int) Math.Sqrt (n)) -- i.e. 2 up to
// (int) Math.Sqrt (n) + 1 -- divides it evenly.
var parallelQuery = numbers
  .AsParallel()
  .Where (n => Enumerable.Range (2, (int) Math.Sqrt (n)).All (divisor => n % divisor > 0));

// ToArray executes the query; without AsOrdered the order of the results is not guaranteed.
int[] primes = parallelQuery.ToArray();

Calculating prime numbers - ordered

// Same prime search, but AsOrdered() makes PLINQ preserve source order,
// so the primes come back in ascending order.

IEnumerable<int> numbers = Enumerable.Range (3, 1000000 - 3);

ParallelQuery<int> orderedSource = numbers.AsParallel().AsOrdered();

var parallelQuery = orderedSource.Where (candidate =>
  Enumerable.Range (2, (int) Math.Sqrt (candidate)).All (d => candidate % d > 0));

int[] primes = parallelQuery.ToArray();

// Alternatively, drop AsOrdered() and call OrderBy at the end of the query instead.

Parallel spell checker

// Spell-checks a million randomly chosen words in parallel. Each word is looked up in a
// HashSet dictionary, and misspelled words are reported in their original positions.
void Main()
{
  string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");

  if (!File.Exists (wordLookupFile))    // Contains about 150,000 words
    using (var client = new WebClient())
      client.DownloadFile ("http://www.albahari.com/ispell/allwords.txt", wordLookupFile);

  // Case-insensitive comparer, so "The" and "the" are both accepted.
  var wordLookup = new HashSet<string> (
    File.ReadAllLines (wordLookupFile),
    StringComparer.OrdinalIgnoreCase);

  var random = new Random();
  string[] wordList = wordLookup.ToArray();

  // Built sequentially because a single System.Random instance is not thread-safe.
  string[] wordsToTest = Enumerable.Range (0, 1000000)
    .Select (i => wordList [random.Next (0, wordList.Length)])
    .ToArray();

  wordsToTest [12345] = "woozsh";     // Introduce a couple
  wordsToTest [23456] = "wubsie";     // of spelling mistakes.

  var query = wordsToTest
    .AsParallel()
    .Select  ((word, index) => new IndexedWord { Word=word, Index=index })
    .Where   (iword => !wordLookup.Contains (iword.Word))
    .OrderBy (iword => iword.Index);

  // PLINQ queries are lazy: enumerating the results is what actually runs the check.
  foreach (IndexedWord misspelling in query)
    Console.WriteLine ($"{misspelling.Index}: {misspelling.Word}");
}

// Pairs a word with its position in wordsToTest so results can be sorted back into order.
struct IndexedWord { public string Word; public int Index; }

Parallel spell checker - enhanced

// Same parallel spell checker, but the million test words are also generated in parallel.
void Main()
{
  string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");

  if (!File.Exists (wordLookupFile))    // Contains about 150,000 words
    using (var client = new WebClient())
      client.DownloadFile ("http://www.albahari.com/ispell/allwords.txt", wordLookupFile);

  // Case-insensitive comparer, so "The" and "the" are both accepted.
  var wordLookup = new HashSet<string> (
    File.ReadAllLines (wordLookupFile),
    StringComparer.OrdinalIgnoreCase);

  string[] wordList = wordLookup.ToArray();

  // Here, we're using ThreadLocal to give each thread its own Random instance,
  // so we can parallelize the building of the wordsToTest array. Seeding from a Guid
  // keeps Randoms created at nearly the same moment from sharing a seed.
  string[] wordsToTest;
  using (var localRandom = new ThreadLocal<Random>
    ( () => new Random (Guid.NewGuid().GetHashCode()) ))
  {
    wordsToTest = Enumerable.Range (0, 1000000).AsParallel()
      .Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])
      .ToArray();
  }

  wordsToTest [12345] = "woozsh";     // Introduce a couple
  wordsToTest [23456] = "wubsie";     // of spelling mistakes.

  var query = wordsToTest
    .AsParallel()
    .Select  ((word, index) => new IndexedWord { Word=word, Index=index })
    .Where   (iword => !wordLookup.Contains (iword.Word))
    .OrderBy (iword => iword.Index);

  // PLINQ queries are lazy: enumerating the results is what actually runs the check.
  foreach (IndexedWord misspelling in query)
    Console.WriteLine ($"{misspelling.Index}: {misspelling.Word}");
}

// Pairs a word with its position in wordsToTest so results can be sorted back into order.
struct IndexedWord { public string Word; public int Index; }

Functional purity

  // Functional purity: lambdas in a PLINQ query should not mutate shared state.
  int i = 0;
  // Unsafe: the worker threads all increment the shared local i without synchronization,
  // so increments can be lost and which element is paired with which value of i is unpredictable.
  (from n in Enumerable.Range(0,999).AsParallel() select n * i++).Dump ("unsafe");

  // Safe: the indexed Select overload passes each element's source index as the lambda's
  // second parameter, so no shared state is mutated.
  Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i).Dump ("safe");

Changing degree of parallelism
// WithDegreeOfParallelism may be applied only once per PLINQ query. Calling AsParallel()
// again merges the first stage's output and re-partitions it, so the second stage can
// run with a different degree of parallelism.
char[] upperLetters = "The Quick Brown Fox"
  .AsParallel().WithDegreeOfParallelism (2)
  .Where (c => !char.IsWhiteSpace (c))
  .AsParallel().WithDegreeOfParallelism (3)   // Forces Merge + Partition
  .Select (c => char.ToUpperInvariant (c))    // Invariant: result doesn't depend on the current culture
  .ToArray();                                 // Executes the query; element order is not guaranteed.

Console.WriteLine (new string (upperLetters));


// Cancels a long-running PLINQ query from outside, via a CancellationToken.
IEnumerable<int> million = Enumerable.Range (3, 1000000);

using var cancelSource = new CancellationTokenSource();

var primeNumberQuery = 
  from n in million.AsParallel().WithCancellation (cancelSource.Token)
  where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
  select n;

// Request cancellation after 100 milliseconds. CancelAfter uses a timer owned by the
// CancellationTokenSource, so no dedicated thread is needed.
cancelSource.CancelAfter (100);

try
{
  // Start query running:
  int[] primes = primeNumberQuery.ToArray();
  // Usually not reached, but a fast enough machine can finish within 100 ms.
  Console.WriteLine ($"Query completed before cancellation ({primes.Length} primes)");
}
catch (OperationCanceledException)
{
  Console.WriteLine ("Query canceled");
}

Output-side optimization
// ForAll invokes the action in parallel as results are produced, rather than merging them
// back into a single sequence first, so the characters appear in no guaranteed order.
var upperCased = "abcdef".AsParallel().Select (ch => char.ToUpper (ch));
upperCased.ForAll (ch => Console.Write (ch));

Input-side optimization - forcing chunk partitioning

int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };

// loadBalance: true requests a load-balancing partitioner, so PLINQ hands elements to
// workers in chunks on demand instead of pre-assigning fixed ranges of the array.
OrderablePartitioner<int> partitioner = Partitioner.Create (numbers, loadBalance: true);

var parallelQuery = partitioner.AsParallel().Where (n => n % 2 == 0);

Optimizing aggregations - simple use of Aggregate

// Sequential Aggregate: starting from the seed 0, fold each element into a running total.
int[] numbers = new[] { 1, 2, 3 };
int sum = numbers.Aggregate (seed: 0, func: (runningTotal, next) => runningTotal + next);   // 6

Optimizing aggregations - seed factory function (contrived)

// Parallel Aggregate with a seed factory. Each partition gets its own accumulator from
// seedFactory, the partial results are then combined, and resultSelector maps the final
// value. (Contrived: Sum() would do the same job.)
int sum = new[] { 1, 2, 3 }.AsParallel().Aggregate (
  () => 0,                                     // seedFactory
  (localTotal, n) => localTotal + n,           // updateAccumulatorFunc
  (mainTot, localTot) => mainTot + localTot,   // combineAccumulatorsFunc
  finalResult => finalResult);                 // resultSelector

Optimizing aggregations - letter frequencies imperative

// Counts how often each letter A-Z occurs in text, ignoring case (imperative version).
string text = "Let’s suppose this is a really long string";
var letterFrequencies = new int[26];
foreach (char c in text)
{
  // Invariant upper-casing keeps the A-Z mapping culture-independent (in Turkish
  // culture, char.ToUpper ('i') is 'İ', which falls outside A-Z).
  int index = char.ToUpperInvariant (c) - 'A';
  // Valid indexes are 0..25; '[' (the character after 'Z') would map to 26.
  if (index >= 0 && index < letterFrequencies.Length) letterFrequencies [index]++;
}

Optimizing aggregations - letter frequencies functional

// Functional version: Aggregate threads a single int[26] accumulator through every character.
string text = "Let’s suppose this is a really long string";

int[] result =
  text.Aggregate (
    new int[26],                // Create the "accumulator"
    (letterFrequencies, c) =>   // Aggregate a letter into the accumulator
    {
      // Invariant upper-casing keeps the A-Z mapping culture-independent.
      int index = char.ToUpperInvariant (c) - 'A';
      // Valid indexes are 0..25 ('[' would map to 26).
      if (index >= 0 && index < 26) letterFrequencies [index]++;
      return letterFrequencies;
    });


Optimizing aggregations - letter frequencies parallel

// Parallel version: each partition fills its own int[26], and the partial arrays are
// then summed element-wise into the final result.
string text = "Let’s suppose this is a really long string";

int[] result =
  text.AsParallel().Aggregate (
    () => new int[26],             // Create a new local accumulator
    (localFrequencies, c) =>       // Aggregate into the local accumulator
    {
      // Invariant upper-casing keeps the A-Z mapping culture-independent.
      int index = char.ToUpperInvariant (c) - 'A';
      // Valid indexes are 0..25 ('[' would map to 26).
      if (index >= 0 && index < 26) localFrequencies [index]++;
      return localFrequencies;
    },
                                   // Aggregate local->main accumulator
    (mainFreq, localFreq) =>
      mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(),
    finalResult => finalResult     // Perform any final transformation
  );                               // on the end result.

The Parallel Class


// Parallel.Invoke runs the two downloads, possibly concurrently, and returns once both
// have finished. Each WebClient is disposed as soon as its download completes.
Parallel.Invoke (
  () => { using (var client = new WebClient()) client.DownloadFile ("http://www.linqpad.net", "lp.html"); },
  () => { using (var client = new WebClient()) client.DownloadFile ("http://www.jaoo.dk", "jaoo.html"); });


var keyPairs = new string[6];

// Generate six RSA key pairs in parallel. Each iteration writes only its own array slot,
// so no locking is needed. Each RSA instance is disposed once its key has been exported.
Parallel.For (0, keyPairs.Length, i =>
{
  using (RSA rsa = RSA.Create())
    keyPairs [i] = rsa.ToXmlString (true);
});


PLINQ version

// PLINQ alternative: generate six RSA key pairs in parallel and collect them into an array.
// Each RSA instance is disposed once its key has been exported.
string[] keyPairs = ParallelEnumerable.Range (0, 6)
  .Select (i =>
  {
    using (RSA rsa = RSA.Create())
      return rsa.ToXmlString (true);
  })
  .ToArray();

Parallel.ForEach - indexed

// The three-argument body receives each element, the ParallelLoopState and the element's
// index (a long). Lines are printed in no guaranteed order.
Parallel.ForEach ("Hello, world", (c, state, i) =>
{
  Console.WriteLine (c.ToString() + i);
});

Parallel Spellchecker with TPL

// Spell checker using Parallel.ForEach: misspelled words are collected, with their
// positions, into a thread-safe ConcurrentBag.
string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");

if (!File.Exists (wordLookupFile))    // Contains about 150,000 words
  using (var client = new WebClient())
    client.DownloadFile ("http://www.albahari.com/ispell/allwords.txt", wordLookupFile);

// Case-insensitive comparer, so "The" and "the" are both accepted.
var wordLookup = new HashSet<string> (
  File.ReadAllLines (wordLookupFile),
  StringComparer.OrdinalIgnoreCase);

var random = new Random();
string[] wordList = wordLookup.ToArray();

string[] wordsToTest = Enumerable.Range (0, 1000000)
  .Select (i => wordList [random.Next (0, wordList.Length)])
  .ToArray();

wordsToTest [12345] = "woozsh";     // Introduce a couple
wordsToTest [23456] = "wubsie";     // of spelling mistakes.

var misspellings = new ConcurrentBag<Tuple<int,string>>();

// The third lambda parameter is the element's index (a long).
Parallel.ForEach (wordsToTest, (word, state, i) =>
{
  if (!wordLookup.Contains (word))
    misspellings.Add (Tuple.Create ((int) i, word));
});

// ConcurrentBag is unordered, so sort by position before reporting.
foreach (var misspelling in misspellings.OrderBy (m => m.Item1))
  Console.WriteLine ($"{misspelling.Item1}: {misspelling.Item2}");


Breaking early out of loops

// Break stops the loop from starting iterations after the current one once a comma is
// seen. Earlier iterations may still run on other threads, so the output can vary.
ParallelLoopResult loopResult = Parallel.ForEach ("Hello, world", (c, loopState) =>
{
  if (c == ',')
    loopState.Break();
  else
    Console.Write (c);
});

Optimization with local values - problem

// Naive approach: every one of the ~10 million iterations takes the same lock to add to
// the shared total, so the threads spend much of their time contending for it.
double total = 0;
var locker = new object();

void AddSquareRoot (int i)
{
  lock (locker) total += Math.Sqrt (i);
}

Parallel.For (1, 10000000, AddSquareRoot);

Optimization with local values - solution

// Better: each participating task keeps its own running total (localInit + body), and the
// shared grandTotal is locked only when a task's local total is merged in (localFinally).
object locker = new object();
double grandTotal = 0;

Parallel.For (1, 10000000,

  () => 0.0,                         // Initialize the local value.

  (i, state, localTotal) =>          // Body delegate. Notice that it
    localTotal + Math.Sqrt (i),      // returns the new local total.

  localTotal =>                                    // Add the local value
    { lock (locker) grandTotal += localTotal; }    // to the master value.
);


PLINQ version sum

// PLINQ equivalent: sum the square roots of 1 .. 9,999,999, the same range that
// Parallel.For (1, 10000000, ...) covers (its upper bound is exclusive). Range's second
// argument is a count, hence 10000000 - 1.
double total = ParallelEnumerable.Range (1, 10000000 - 1)
          .Sum (i => Math.Sqrt (i));
Task Parallelism

Creating and starting tasks

// Note: see Chapter 14 for a basic introduction to tasks.

// Task.Run queues the delegate to the thread pool and returns a Task that represents it.
Task task = Task.Run (() => Console.WriteLine ("Hello from a task!"));

task.Wait();   // Block until the task has finished.

Decoupling task creation and execution

// You can create "cold" (unstarted) tasks with Task's constructor:
var task = new Task (() => Console.Write ("Hello"));

Console.WriteLine ("We can do something else here...");

task.Start();   // A cold task does nothing until it is explicitly started.
task.Wait();


Specifying a state object

static void Main()
{
  // StartNew (Action<object>, object) passes "Hello" to Greet as its state argument.
  var task = Task.Factory.StartNew (Greet, "Hello");
  task.Wait();   // Don't let Main return before Greet has run.
}

static void Greet (object state) { Console.Write (state); }   // Hello

Putting the state object to better use

static void Main()
{
  // The lambda ignores its state argument and calls Greet directly; "Greeting" is passed
  // only so the task carries a descriptive AsyncState.
  var task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting");
  Console.WriteLine (task.AsyncState);   // Greeting
  task.Wait();   // Let Greet finish before Main returns.
}

static void Greet (string message) { Console.WriteLine (message); }

Child tasks

// A task started with AttachedToParent is a child: the parent is not complete until the
// child finishes. A task started without it is detached (independent of the parent).
Task parent = Task.Factory.StartNew (() =>
{
  Console.WriteLine ("I am a parent");

  Task.Factory.StartNew (() =>        // Detached task
  {
    Console.WriteLine ("I am detached");
  });

  Task.Factory.StartNew (() =>        // Child task
  {
    Console.WriteLine ("I am a child");
  }, TaskCreationOptions.AttachedToParent);
});

parent.Wait();   // Also waits for the attached child, but not for the detached task.
Console.WriteLine ("Parent completed");

Exception-handling child tasks

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
var parent = Task.Factory.StartNew (() => 
{
  Task.Factory.StartNew (() =>   // Child
  {
    Task.Factory.StartNew (() => { throw null; }, atp);   // Grandchild
  }, atp);
});

// Waiting on the parent throws the grandchild's NullReferenceException, wrapped
// in nested AggregateExceptions. Flatten() unwraps them down to the original exception.
AggregateException caught = null;
try { parent.Wait(); }
catch (AggregateException ex) { caught = ex; }

Console.WriteLine (caught?.Flatten().InnerException?.GetType().Name);   // NullReferenceException

Canceling tasks

// Disposed at end of scope; this also releases the timer that CancelAfter creates.
using var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
cts.CancelAfter (500);

Task task = Task.Factory.StartNew (() => 
{
  Thread.Sleep (1000);
  token.ThrowIfCancellationRequested();  // Check for cancellation request
}, token);

try { task.Wait(); }
catch (AggregateException ex)
{
  // Throwing OperationCanceledException for the same token that was passed to StartNew
  // leaves the task Canceled rather than Faulted.
  Console.WriteLine (ex.InnerException is TaskCanceledException);  // True
  Console.WriteLine (task.IsCanceled);                             // True
  Console.WriteLine (task.Status);                                 // Canceled
}


// ContinueWith schedules the second delegate to run after task1 finishes.
var task1 = Task.Factory.StartNew (() => Console.Write ("antecedant.."));
var task2 = task1.ContinueWith (antecedent => Console.Write ("..continuation"));

Continuations with return values

// Each continuation receives the previous task (ant) and transforms its Result:
// 8 -> 16 -> 4.0, which the final stage prints.
Task<int> seed = Task.Factory.StartNew<int> (() => 8);
Task<int> doubled = seed.ContinueWith (ant => ant.Result * 2);
Task<double> root = doubled.ContinueWith (ant => Math.Sqrt (ant.Result));
root.ContinueWith (ant => Console.WriteLine (ant.Result));   // 4

Continuations and exceptions

Task task1 = Task.Factory.StartNew (() => { throw null; });

// Reading ant.Exception observes task1's fault. The continuation itself completes
// normally after writing the AggregateException to the console.
Task task2 = task1.ContinueWith (ant => Console.Write (ant.Exception));

task2.Wait();  // Does not throw: task2 succeeded even though task1 faulted.

Continuations - rethrowing antecedent exceptions

Task continuation = Task.Factory.StartNew (() => { throw null; })
  .ContinueWith (ant =>
  {
    // Propagate the antecedent's failure by rethrowing it from the continuation.
    if (ant.Exception != null) throw ant.Exception;
    // Continue processing...
  });

try
{
  continuation.Wait();    // Exception is now thrown back to caller.
}
catch (AggregateException ex)
{
  // The original NullReferenceException is nested inside AggregateExceptions; Flatten unwraps it.
  Console.WriteLine (ex.Flatten().InnerException?.GetType().Name);   // NullReferenceException
}

Continuations - exceptions and TaskContinuationOptions

Task task1 = Task.Factory.StartNew (() => { throw null; });

// Runs only if task1 faults:
Task error = task1.ContinueWith (ant => Console.Write (ant.Exception),
                                 TaskContinuationOptions.OnlyOnFaulted);

// Runs only if task1 does not fault; otherwise this continuation is canceled:
Task ok = task1.ContinueWith (ant => Console.Write ("Success!"),
                              TaskContinuationOptions.NotOnFaulted);


Continuations - extension to swallow exceptions

void Main()
{
  Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions();
}

static class Extensions
{
  /// <summary>
  /// Observes <paramref name="task"/>'s exception if it faults, so the failure is not
  /// reported as an unobserved task exception.
  /// </summary>
  public static void IgnoreExceptions (this Task task)
  {
    // This could be improved by adding code to log the exception
    task.ContinueWith (t => { _ = t.Exception; },   // Reading Exception marks it observed.
      TaskContinuationOptions.OnlyOnFaulted);
  }
}

Continuations and child tasks

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;

Task continuation = Task.Factory.StartNew (() =>
{
  Task.Factory.StartNew (() => { throw null; }, atp);
  Task.Factory.StartNew (() => { throw null; }, atp);
  Task.Factory.StartNew (() => { throw null; }, atp);
})
.ContinueWith (p => Console.WriteLine (p.Exception),
               TaskContinuationOptions.OnlyOnFaulted);

// The parent faults because its three attached children throw. The continuation prints an
// AggregateException containing the three (wrapped) NullReferenceExceptions. The continuation
// itself succeeds, so this Wait does not throw.
continuation.Wait();

Continuations - conditional

Task t1 = Task.Factory.StartNew (() => Console.WriteLine ("nothing awry here"));

// OnlyOnFaulted: t1 succeeds, so this continuation is canceled instead of run.
Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
                              TaskContinuationOptions.OnlyOnFaulted);

// A continuation without options runs however its antecedent ended, including canceled:
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"));    // This executes

Continuations - conditional (solution)

Task t1 = Task.Factory.StartNew (() => Console.WriteLine ("nothing awry here"));

// OnlyOnFaulted: t1 succeeds, so this continuation is canceled instead of run.
Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
                              TaskContinuationOptions.OnlyOnFaulted);

// NotOnCanceled: because fault was canceled, t3 is canceled too instead of running.
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"),
                TaskContinuationOptions.NotOnCanceled);  // Does not execute

Continuations with multiple antecedents

// Two independent antecedents.
Task task1 = Task.Factory.StartNew (() => Console.Write ("X")),
     task2 = Task.Factory.StartNew (() => Console.Write ("Y"));

// ContinueWhenAll starts the continuation only after every task in the array has completed.
Task[] antecedents = { task1, task2 };
var continuation = Task.Factory.ContinueWhenAll (antecedents, tasks => Console.WriteLine ("Done"));

Continuations with multiple antecedents - collating data

// In real life, task1 and task2 would each run an expensive computation:
Task<int> task1 = Task.Factory.StartNew (() => 123),
          task2 = Task.Factory.StartNew (() => 456);

// Once both have finished, add their results together.
Task<int>[] inputs = { task1, task2 };
Task<int> task3 = Task<int>.Factory.ContinueWhenAll (inputs, finished =>
{
  int combined = 0;
  foreach (Task<int> t in finished) combined += t.Result;
  return combined;
});

Console.WriteLine (task3.Result);           // 579

Multiple continuations on a single antecedent

// One antecedent can have several continuations; each runs after the antecedent completes.
Task t = Task.Factory.StartNew (() => Thread.Sleep (1000));

Task c1 = t.ContinueWith (_ => Console.Write ("X")),
     c2 = t.ContinueWith (_ => Console.Write ("Y"));

Task.WaitAll (c1, c2);

Task Schedulers and UIs

// Entry point: shows MyWindow as a modal dialog.
void Main()
  new MyWindow().ShowDialog();  

// Demonstrates updating a UI control from a task continuation by scheduling the
// continuation on a TaskScheduler created from the UI thread's SynchronizationContext.
// NOTE(review): braces and the constructor body appear to be missing from this snippet — confirm.
public partial class MyWindow : System.Windows.Window
  Label lblResult = new Label();
  TaskScheduler _uiScheduler;   // Declare this as a field so we can use
                  // it throughout our class.
  // NOTE(review): constructor body not visible; presumably it calls InitializeComponent() — confirm.
  public MyWindow()
  // Called on each activation, so a new background request is started every time the window activates.
  // NOTE(review): base.OnActivated (e) is not called; WPF's base implementation raises the Activated event — confirm this is intended.
  protected override void OnActivated (EventArgs e)
    // Get the UI scheduler for the thread that created the form:
    _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
    // Run the slow call via the default scheduler, then assign its result to the label
    // through _uiScheduler so the control is only touched via the UI thread's context.
    // NOTE(review): if SomeComplexWebService throws, ant.Result rethrows inside the continuation.
    Task.Factory.StartNew<string> (SomeComplexWebService)
      .ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler);    
  // Builds the window's content in code: a single label with a 20-point font size.
  void InitializeComponent()
    lblResult.FontSize = 20;
    Content = lblResult;
  // Stand-in for a slow remote call: blocks for one second, then returns "Foo".
  string SomeComplexWebService() { Thread.Sleep (1000); return "Foo"; }

Creating your own Task Factories

// A TaskFactory preconfigured with the creation/continuation options to apply to every
// task it starts.
var factory = new TaskFactory (
  TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent,
  TaskContinuationOptions.None);

Task task1 = factory.StartNew (() => Console.WriteLine ("foo"));
Task task2 = factory.StartNew (() => Console.WriteLine ("far"));

Task.WaitAll (task1, task2);
Working with AggregateException


  // PLINQ rethrows exceptions from its worker threads wrapped in an AggregateException.
try
{
  var query = from i in ParallelEnumerable.Range (0, 1000000)
              select 100 / i;

  // Enumerate query. PLINQ is lazy: without this, no division would execute and the
  // catch block below could never run.
  foreach (int result in query) { }
}
catch (AggregateException aex)
{
  foreach (Exception ex in aex.InnerExceptions)
    Console.WriteLine (ex.Message);
}


  // Flatten() collapses any nested AggregateExceptions into a single list before printing.
try
{
  var query = from i in ParallelEnumerable.Range (0, 1000000)
              select 100 / i;

  // Enumerate query (PLINQ is lazy; nothing throws until the query runs).
  foreach (int result in query) { }
}
catch (AggregateException aex)
{
  foreach (Exception ex in aex.Flatten().InnerExceptions)
    Console.WriteLine (ex.Message);
}


var parent = Task.Factory.StartNew (() => 
{
  // We'll throw 3 exceptions at once using 3 child tasks:
  int[] numbers = { 0 };

  var childFactory = new TaskFactory
   (TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);

  childFactory.StartNew (() => 5 / numbers[0]);   // Division by zero
  childFactory.StartNew (() => numbers [1]);      // Index out of range
  childFactory.StartNew (() => { throw null; });  // Null reference
});

try { parent.Wait(); }
catch (AggregateException aex)
{
  // Handle calls the predicate for each exception. Those for which it returns false are
  // rethrown together in a new AggregateException; here that is the NullReferenceException.
  aex.Flatten().Handle (ex =>   // Note that we still need to call Flatten
  {
    if (ex is DivideByZeroException)
    {
      Console.WriteLine ("Divide by zero");
      return true;                           // This exception is "handled"
    }
    if (ex is IndexOutOfRangeException)
    {
      Console.WriteLine ("Index out of range");
      return true;                           // This exception is "handled"
    }
    return false;    // All other exceptions will get rethrown
  });
}
Concurrent Collections

Producer-Consumer Queue

void Main()
{
  // Disposing the queue waits for both enqueued actions to run.
  using (var q = new PCQueue (1))
  {
    q.EnqueueTask (() => Console.WriteLine ("Foo"));
    q.EnqueueTask (() => Console.WriteLine ("Far"));
  }
}

/// <summary>
/// Producer/consumer queue: actions passed to <see cref="EnqueueTask"/> are executed by a
/// fixed number of consumer tasks.
/// </summary>
public class PCQueue : IDisposable
{
  readonly BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
  readonly Task[] _consumers;
  bool _disposed;

  /// <param name="workerCount">Number of consumer tasks; must not be negative.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="workerCount"/> is negative.</exception>
  public PCQueue (int workerCount)
  {
    if (workerCount < 0) throw new ArgumentOutOfRangeException (nameof (workerCount));

    // Create and start a separate Task for each consumer. LongRunning, because each
    // consumer blocks its thread for the lifetime of the queue.
    _consumers = new Task [workerCount];
    for (int i = 0; i < workerCount; i++)
      _consumers [i] = Task.Factory.StartNew (Consume, TaskCreationOptions.LongRunning);
  }

  /// <summary>Stops accepting work, waits for the consumers to drain the queue, then releases it.</summary>
  public void Dispose()
  {
    if (_disposed) return;
    _disposed = true;

    _taskQ.CompleteAdding();
    try { Task.WaitAll (_consumers); }
    catch (AggregateException)
    {
      // A consumer stops (faults) if an action throws; Dispose must not throw on its account.
    }
    _taskQ.Dispose();
  }

  public void EnqueueTask (Action action) { _taskQ.Add (action); }

  void Consume()
  {
    // This sequence that we're enumerating will block when no elements
    // are available and will end when CompleteAdding is called.
    foreach (Action action in _taskQ.GetConsumingEnumerable())
      action();     // Perform task.
  }
}

Producer-Consumer Queue - with Tasks

void Main()
{
  // Disposing the queue waits for both enqueued tasks to run.
  using (var pcQ = new PCQueue (1))
  {
    Task task1 = pcQ.Enqueue (() => Console.WriteLine ("Too"));
    Task task2 = pcQ.Enqueue (() => Console.WriteLine ("Easy!"));

    task1.ContinueWith (_ => Console.WriteLine ("Task 1 complete"));
    task2.ContinueWith (_ => Console.WriteLine ("Task 2 complete"));
  }
}

/// <summary>
/// Producer/consumer queue that returns a <see cref="Task"/> for each enqueued item, so
/// callers can wait on it, continue from it, or cancel it before it runs.
/// </summary>
public class PCQueue : IDisposable
{
  readonly BlockingCollection<Task> _taskQ = new BlockingCollection<Task>();
  readonly Task[] _consumers;
  bool _disposed;

  /// <param name="workerCount">Number of consumer tasks; must not be negative.</param>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="workerCount"/> is negative.</exception>
  public PCQueue (int workerCount)
  {
    if (workerCount < 0) throw new ArgumentOutOfRangeException (nameof (workerCount));

    // Create and start a separate Task for each consumer. LongRunning, because each
    // consumer blocks its thread for the lifetime of the queue.
    _consumers = new Task [workerCount];
    for (int i = 0; i < workerCount; i++)
      _consumers [i] = Task.Factory.StartNew (Consume, TaskCreationOptions.LongRunning);
  }

  /// <summary>Queues <paramref name="action"/> and returns a cold task that a consumer will run.</summary>
  public Task Enqueue (Action action, CancellationToken cancelToken = default (CancellationToken))
  {
    var task = new Task (action, cancelToken);
    _taskQ.Add (task);
    return task;
  }

  /// <summary>Queues <paramref name="func"/> and returns a cold task that a consumer will run.</summary>
  public Task<TResult> Enqueue<TResult> (Func<TResult> func, 
    CancellationToken cancelToken = default (CancellationToken))
  {
    var task = new Task<TResult> (func, cancelToken);
    _taskQ.Add (task);
    return task;
  }

  void Consume()
  {
    foreach (var task in _taskQ.GetConsumingEnumerable())
    {
      try
      {
        // RunSynchronously stores any exception from the delegate in the task itself.
        if (!task.IsCanceled) task.RunSynchronously();
      }
      catch (InvalidOperationException) { }  // Race condition: canceled between the check and the run.
    }
  }

  /// <summary>Stops accepting work, waits for the consumers to drain the queue, then releases it.</summary>
  public void Dispose()
  {
    if (_disposed) return;
    _disposed = true;

    _taskQ.CompleteAdding();
    Task.WaitAll (_consumers);
    _taskQ.Dispose();
  }
}
EXTRA - Channels

Single Producer - Multiple Consumers

// The consumer is half as fast as the producer. We compensate by starting two consumers.

Channel<string> channel =
  Channel.CreateBounded<string> (new BoundedChannelOptions (1000)
  {
    // Specifying SingleReader and/or SingleWriter 
    // allows the Channel to make optimizing assumptions
    SingleReader = false,
    SingleWriter = true,
  });

// Complete the writer once the producer finishes, forwarding any exception it threw so the
// consumers observe the failure instead of it being silently discarded.
var producer = Produce().ContinueWith (t => channel.Writer.Complete (t.Exception));
var consumer1 = Consume(1);
var consumer2 = Consume(2);

await Task.WhenAll(consumer1, consumer2);

async Task Produce()
{
  for (int i = 0; i < 10; i++)
  {
    await channel.Writer.WriteAsync ($"Msg {i}");
    await Task.Delay (1000);
  }
  Console.WriteLine ("Producer done.");
}

// We add an ID just to visualize which consumer processed a given message.
async Task Consume(int id)
{
  // WaitToReadAsync returns false once the writer has completed and the channel is drained.
  while (await channel.Reader.WaitToReadAsync())
  {
    // The other consumer may have taken the item first, hence TryRead.
    if (channel.Reader.TryRead (out string data))
    {
      Console.WriteLine ($"Processed on {id}: {data}");
      // Simulate processing takes twice as long as producing
      await Task.Delay (2000);
    }
  }
  Console.WriteLine ($"Consumer {id} done.");
}

Single Producer - Single Consumer

// The consumer is half as fast as the producer. The producer will finish first.

Channel<string> channel =
  Channel.CreateBounded<string> (new BoundedChannelOptions (1000)
  {
    // Specifying SingleReader and/or SingleWriter 
    // allows the Channel to make optimizing assumptions
    SingleReader = true,
    SingleWriter = true,
  });

// Complete the writer once the producer finishes, forwarding any exception it threw so the
// consumer observes the failure instead of it being silently discarded.
var producer = Produce().ContinueWith (t => channel.Writer.Complete (t.Exception));
var consumer = Consume();

// Keep the script alive until all messages have been produced and consumed.
await Task.WhenAll (producer, consumer);

async Task Produce()
{
  for (int i = 0; i < 10; i++)
  {
    await channel.Writer.WriteAsync ($"Msg {i}");
    await Task.Delay(1000);
  }
  Console.WriteLine("Producer done.");
}

async Task Consume()
{
  // WaitToReadAsync returns false once the writer has completed and the channel is drained.
  while (await channel.Reader.WaitToReadAsync())
  {
    if (channel.Reader.TryRead(out string data))
    {
      Console.WriteLine($"Processed: {data}");
      // Simulate processing takes twice as long as producing
      await Task.Delay(2000);
    }
  }
  Console.WriteLine("Consumer done.");
}
EXTRA - SpinLock and SpinWait


// See http://www.albahari.com/threading/part5.aspx for the accompanying text on SpinLock and SpinWait

var spinLock = new SpinLock (true);   // Enable owner tracking
bool lockTaken = false;
try
{
  spinLock.Enter (ref lockTaken);
  // Do stuff...
}
finally
{
  // Release only if Enter actually acquired the lock. The finally block guarantees
  // release even if the protected code throws.
  if (lockTaken) spinLock.Exit();
}

SpinWait - SpinUntil

bool _proceed;

void Main()
{
  var task = Task.Factory.StartNew (Test);
  _proceed = true;
  task.Wait();   // Let the spinning task observe the flag before Main returns.
}

// Spins until _proceed becomes true. The full memory fence in the predicate forces a fresh
// read of _proceed on each check, so the loop cannot keep spinning on a stale value.
void Test()
{
  SpinWait.SpinUntil (() => { Thread.MemoryBarrier(); return _proceed; });
}

SpinWait - SpinOnce

bool _proceed;

void Main()
{
  var task = Task.Run (Test);
  _proceed = true;
  task.Wait();   // Let the spinning task observe the flag before Main returns.
}

// Manual spin loop: SpinOnce spins briefly at first and yields more as iterations accumulate.
// The full memory fence forces a fresh read of _proceed on each pass.
void Test()
{
  var spinWait = new SpinWait();
  while (!_proceed) { Thread.MemoryBarrier(); spinWait.SpinOnce(); }
}

SpinWait - Lock-free updates with CompareExchange

int x = 2;

void Main()
{
  // We can perform three multiplications on the same variable using 3 concurrent threads
  // safely without locks by using SpinWait with Interlocked.CompareExchange.
  var task1 = Task.Factory.StartNew (() => MultiplyXBy (3));
  var task2 = Task.Factory.StartNew (() => MultiplyXBy (4));
  var task3 = Task.Factory.StartNew (() => MultiplyXBy (5));
  Task.WaitAll (task1, task2, task3);

  // 2 * 3 * 4 * 5 = 120, provided MultiplyXBy applies each multiplication exactly once.
  Console.WriteLine (x);
}

void MultiplyXBy (int factor)
  var spinWait = new SpinWait();
  while (true)
    int snapshot1 = x;
    int calc = snapshot1 * factor;
    int snapshot2 = Interlocked.CompareExchange (ref x, calc, snapshot1);
    if (snapshot1 == snapshot2) return;   // No one preempted us.