Chapter 23 - Parallel Programming
PLINQ
Calculating prime numbers
// Calculate prime numbers using a simple (unoptimized) algorithm, spread over all available cores.
// A candidate n is kept when none of the trial divisors 2 .. floor(sqrt(n)) + 1 divides it evenly.
var candidates = Enumerable.Range (3, 1000000 - 3);   // 3 .. 999,999

int[] primes = candidates
    .AsParallel()
    .Where (n => Enumerable.Range (2, (int) Math.Sqrt (n)).All (divisor => n % divisor > 0))
    .ToArray();

primes.Take (100).Dump();
Calculating prime numbers - ordered
// Calculate prime numbers with ordered output.
// AsOrdered makes PLINQ preserve the source order (3, 5, 7, ...) in the results.
var candidates = Enumerable.Range (3, 1000000 - 3);

int[] primes = candidates
    .AsParallel()
    .AsOrdered()
    .Where (n => Enumerable.Range (2, (int) Math.Sqrt (n)).All (divisor => n % divisor > 0))
    .ToArray();

primes.Take (100).Dump();

// In this example, we could alternatively call OrderBy at the end of the query.
Parallel spell checker
void Main()
{
    string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");

    if (!File.Exists (wordLookupFile))   // Contains about 150,000 words
    {
        // WebClient is IDisposable: release it as soon as the download completes.
        using (var client = new WebClient())
            client.DownloadFile ("http://www.albahari.com/ispell/allwords.txt", wordLookupFile);
    }

    // Case-insensitive dictionary of known words:
    var wordLookup = new HashSet<string> (
        File.ReadAllLines (wordLookupFile),
        StringComparer.InvariantCultureIgnoreCase);

    // Build a one-million-word test "document" from randomly chosen dictionary words:
    var random = new Random();
    string[] wordList = wordLookup.ToArray();

    string[] wordsToTest = Enumerable.Range (0, 1000000)
        .Select (i => wordList [random.Next (0, wordList.Length)])
        .ToArray();

    wordsToTest [12345] = "woozsh";   // Introduce a couple
    wordsToTest [23456] = "wubsie";   // of spelling mistakes.

    // Tag each word with its position, keep those missing from the dictionary,
    // and report them in document order:
    var query = wordsToTest
        .AsParallel()
        .Select ((word, index) => new IndexedWord { Word = word, Index = index })
        .Where (iword => !wordLookup.Contains (iword.Word))
        .OrderBy (iword => iword.Index);

    query.Dump();
}

// A word paired with its position in wordsToTest.
struct IndexedWord { public string Word; public int Index; }
Parallel spell checker - enhanced
void Main()
{
    string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");

    if (!File.Exists (wordLookupFile))   // Contains about 150,000 words
    {
        // WebClient is IDisposable: release it as soon as the download completes.
        using (var client = new WebClient())
            client.DownloadFile ("http://www.albahari.com/ispell/allwords.txt", wordLookupFile);
    }

    var wordLookup = new HashSet<string> (
        File.ReadAllLines (wordLookupFile),
        StringComparer.InvariantCultureIgnoreCase);

    string[] wordList = wordLookup.ToArray();

    // Here, we're using ThreadLocal to generate a thread-safe random number generator,
    // so we can parallelize the building of the wordsToTest array.
    // Each thread's Random is seeded from a fresh Guid so threads don't share a sequence.
    // ThreadLocal is IDisposable, so scope it to the array construction.
    string[] wordsToTest;
    using (var localRandom = new ThreadLocal<Random> (
        () => new Random (Guid.NewGuid().GetHashCode())))
    {
        wordsToTest = Enumerable.Range (0, 1000000).AsParallel()
            .Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])
            .ToArray();   // Materialize before localRandom is disposed
    }

    wordsToTest [12345] = "woozsh";   // Introduce a couple
    wordsToTest [23456] = "wubsie";   // of spelling mistakes.

    var query = wordsToTest
        .AsParallel()
        .Select ((word, index) => new IndexedWord { Word = word, Index = index })
        .Where (iword => !wordLookup.Contains (iword.Word))
        .OrderBy (iword => iword.Index);

    query.Dump();
}

// A word paired with its position in wordsToTest.
struct IndexedWord { public string Word; public int Index; }
Functional purity
// Unsafe: the selector mutates a shared counter from several threads at once,
// so which multiplier each element receives is unpredictable.
{
    int counter = 0;
    Enumerable.Range (0, 999)
        .AsParallel()
        .Select (n => n * counter++)
        .Dump ("unsafe");
}
// Safe: the indexed Select overload supplies each element's position without shared state.
{
    Enumerable.Range (0, 999)
        .AsParallel()
        .Select ((value, position) => value * position)
        .Dump ("safe");
}
Changing degree of parallelism
// The second AsParallel() call lets the query switch from 2 to 3 workers mid-stream.
"The Quick Brown Fox"
    .AsParallel()
    .WithDegreeOfParallelism (2)
    .Where (ch => !char.IsWhiteSpace (ch))
    .AsParallel()
    .WithDegreeOfParallelism (3)   // Forces Merge + Partition
    .Select (ch => char.ToUpper (ch))
Cancellation
IEnumerable<int> candidates = Enumerable.Range (3, 1000000);
var cts = new CancellationTokenSource();

// A cancellable prime query: PLINQ observes cts.Token while it runs.
var primeNumberQuery = candidates
    .AsParallel()
    .WithCancellation (cts.Token)
    .Where (n => Enumerable.Range (2, (int) Math.Sqrt (n)).All (divisor => n % divisor > 0));

// Cancel the query from a separate thread after 100 milliseconds:
var canceller = new Thread (() =>
{
    Thread.Sleep (100);
    cts.Cancel();
});
canceller.Start();

try
{
    // Start query running:
    int[] primes = primeNumberQuery.ToArray();
    // Expected to be unreachable: the other thread cancels us first.
}
catch (OperationCanceledException)
{
    Console.WriteLine ("Query canceled");
}
Output-side optimization
// Output-side optimization: ForAll runs the action on the worker threads directly,
// skipping the merge back into one sequence (so the output order may vary).
"abcdef"
    .AsParallel()
    .Select (ch => char.ToUpper (ch))
    .ForAll (ch => Console.Write (ch));
Input-side optimization - forcing chunk partitioning
// Passing loadBalance: true to Partitioner.Create forces chunk partitioning for the array.
int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };
var chunkPartitioner = Partitioner.Create (numbers, loadBalance: true);

var evens = chunkPartitioner
    .AsParallel()
    .Where (n => n % 2 == 0);

evens.Dump();
Optimizing aggregations - simple use of Aggregate
// Sequential Aggregate with an explicit seed: the running total starts at 0.
int[] numbers = { 1, 2, 3 };
int sum = numbers.Aggregate (seed: 0, func: (runningTotal, value) => runningTotal + value);   // 6
sum.Dump();
Optimizing aggregations - seed factory function (contrived)
// Parallel Aggregate with a seed *factory*: each local accumulator starts from a fresh seed,
// and the local totals are combined at the end.
new[] { 1, 2, 3 }
    .AsParallel()
    .Aggregate (
        () => 0,                                                // seedFactory
        (partialTotal, value) => partialTotal + value,          // updateAccumulatorFunc
        (grandTotal, partialTotal) => grandTotal + partialTotal, // combineAccumulatorFunc
        total => total)                                         // resultSelector
Optimizing aggregations - letter frequencies imperative
// Counts how often each letter A-Z occurs in text (case-insensitive), imperatively.
string text = "Let’s suppose this is a really long string";
var letterFrequencies = new int[26];

foreach (char c in text)
{
    // ToUpperInvariant keeps the letter mapping culture-independent
    // (e.g. 'i' still becomes 'I' under a Turkish culture).
    int index = char.ToUpperInvariant (c) - 'A';

    // Valid slots are 0..25; '[' maps to 26, so the upper bound must be exclusive.
    if (index >= 0 && index < letterFrequencies.Length) letterFrequencies [index]++;
}

letterFrequencies.Dump();
Optimizing aggregations - letter frequencies functional
// Counts letter frequencies functionally: Aggregate threads a single int[26]
// accumulator through every character of the string.
string text = "Let’s suppose this is a really long string";

int[] result = text.Aggregate (
    new int[26],                   // Create the "accumulator"
    (letterFrequencies, c) =>      // Aggregate a letter into the accumulator
    {
        // Culture-independent upper-casing so 'i' always maps to 'I'.
        int index = char.ToUpperInvariant (c) - 'A';

        // Only 'A'..'Z' (indices 0..25); '[' would map to 26 and overflow the array.
        if (index >= 0 && index < 26) letterFrequencies [index]++;
        return letterFrequencies;
    });

result.Dump();
Optimizing aggregations - letter frequencies parallel
// Counts letter frequencies in parallel: each local accumulator gets its own int[26]
// from the seed factory, and the local arrays are summed element-wise at the end.
string text = "Let’s suppose this is a really long string";

int[] result = text.AsParallel().Aggregate (
    () => new int[26],             // Create a new local accumulator
    (localFrequencies, c) =>       // Aggregate into the local accumulator
    {
        // Culture-independent upper-casing so 'i' always maps to 'I'.
        int index = char.ToUpperInvariant (c) - 'A';

        // Only 'A'..'Z' (indices 0..25); '[' would map to 26 and overflow the array.
        if (index >= 0 && index < 26) localFrequencies [index]++;
        return localFrequencies;
    },
    // Aggregate local->main accumulator
    (mainFreq, localFreq) => mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(),
    finalResult => finalResult     // Perform any final transformation on the end result.
);

result.Dump();
The Parallel Class
Parallel.Invoke
// Runs both downloads concurrently; Parallel.Invoke returns once both delegates have finished.
// Each WebClient is disposed as soon as its download completes.
Parallel.Invoke (
    () => { using (var wc = new WebClient()) wc.DownloadFile ("http://www.linqpad.net", "lp.html"); },
    () => { using (var wc = new WebClient()) wc.DownloadFile ("http://www.jaoo.dk", "jaoo.html"); });
Parallel.For
// Generates six RSA key pairs (exported as XML, including private parameters) in parallel.
var keyPairs = new string[6];

Parallel.For (0, keyPairs.Length, i =>
{
    // RSA is IDisposable; release each key object once its XML has been exported.
    using (var rsa = RSA.Create())
        keyPairs[i] = rsa.ToXmlString (true);
});

keyPairs.Dump();
PLINQ version
// PLINQ version: generate six RSA key pairs in parallel, disposing each RSA object after export.
ParallelEnumerable.Range (0, 6)
    .Select (i =>
    {
        using (var rsa = RSA.Create())
            return rsa.ToXmlString (true);
    })
    .ToArray()
Parallel.ForEach - indexed
// The three-parameter body overload supplies the element's index (a long) alongside the loop state.
Parallel.ForEach ("Hello, world", (ch, loopState, index) =>
    Console.WriteLine ($"{ch}{index}"));
Parallel spell checker with TPL
string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");

if (!File.Exists (wordLookupFile))   // Contains about 150,000 words
{
    // WebClient is IDisposable: release it as soon as the download completes.
    using (var client = new WebClient())
        client.DownloadFile ("http://www.albahari.com/ispell/allwords.txt", wordLookupFile);
}

var wordLookup = new HashSet<string> (
    File.ReadAllLines (wordLookupFile),
    StringComparer.InvariantCultureIgnoreCase);

var random = new Random();
string[] wordList = wordLookup.ToArray();

string[] wordsToTest = Enumerable.Range (0, 1000000)
    .Select (i => wordList [random.Next (0, wordList.Length)])
    .ToArray();

wordsToTest [12345] = "woozsh";   // Introduce a couple
wordsToTest [23456] = "wubsie";   // of spelling mistakes.

// Parallel.ForEach's indexed overload gives each word's position; the thread-safe
// ConcurrentBag collects (index, word) pairs from all workers (in no particular order).
var misspellings = new ConcurrentBag<Tuple<int,string>>();

Parallel.ForEach (wordsToTest, (word, state, i) =>
{
    if (!wordLookup.Contains (word))
        misspellings.Add (Tuple.Create ((int) i, word));
});

misspellings.Dump();
Breaking early out of loops
// Calls Break on reaching the comma. Break stops further iterations from being started,
// but iterations already in flight may still complete, so the output is not strictly "Hello".
Parallel.ForEach ("Hello, world", (ch, loopState) =>
{
    if (ch != ',')
    {
        Console.Write (ch);
        return;
    }
    loopState.Break();
});
Optimization with local values - problem
// The problem: every iteration takes the same lock to update one shared total,
// which effectively serializes the work.
object gate = new object();
double total = 0;

Parallel.For (1, 10000000, n =>
{
    lock (gate)
    {
        total += Math.Sqrt (n);
    }
});

total.Dump();
Optimization with local values - solution
// The solution: each worker accumulates into its own local subtotal with no locking;
// the lock is taken only once per local value, when it is merged into grandTotal.
object gate = new object();
double grandTotal = 0;

Func<double> initLocal = () => 0.0;                            // Initialize the local value.
Func<int, ParallelLoopState, double, double> body =
    (n, loopState, subtotal) => subtotal + Math.Sqrt (n);      // Returns the new local total.
Action<double> mergeLocal = subtotal =>
{
    lock (gate) grandTotal += subtotal;                        // Add the local value to the master value.
};

Parallel.For (1, 10000000, initLocal, body, mergeLocal);

grandTotal.Dump();
PLINQ version sum
// PLINQ equivalent of the Parallel.For(1, 10000000, ...) examples, whose upper bound is
// exclusive (they sum sqrt(1) .. sqrt(9,999,999)). ParallelEnumerable.Range takes a *count*,
// so use 10000000 - 1 elements to sum exactly the same values.
ParallelEnumerable.Range (1, 10000000 - 1)
    .Sum (i => Math.Sqrt (i))
Task Parallelism
Creating and starting tasks
// Note: see Chapter 14 for a basic introduction to tasks.
// Task.Run returns a task that has already been started.
Task task = Task.Run (() => Console.WriteLine ("Hello from a task!"));
task.Wait();   // Block until the task has finished
Decoupling task creation and execution
// You can create "cold" (unstarted) tasks with Task's constructor;
// nothing runs until Start is called.
Task coldTask = new Task (() => Console.Write ("Hello"));

"We can do something else here...".Dump();

coldTask.Start();
Specifying a state object
static void Main()
{
    // StartNew hands "Hello" to Greet as its state argument.
    Task.Factory.StartNew (Greet, "Hello");
}

// Writes whatever state object the task was started with.
static void Greet (object state) => Console.Write (state);   // Hello
Putting the state object to better use
static void Main()
{
    // The state object isn't needed as input here; it just labels the task
    // and can be read back through AsyncState.
    var task = Task.Factory.StartNew (ignoredState => Greet ("Hello"), "Greeting");
    Console.WriteLine (task.AsyncState);   // Greeting
}

// Writes the given message on its own line.
static void Greet (string message) => Console.WriteLine (message);
Child tasks
// A detached task runs independently of the task that started it. A child task
// (AttachedToParent) must finish before its parent completes, so parent.Wait() also waits for it.
Task parent = Task.Factory.StartNew (() =>
{
    Console.WriteLine ("I am a parent");

    // Detached task
    Task.Factory.StartNew (() => Console.WriteLine ("I am detached"));

    // Child task
    Task.Factory.StartNew (() => Console.WriteLine ("I am a child"),
                           TaskCreationOptions.AttachedToParent);
});

parent.Wait();
Console.WriteLine ("Parent completed");
Exception-handling child tasks
// A fault in an attached grandchild propagates up through its attached parent to the root task.
var attached = TaskCreationOptions.AttachedToParent;

Task parent = Task.Factory.StartNew (() =>
{
    // Child
    Task.Factory.StartNew (() =>
    {
        Task.Factory.StartNew (() => { throw null; }, attached);   // Grandchild
    }, attached);
});

// The following call throws a NullReferenceException (wrapped
// in nested AggregateExceptions):
parent.Wait();
Canceling tasks
var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
cts.CancelAfter (500);   // Request cancellation after half a second

// The task sleeps past the deadline and then checks the token. It throws an
// OperationCanceledException carrying the same token it was started with, so it ends up Canceled.
Task task = Task.Factory.StartNew (() =>
{
    Thread.Sleep (1000);
    token.ThrowIfCancellationRequested();   // Check for cancellation request
}, token);

try
{
    task.Wait();
}
catch (AggregateException aggregate)
{
    Exception inner = aggregate.InnerException;
    Console.WriteLine (inner is TaskCanceledException);   // True
    Console.WriteLine (task.IsCanceled);                   // True
    Console.WriteLine (task.Status);                       // Canceled
}
Continuations
// task2 is scheduled to run once its antecedent, task1, has completed.
Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedent.."));
Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation"));
Continuations with return values
// Each continuation receives its antecedent (ant) and reads its Result: 8 -> 16 -> 4.0 -> printed.
Task<int> eight = Task.Factory.StartNew<int> (() => 8);
Task<int> doubled = eight.ContinueWith (ant => ant.Result * 2);
Task<double> root = doubled.ContinueWith (ant => Math.Sqrt (ant.Result));
root.ContinueWith (ant => Console.WriteLine (ant.Result));   // 4
Continuations and exceptions
Task task1 = Task.Factory.StartNew (() => { throw null; });

// The continuation runs even though task1 faulted. Reading ant.Exception observes the
// fault, and the continuation writes its details (a NullReferenceException wrapped in an AggregateException).
Task task2 = task1.ContinueWith (ant => Console.Write (ant.Exception));

task2.Wait();   // Completes normally: task2 itself doesn't fault (task1.Wait() would throw an AggregateException)
Continuations - rethrowing antecedent exceptions
// The continuation rethrows its antecedent's fault, so the fault surfaces to whoever waits on the continuation.
Task antecedent = Task.Factory.StartNew (() => { throw null; });

Task continuation = antecedent.ContinueWith (ant =>
{
    if (ant.Exception == null)
    {
        // Continue processing...
        return;
    }
    throw ant.Exception;
});

continuation.Wait();   // Exception is now thrown back to caller.
Continuations - exceptions and TaskContinuationOptions
Task task1 = Task.Factory.StartNew (() => { throw null; });

// Runs only if task1 faults (it does, so this writes the exception):
Task error = task1.ContinueWith (ant => Console.Write (ant.Exception),
                                 TaskContinuationOptions.OnlyOnFaulted);

// Runs only if task1 does not fault; since task1 faults, this continuation is canceled instead:
Task ok = task1.ContinueWith (ant => Console.Write ("Success!"),
                              TaskContinuationOptions.NotOnFaulted);

error.Wait();
Continuations - extension to swallow exceptions
void Main()
{
    // The fault is picked up by the continuation attached in IgnoreExceptions.
    Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions();
}

static class Extensions
{
    /// <summary>
    /// Attaches a continuation that runs only when <paramref name="task"/> faults and reads
    /// its <see cref="Task.Exception"/>, so the exception counts as observed.
    /// This could be improved by adding code to log the exception.
    /// </summary>
    public static void IgnoreExceptions (this Task task) =>
        task.ContinueWith (t => { _ = t.Exception; }, TaskContinuationOptions.OnlyOnFaulted);
}
Continuations and child tasks
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;

// The parent faults because its three attached children each throw.
Task.Factory.StartNew (() =>
{
    Task.Factory.StartNew (() => { throw null; }, atp);
    Task.Factory.StartNew (() => { throw null; }, atp);
    Task.Factory.StartNew (() => { throw null; }, atp);
})
// So this OnlyOnFaulted continuation runs and writes an AggregateException carrying the
// three NullReferenceExceptions (nested in the children's AggregateExceptions).
.ContinueWith (p => Console.WriteLine (p.Exception), TaskContinuationOptions.OnlyOnFaulted)
// The continuation itself completes normally, so this Wait does not throw.
.Wait();
Continuations - conditional
Task t1 = Task.Factory.StartNew (() => Console.WriteLine ("nothing awry here"));

// t1 doesn't fault, so this continuation never runs; it is canceled instead.
Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
                              TaskContinuationOptions.OnlyOnFaulted);

// With no continuation options, t3 runs whenever 'fault' finishes, even when it was canceled.
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"));   // This executes
Continuations - conditional (solution)
Task t1 = Task.Factory.StartNew (() => Console.WriteLine ("nothing awry here"));

// t1 doesn't fault, so this continuation is canceled rather than run.
Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
                              TaskContinuationOptions.OnlyOnFaulted);

// NotOnCanceled keeps t3 from running after a canceled 'fault'.
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"),
                              TaskContinuationOptions.NotOnCanceled);   // Does not execute
Continuations with multiple antecedents
Task task1 = Task.Factory.StartNew (() => Console.Write ("X"));
Task task2 = Task.Factory.StartNew (() => Console.Write ("Y"));

// ContinueWhenAll fires once every task in the array has completed:
Task[] antecedents = { task1, task2 };
Task continuation = Task.Factory.ContinueWhenAll (antecedents, finished => Console.WriteLine ("Done"));
Continuations with multiple antecedents - collating data
// task1 and task2 would call complex functions in real life:
Task<int> task1 = Task.Factory.StartNew (() => 123);
Task<int> task2 = Task.Factory.StartNew (() => 456);

// Once both have finished, add their results together:
Task<int>[] inputs = { task1, task2 };
Task<int> task3 = Task<int>.Factory.ContinueWhenAll (inputs, done => done.Sum (t => t.Result));

Console.WriteLine (task3.Result);   // 579
Multiple continuations on a single antecedent
// Two continuations hang off the same antecedent. Both run after it completes;
// this code does not fix their relative order.
Task sleeper = Task.Factory.StartNew (() => Thread.Sleep (1000));
Task printX = sleeper.ContinueWith (ant => Console.Write ("X"));
Task printY = sleeper.ContinueWith (ant => Console.Write ("Y"));
Task.WaitAll (printX, printY);
Task Schedulers and UIs
void Main()
{
    new MyWindow().ShowDialog();
}

public partial class MyWindow : System.Windows.Window
{
    Label lblResult = new Label();
    TaskScheduler _uiScheduler;   // Declare this as a field so we can use
                                  // it throughout our class.

    public MyWindow() { InitializeComponent(); }

    protected override void OnActivated (EventArgs e)
    {
        // Let the base class raise the Activated event as usual.
        base.OnActivated (e);

        // OnActivated runs on every activation (e.g. each time the window regains focus);
        // start the background work only on the first one.
        if (_uiScheduler != null) return;

        // Get the UI scheduler for the thread that created the form:
        _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();

        // Run the slow call in the background, then update the label via the UI scheduler:
        Task.Factory.StartNew<string> (SomeComplexWebService)
            .ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler);
    }

    // Builds the window's content: a single large-font label.
    void InitializeComponent()
    {
        lblResult.FontSize = 20;
        Content = lblResult;
    }

    // Stand-in for a slow remote call: blocks for one second, then returns "Foo".
    string SomeComplexWebService() { Thread.Sleep (1000); return "Foo"; }
}
Creating your own Task Factories
// A TaskFactory configured with default options: every task it starts is
// LongRunning and AttachedToParent without repeating the flags at each call.
var creationOptions = TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent;
var factory = new TaskFactory (creationOptions, TaskContinuationOptions.None);

Task task1 = factory.StartNew (() => "foo".Dump());
Task task2 = factory.StartNew (() => "far".Dump());
Working with AggregateException
AggregateException
try
{
    // Element 0 evaluates 100 / 0, which throws inside the parallel query.
    var query = ParallelEnumerable.Range (0, 1000000).Select (i => 100 / i);

    // Enumerate query
    query.Dump();
}
catch (AggregateException aex)
{
    // PLINQ reports the workers' exceptions wrapped in an AggregateException:
    foreach (var inner in aex.InnerExceptions)
        Console.WriteLine (inner.Message);
}
Flatten
try
{
    var query = ParallelEnumerable.Range (0, 1000000).Select (i => 100 / i);

    // Enumerate query
    query.Dump();
}
catch (AggregateException aex)
{
    // Flatten unwraps any nested AggregateExceptions into a single flat list.
    var flattened = aex.Flatten();
    foreach (var inner in flattened.InnerExceptions)
        inner.Dump();
}
Handle
var parent = Task.Factory.StartNew (() =>
{
    // Throw 3 exceptions at once, using 3 attached child tasks:
    int[] numbers = { 0 };

    var childFactory = new TaskFactory (TaskCreationOptions.AttachedToParent,
                                        TaskContinuationOptions.None);

    childFactory.StartNew (() => 5 / numbers[0]);    // Division by zero
    childFactory.StartNew (() => numbers [1]);       // Index out of range
    childFactory.StartNew (() => { throw null; });   // Null reference
});

try
{
    parent.Wait();
}
catch (AggregateException aex)
{
    // Note that we still need to call Flatten (child exceptions arrive nested).
    // Handle rethrows any exception for which the predicate returns false.
    aex.Flatten().Handle (ex =>
    {
        switch (ex)
        {
            case DivideByZeroException _:
                Console.WriteLine ("Divide by zero");
                return true;    // This exception is "handled"
            case IndexOutOfRangeException _:
                Console.WriteLine ("Index out of range");
                return true;    // This exception is "handled"
            default:
                return false;   // All other exceptions will get rethrown
        }
    });
}
Concurrent Collections
Producer-Consumer Queue
void Main()
{
    // One consumer; leaving the using block calls Dispose, which signals CompleteAdding.
    using (var queue = new PCQueue (1))
    {
        queue.EnqueueTask (() => "Foo".Dump());
        queue.EnqueueTask (() => "Far".Dump());
    }
}

/// <summary>
/// A producer/consumer queue of actions backed by a BlockingCollection and
/// drained by a fixed number of consumer tasks.
/// </summary>
public class PCQueue : IDisposable
{
    readonly BlockingCollection<Action> _actions = new BlockingCollection<Action>();

    public PCQueue (int workerCount)
    {
        // Create and start a separate Task for each consumer:
        for (int remaining = workerCount; remaining > 0; remaining--)
            Task.Factory.StartNew (Consume);
    }

    // Marks the queue as complete for adding; consumers finish once it drains.
    public void Dispose() => _actions.CompleteAdding();

    public void EnqueueTask (Action action) => _actions.Add (action);

    void Consume()
    {
        // The consuming enumerable blocks while no elements are available
        // and ends once CompleteAdding has been called and the queue is empty.
        foreach (Action work in _actions.GetConsumingEnumerable())
            work();   // Perform task.
    }
}
Producer-Consumer Queue - with Tasks
void Main()
{
    using (var pcQ = new PCQueue (1))
    {
        // Enqueue returns a Task, so callers can attach continuations to queued work:
        Task task1 = pcQ.Enqueue (() => Console.WriteLine ("Too"));
        Task task2 = pcQ.Enqueue (() => Console.WriteLine ("Easy!"));

        task1.ContinueWith (_ => "Task 1 complete".Dump());
        task2.ContinueWith (_ => "Task 2 complete".Dump());
    }
}

/// <summary>
/// A producer/consumer queue that stores cold tasks and runs them synchronously
/// on its consumer tasks, giving callers a Task for each queued item.
/// </summary>
public class PCQueue : IDisposable
{
    readonly BlockingCollection<Task> _taskQ = new BlockingCollection<Task>();

    public PCQueue (int workerCount)
    {
        // Create and start a separate Task for each consumer:
        for (int remaining = workerCount; remaining > 0; remaining--)
            Task.Factory.StartNew (Consume);
    }

    public Task Enqueue (Action action, CancellationToken cancelToken = default (CancellationToken))
        => Add (new Task (action, cancelToken));

    public Task<TResult> Enqueue<TResult> (Func<TResult> func, CancellationToken cancelToken = default (CancellationToken))
        => Add (new Task<TResult> (func, cancelToken));

    // Queues the (unstarted) task for a consumer and hands it back to the caller.
    T Add<T> (T task) where T : Task
    {
        _taskQ.Add (task);
        return task;
    }

    void Consume()
    {
        foreach (Task work in _taskQ.GetConsumingEnumerable())
        {
            if (work.IsCanceled) continue;   // Skip tasks canceled before they were dequeued
            try
            {
                work.RunSynchronously();
            }
            catch (InvalidOperationException) { }   // Race condition (e.g. canceled after the IsCanceled check)
        }
    }

    public void Dispose() => _taskQ.CompleteAdding();
}
EXTRA - Channels
Single Producer - Multiple Consumers
// The consumer is half as fast as the producer. We compensate by starting two consumers.
var options = new BoundedChannelOptions (1000)
{
    // Specifying SingleReader and/or SingleWriter
    // allows the Channel to make optimizing assumptions
    SingleReader = false,   // two consumers read concurrently
    SingleWriter = true,
};
Channel<string> channel = Channel.CreateBounded<string> (options);

// Once the producer finishes, complete the writer so the consumers' read loops end.
var producer = Produce().ContinueWith (_ => channel.Writer.Complete());

Task[] consumers = { Consume (1), Consume (2) };
await Task.WhenAll (consumers);

// Writes ten messages, one per second.
async Task Produce()
{
    for (int n = 0; n < 10; n++)
    {
        await channel.Writer.WriteAsync ($"Msg {n}");
        await Task.Delay (1000);
    }
    Console.WriteLine ("Producer done.");
}

// We add an ID just to visualize which one processed a given message
async Task Consume (int id)
{
    var reader = channel.Reader;
    while (await reader.WaitToReadAsync())
    {
        if (!reader.TryRead (out string data)) continue;

        Console.WriteLine ($"Processed on {id}: {data}");
        // Simulate processing takes twice as long as producing
        await Task.Delay (2000);
    }
    Console.WriteLine ($"Consumer {id} done.");
}
Single Producer - Single Consumer
// The consumer is half as fast as the producer. The producer will finish first.
Channel<string> channel = Channel.CreateBounded<string> (new BoundedChannelOptions (1000)
{
    // Specifying SingleReader and/or SingleWriter
    // allows the Channel to make optimizing assumptions
    SingleReader = true,
    SingleWriter = true,
});

// Once the producer finishes, complete the writer so the consumer's read loop ends.
var producer = Produce().ContinueWith (_ => channel.Writer.Complete());
var consumer = Consume();

// Wait for the consumer to drain the channel; without this the script can end
// before the queued messages have been processed.
await consumer;

// Writes ten messages, one per second.
async Task Produce()
{
    for (int i = 0; i < 10; i++)
    {
        await channel.Writer.WriteAsync ($"Msg {i}");
        await Task.Delay (1000);
    }
    Console.WriteLine ("Producer done.");
}

// Reads until the writer is completed and the channel is empty.
async Task Consume()
{
    while (await channel.Reader.WaitToReadAsync())
    {
        if (channel.Reader.TryRead (out string data))
        {
            Console.WriteLine ($"Processed: {data}");
            // Simulate processing takes twice as long as producing
            await Task.Delay (2000);
        }
    }
    Console.WriteLine ("Consumer done.");
}
EXTRA - SpinLock and SpinWait
SpinLock
// See http://www.albahari.com/threading/part5.aspx for the accompanying text on SpinLock and SpinWait
var spinLock = new SpinLock (enableThreadOwnerTracking: true);
bool lockTaken = false;

try
{
    spinLock.Enter (ref lockTaken);
    // Do stuff...
}
finally
{
    // Release only if Enter actually acquired the lock.
    if (lockTaken) spinLock.Exit();
}
SpinWait - SpinUntil
bool _proceed;

void Main()
{
    Task waiter = Task.Factory.StartNew (Test);
    Thread.Sleep (1000);
    _proceed = true;   // Releases the spinning task
    waiter.Wait();
}

void Test()
{
    // Spin until Main sets _proceed; the full memory barrier before each read
    // makes sure a fresh value of the (non-volatile) field is seen.
    Func<bool> proceedSignaled = () =>
    {
        Thread.MemoryBarrier();
        return _proceed;
    };

    SpinWait.SpinUntil (proceedSignaled);
    "Done!".Dump();
}
SpinWait - SpinOnce
bool _proceed;

void Main()
{
    Task waiter = Task.Run (Test);
    Thread.Sleep (1000);
    _proceed = true;   // Releases the spinning loop
    waiter.Wait();
}

void Test()
{
    // Hand-rolled equivalent of SpinUntil: check the flag, fence, then back off with SpinOnce.
    for (var spinWait = new SpinWait(); !_proceed; spinWait.SpinOnce())
        Thread.MemoryBarrier();

    "Done!".Dump();
}
SpinWait - Lock-free updates with CompareExchange
int x = 2;

void Main()
{
    // We can perform three multiplications on the same variable using 3 concurrent threads
    // safely without locks by using SpinWait with Interlocked.CompareExchange.
    Task[] workers =
    {
        Task.Factory.StartNew (() => MultiplyXBy (3)),
        Task.Factory.StartNew (() => MultiplyXBy (4)),
        Task.Factory.StartNew (() => MultiplyXBy (5)),
    };
    Task.WaitAll (workers);
    x.Dump();   // 2 * 3 * 4 * 5 = 120, whatever the interleaving
}

// Multiplies x by factor atomically using an optimistic read / compare-and-swap retry loop.
void MultiplyXBy (int factor)
{
    for (var spinWait = new SpinWait(); ; spinWait.SpinOnce())
    {
        int snapshot1 = x;
        Thread.MemoryBarrier();
        int calc = snapshot1 * factor;

        // Publish calc only if x still equals what we read; CompareExchange returns x's prior value.
        int snapshot2 = Interlocked.CompareExchange (ref x, calc, snapshot1);
        if (snapshot1 == snapshot2) return;   // No one preempted us.
    }
}