Chapter 14 - Concurrency and Asynchrony
Threading Basics
Creating a thread
static void Main() { Thread t = new Thread (WriteY); // Kick off a new thread t.Start(); // running WriteY() // Simultaneously, do something on the main thread. for (int i = 0; i < 1000; i++) Console.Write ("x"); } static void WriteY() { for (int i = 0; i < 1000; i++) Console.Write ("y"); }
Join
static void Main() { Thread t = new Thread (Go); t.Start(); t.Join(); Console.WriteLine ("Thread t has ended!"); } static void Go() { for (int i = 0; i < 1000; i++) Console.Write ("y"); }
Sleep
Thread.Sleep (500); // Sleep the current thread for 500 milliseconds
Local state
static void Main() { new Thread (Go).Start(); // Call Go() on a new thread Go(); // Call Go() on the main thread } static void Go() { // Declare and use a local variable - 'cycles' for (int cycles = 0; cycles < 5; cycles++) Console.Write ('?'); }
Shared state - unsafe
void Main() { ThreadTest.Main(); } class ThreadTest { bool _done; public static void Main() { ThreadTest tt = new ThreadTest(); // Create a common instance new Thread (tt.Go).Start(); tt.Go(); } void Go() // Note that this is an instance method { if (!_done) { _done = true; Console.WriteLine ("Done"); } } }
Shared state with closure - unsafe
bool done = false; ThreadStart action = () => { if (!done) { done = true; Console.WriteLine ("Done"); } }; new Thread (action).Start(); action();
Shared state with statics - unsafe
void Main() { ThreadTest.Main(); } class ThreadTest { static bool _done; // Static fields are shared between all threads // in the same application domain. public static void Main() { new Thread (Go).Start(); Go(); } static void Go() { if (!_done) { _done = true; Console.WriteLine ("Done"); } } }
Shared state - safe
void Main() { ThreadSafe.Main(); } class ThreadSafe { static bool _done; static readonly object _locker = new object(); public static void Main() { new Thread (Go).Start(); Go(); } static void Go() { lock (_locker) { if (!_done) { Console.WriteLine ("Done"); _done = true; } } } }
Passing in data with a lambda expression
static void Main() { Thread t = new Thread ( () => Print ("Hello from t!") ); t.Start(); } static void Print (string message) { Console.WriteLine (message); }
Multi-statement lambda
new Thread (() => { Console.WriteLine ("I'm running on another thread!"); Console.WriteLine ("This is so easy!"); }).Start();
Lambdas and captured variables - unsafe
for (int i = 0; i < 10; i++) new Thread (() => Console.Write (i)).Start();
Lambdas and captured variables - safe
for (int i = 0; i < 10; i++) { int temp = i; new Thread (() => Console.Write (temp)).Start(); }
Exception handling - wrong place
static void Main() { try { new Thread (Go).Start(); } catch (Exception ex) { // We'll never get here! Console.WriteLine ("Exception!"); } } static void Go() { throw null; } // Throws a NullReferenceException
Exception handling - right place
static void Main() { new Thread (Go).Start(); } static void Go() { try { throw null; // The NullReferenceException will get caught below } catch (Exception ex) { //Typically log the exception, and/or signal another thread // that we've come unstuck ex.Dump ("Caught!"); } }
Basic signaling
var signal = new ManualResetEvent (false); new Thread (() => { Console.WriteLine ("Waiting for signal..."); signal.WaitOne(); signal.Dispose(); Console.WriteLine ("Got signal!"); }).Start(); Thread.Sleep(2000); signal.Set(); // “Open” the signal
Threading in rich-client apps
void Main() { new MyWindow().ShowDialog(); } partial class MyWindow : Window { TextBox txtMessage; public MyWindow() { InitializeComponent(); new Thread (Work).Start(); } void Work() { Thread.Sleep (5000); // Simulate time-consuming task UpdateMessage ("The answer"); } void UpdateMessage (string message) { Action action = () => txtMessage.Text = message; Dispatcher.BeginInvoke (action); } void InitializeComponent() { SizeToContent = SizeToContent.WidthAndHeight; WindowStartupLocation = WindowStartupLocation.CenterScreen; Content = txtMessage = new TextBox { Width=250, Margin=new Thickness (10), Text="Ready" }; } }
Synchronization contexts
void Main() { Util.CreateSynchronizationContext(); new MyWindow().ShowDialog(); } partial class MyWindow : Window { TextBox txtMessage; SynchronizationContext _uiSyncContext; public MyWindow() { InitializeComponent(); // Capture the synchronization context for the current UI thread: _uiSyncContext = SynchronizationContext.Current; new Thread (Work).Start(); } void Work() { Thread.Sleep (5000); // Simulate time-consuming task UpdateMessage ("The answer"); } void UpdateMessage (string message) { // Marshal the delegate to the UI thread: _uiSyncContext.Post (_ => txtMessage.Text = message, null); } void InitializeComponent() { SizeToContent = SizeToContent.WidthAndHeight; WindowStartupLocation = WindowStartupLocation.CenterScreen; Content = txtMessage = new TextBox { Width=250, Margin=new Thickness (10), Text="Ready" }; } }
Entering the ThreadPool
// Task is in System.Threading.Tasks Task.Run (() => Console.WriteLine ("Hello from the thread pool")); // The old-school way: ThreadPool.QueueUserWorkItem (notUsed => Console.WriteLine ("Hello, old-school"));
Tasks
Starting a Task
Task.Run (() => Console.WriteLine ("Foo"));
Wait
Task task = Task.Run (() => { Console.WriteLine ("Task started"); Thread.Sleep (2000); Console.WriteLine ("Foo"); }); Console.WriteLine (task.IsCompleted); // False task.Wait(); // Blocks until task is complete
Long-running task
Task task = Task.Factory.StartNew (() => { Console.WriteLine ("Task started"); Thread.Sleep (2000); Console.WriteLine ("Foo"); }, TaskCreationOptions.LongRunning); task.Wait(); // Blocks until task is complete
Returning a value
Task<int> task = Task.Run (() => { Console.WriteLine ("Foo"); return 3; }); int result = task.Result; // Blocks if not already finished Console.WriteLine (result); // 3
Count prime numbers
Task<int> primeNumberTask = Task.Run (() => Enumerable.Range (2, 3000000).Count (n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0))); Console.WriteLine ("Task running..."); Console.WriteLine ("The answer is " + primeNumberTask.Result);
Exceptions
// Start a Task that throws a NullReferenceException: Task task = Task.Run (() => { throw null; }); try { task.Wait(); } catch (AggregateException aex) { if (aex.InnerException is NullReferenceException) Console.WriteLine ("Null!"); else throw; }
Continuations - GetAwaiter
Task<int> primeNumberTask = Task.Run (() => Enumerable.Range (2, 3000000).Count (n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0))); var awaiter = primeNumberTask.GetAwaiter(); awaiter.OnCompleted (() => { int result = awaiter.GetResult(); Console.WriteLine (result); // Writes result });
Continuations - ContinueWith
// (See Chapter 23 for more on using ContinueWith.) Task<int> primeNumberTask = Task.Run (() => Enumerable.Range (2, 3000000).Count (n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0))); primeNumberTask.ContinueWith (antecedent => { int result = antecedent.Result; Console.WriteLine (result); // Writes 123 });
TaskCompletionSource - Print 42 after 5 seconds
var tcs = new TaskCompletionSource<int>(); new Thread (() => { Thread.Sleep (5000); tcs.SetResult (42); }).Start(); Task<int> task = tcs.Task; // Our "slave" task. Console.WriteLine (task.Result); // 42
TaskCompletionSource - Our own Run method
void Main() { Task<int> task = Run (() => { Thread.Sleep (5000); return 42; }); task.Result.Dump(); } Task<TResult> Run<TResult> (Func<TResult> function) { var tcs = new TaskCompletionSource<TResult>(); new Thread (() => { try { tcs.SetResult (function()); } catch (Exception ex) { tcs.SetException (ex); } }).Start(); return tcs.Task; }
TaskCompletionSource - GetAnswerToLife
void Main() { var awaiter = GetAnswerToLife().GetAwaiter(); awaiter.OnCompleted (() => Console.WriteLine (awaiter.GetResult())); } Task<int> GetAnswerToLife() { var tcs = new TaskCompletionSource<int>(); // Create a timer that fires once in 5000 ms: var timer = new System.Timers.Timer (5000) { AutoReset = false }; timer.Elapsed += delegate { timer.Dispose(); tcs.SetResult (42); }; timer.Start(); return tcs.Task; }
Writing Delay method
void Main() { Delay (5000).GetAwaiter().OnCompleted (() => Console.WriteLine (42)); } Task Delay (int milliseconds) { var tcs = new TaskCompletionSource<object>(); var timer = new System.Timers.Timer (milliseconds) { AutoReset = false }; timer.Elapsed += delegate { timer.Dispose(); tcs.SetResult (null); }; timer.Start(); return tcs.Task; }
Delay times 10000
void Main() { for (int i = 0; i < 10000; i++) Delay (5000).GetAwaiter().OnCompleted (() => Console.WriteLine (42)); } Task Delay (int milliseconds) { var tcs = new TaskCompletionSource<object>(); var timer = new System.Timers.Timer (milliseconds) { AutoReset = false }; timer.Elapsed += delegate { timer.Dispose(); tcs.SetResult (null); }; timer.Start(); return tcs.Task; }
Task.Delay
Task.Delay (5000).GetAwaiter().OnCompleted (() => Console.WriteLine (42)); // Another way to attach a continuation: Task.Delay (5000).ContinueWith (ant => Console.WriteLine (42));
ValueTask
async Task Main(string[] args) { var vt1 = AnswerQuestionAsync ("What's the answer to life?"); var vt2 = AnswerQuestionAsync ("Is the sun shining?"); Console.WriteLine ($"vt1.IsCompleted: {vt1.IsCompleted}"); // True Console.WriteLine ($"vt2.IsCompleted: {vt2.IsCompleted}"); // False var a1 = await vt1; Console.WriteLine ($"a1: {a1}"); // Immediate var a2 = await vt2; Console.WriteLine ($"a2: {a2}"); // Takes 5 seconds to appear } async ValueTask<string> AnswerQuestionAsync (string question) { if (question == "What's the answer to life?") return "42"; // ValueTask<string> return await AskCortanaAsync(question); // ValueTask<Task<string>> } async Task<string> AskCortanaAsync (string question) { await Task.Delay(5000); return "I don't know."; }
Principles of Asynchrony
GetPrimesCount
void Main() { DisplayPrimeCounts(); } void DisplayPrimeCounts() { for (int i = 0; i < 10; i++) Console.WriteLine (GetPrimesCount (i*1000000 + 2, 1000000) + " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1)); Console.WriteLine ("Done!"); } int GetPrimesCount (int start, int count) { return ParallelEnumerable.Range (start, count).Count (n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0)); }
Course-grained asynchrony
void Main() { Task.Run (() => DisplayPrimeCounts()); } void DisplayPrimeCounts() { for (int i = 0; i < 10; i++) Console.WriteLine (GetPrimesCount (i*1000000 + 2, 1000000) + " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1)); Console.WriteLine ("Done!"); } int GetPrimesCount (int start, int count) { return ParallelEnumerable.Range (start, count).Count (n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0)); }
Fine-grained asynchrony
void Main() { DisplayPrimeCounts(); } void DisplayPrimeCounts() { DisplayPrimeCountsFrom (0); } void DisplayPrimeCountsFrom (int i) // This is starting to get awkward! { var awaiter = GetPrimesCountAsync (i*1000000 + 2, 1000000).GetAwaiter(); awaiter.OnCompleted (() => { Console.WriteLine (awaiter.GetResult() + " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1)); if (i++ < 10) DisplayPrimeCountsFrom (i); else Console.WriteLine ("Done"); }); } Task<int> GetPrimesCountAsync (int start, int count) { return Task.Run (() => ParallelEnumerable.Range (start, count).Count (n => Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0))); }
Making DisplayPrimesCount asynchronous
void Main() { DisplayPrimeCountsAsync(); } Task DisplayPrimeCountsAsync() { var machine = new PrimesStateMachine(); machine.DisplayPrimeCountsFrom (0); return machine.Task; } class PrimesStateMachine // Even more awkward!! { TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>(); public Task Task { get { return _tcs.Task; } } public void DisplayPrimeCountsFrom (int i) { var awaiter = GetPrimesCountAsync (i*1000000+2, 1000000).GetAwaiter(); awaiter.OnCompleted (() => { Console.WriteLine (awaiter.GetResult()); if (i++ < 10) DisplayPrimeCountsFrom (i); else { Console.WriteLine ("Done"); _tcs.SetResult (null); } }); } Task<int> GetPrimesCountAsync (int start, int count) { return Task.Run (() => ParallelEnumerable.Range (start, count).Count (n => Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0))); } }
Asynchronous functions to the rescue
void Main() { DisplayPrimeCountsAsync(); } async Task DisplayPrimeCountsAsync() { for (int i = 0; i < 10; i++) Console.WriteLine (await GetPrimesCountAsync (i*1000000 + 2, 1000000) + " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1)); Console.WriteLine ("Done!"); } Task<int> GetPrimesCountAsync (int start, int count) { return Task.Run (() => ParallelEnumerable.Range (start, count).Count (n => Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0))); }
Asynchronous Functions in C# 5.0
Awaiting
void Main() { DisplayPrimesCount(); } async void DisplayPrimesCount() { int result = await GetPrimesCountAsync (2, 1000000); Console.WriteLine (result); } Task<int> GetPrimesCountAsync (int start, int count) { return Task.Run (() => ParallelEnumerable.Range (start, count).Count (n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0))); }
Capturing local state
void Main() { DisplayPrimeCounts(); } async void DisplayPrimeCounts() { for (int i = 0; i < 10; i++) Console.WriteLine (await GetPrimesCountAsync (i*1000000+2, 1000000)); } Task<int> GetPrimesCountAsync (int start, int count) { return Task.Run (() => ParallelEnumerable.Range (start, count).Count (n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0))); }
Awaiting in a UI - synchronous
void Main() { new TestUI().ShowDialog(); } class TestUI : Window // Notice how the window becomes unresponsive while working { Button _button = new Button { Content = "Go" }; TextBlock _results = new TextBlock(); public TestUI() { var panel = new StackPanel(); panel.Children.Add (_button); panel.Children.Add (_results); Content = panel; _button.Click += (sender, args) => Go(); } void Go() { for (int i = 1; i < 5; i++) _results.Text += GetPrimesCount (i * 1000000, 1000000) + " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1) + Environment.NewLine; } int GetPrimesCount (int start, int count) { return ParallelEnumerable.Range (start, count).Count (n => Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0)); } }
Awaiting in a UI - asynchronous
void Main() { new TestUI().ShowDialog(); } class TestUI : Window // Notice how the window becomes unresponsive while working { Button _button = new Button { Content = "Go" }; TextBlock _results = new TextBlock(); public TestUI() { var panel = new StackPanel(); panel.Children.Add (_button); panel.Children.Add (_results); Content = panel; _button.Click += (sender, args) => Go(); } async void Go() { _button.IsEnabled = false; for (int i = 1; i < 5; i++) _results.Text += await GetPrimesCountAsync (i * 1000000, 1000000) + " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1) + Environment.NewLine; _button.IsEnabled = true; } Task<int> GetPrimesCountAsync (int start, int count) { return Task.Run (() => ParallelEnumerable.Range (start, count).Count (n => Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0))); } }
Awaiting in a UI - IO-bound
void Main() { new TestUI().ShowDialog(); } class TestUI : Window // Notice how the window becomes unresponsive while working { Button _button = new Button { Content = "Go" }; TextBlock _results = new TextBlock(); public TestUI() { var panel = new StackPanel(); panel.Children.Add (_button); panel.Children.Add (_results); Content = panel; _button.Click += (sender, args) => Go(); } async void Go() { _button.IsEnabled = false; string[] urls = "www.albahari.com www.oreilly.com www.linqpad.net".Split(); int totalLength = 0; try { foreach (string url in urls) { var uri = new Uri ("http://" + url); byte[] data = await new WebClient().DownloadDataTaskAsync (uri); _results.Text += "Length of " + url + " is " + data.Length + Environment.NewLine; totalLength += data.Length; } _results.Text += "Total length: " + totalLength; } catch (WebException ex) { _results.Text += "Error: " + ex.Message; } finally { _button.IsEnabled = true; } } Task<int> GetPrimesCountAsync (int start, int count) { return Task.Run (() => ParallelEnumerable.Range (start, count).Count (n => Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0))); } }
Awaiting in a UI - Comparison to course-grained concurrency
void Main() { new TestUI().ShowDialog(); } class TestUI : Window // Notice how the window becomes unresponsive while working { Button _button = new Button { Content = "Go" }; TextBlock _results = new TextBlock(); public TestUI() { var panel = new StackPanel(); panel.Children.Add (_button); panel.Children.Add (_results); Content = panel; _button.Click += (sender, args) => { _button.IsEnabled = false; Task.Run (() => Go()); }; } void Go() { // Notice the race condition (run it and look at what's wrong with the results): for (int i = 1; i < 5; i++) { int result = GetPrimesCount (i * 1000000, 1000000); Dispatcher.BeginInvoke (new Action (() => _results.Text += result + " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1) + Environment.NewLine)); } Dispatcher.BeginInvoke (new Action (() => _button.IsEnabled = true)); } int GetPrimesCount (int start, int count) { return ParallelEnumerable.Range (start, count).Count (n => Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0)); } }
Writing asynchronous functions
void Main() { Go(); } async Task Go() { await PrintAnswerToLife(); Console.WriteLine ("Done"); } async Task PrintAnswerToLife() // We can return Task instead of void { await Task.Delay (5000); int answer = 21 * 2; Console.WriteLine (answer); }
Returning Task of TResult
void Main() { Go(); } async Task Go() { await PrintAnswerToLife(); Console.WriteLine ("Done"); } async Task PrintAnswerToLife() { int answer = await GetAnswerToLife(); Console.WriteLine (answer); } async Task<int> GetAnswerToLife() { await Task.Delay (5000); int answer = 21 * 2; return answer; }
Blocking versions of the above
void Main() { Go(); } void Go() { PrintAnswerToLife(); Console.WriteLine ("Done"); } void PrintAnswerToLife() { int answer = GetAnswerToLife(); Console.WriteLine (answer); } int GetAnswerToLife() { Thread.Sleep (5000); int answer = 21 * 2; return answer; }
Parallelism
void Main() { Go(); } async Task Go() { var task1 = PrintAnswerToLife(); var task2 = PrintAnswerToLife(); await task1; await task2; Console.WriteLine ("Done"); } async Task PrintAnswerToLife() { int answer = await GetAnswerToLife(); Console.WriteLine (answer); } async Task<int> GetAnswerToLife() { await Task.Delay (5000); int answer = 21 * 2; return answer; }
Asynchronous lambda expressions
// Named asynchronous method: async Task NamedMethod() { await Task.Delay (1000); Console.WriteLine ("Foo"); } async void Main() { // Unnamed asynchronous method: Func<Task> unnamed = async () => { await Task.Delay (1000); Console.WriteLine ("Foo"); }; // We can call the two in the same way: await NamedMethod(); await unnamed(); }
Asynchronous lambda expressions - event handlers
var myButton = new Button { Height = 30, Content = "Wait..." }; myButton.Click += async (sender, args) => { await Task.Delay (1000); myButton.Content = "Done"; }; myButton.Dump();
Asynchronous lambda expressions - returning Task of TResult
Func<Task<int>> unnamed = async () => { await Task.Delay (1000); return 123; }; int answer = await unnamed(); answer.Dump();
Optimizations - Completing synchronously
async void Main() { string html = await GetWebPageAsync ("http://www.linqpad.net"); html.Length.Dump ("Characters downloaded"); // Let's try again. It should be instant this time: html = await GetWebPageAsync ("http://www.linqpad.net"); html.Length.Dump ("Characters downloaded"); } static Dictionary<string,string> _cache = new Dictionary<string,string>(); async Task<string> GetWebPageAsync (string uri) { string html; if (_cache.TryGetValue (uri, out html)) return html; return _cache [uri] = await new WebClient().DownloadStringTaskAsync (uri); }
Optimizations - Caching Tasks
async void Main() { string html = await GetWebPageAsync ("http://www.linqpad.net"); html.Length.Dump ("Characters downloaded"); // Let's try again. It should be instant this time: html = await GetWebPageAsync ("http://www.linqpad.net"); html.Length.Dump ("Characters downloaded"); } static Dictionary<string,Task<string>> _cache = new Dictionary<string,Task<string>>(); Task<string> GetWebPageAsync (string uri) { Task<string> downloadTask; if (_cache.TryGetValue (uri, out downloadTask)) return downloadTask; return _cache [uri] = new WebClient().DownloadStringTaskAsync (uri); }
Optimizations - Caching Tasks fully threadsafe
async void Main() { string html = await GetWebPageAsync ("http://www.linqpad.net"); html.Length.Dump ("Characters downloaded"); // Let's try again. It should be instant this time: html = await GetWebPageAsync ("http://www.linqpad.net"); html.Length.Dump ("Characters downloaded"); } static Dictionary<string,Task<string>> _cache = new Dictionary<string,Task<string>>(); Task<string> GetWebPageAsync (string uri) { lock (_cache) { Task<string> downloadTask; if (_cache.TryGetValue (uri, out downloadTask)) return downloadTask; return _cache [uri] = new WebClient().DownloadStringTaskAsync (uri); } }
Optimizations - Avoiding excessive bouncing
void Main() { A(); } async void A() { await B(); } async Task B() { for (int i = 0; i < 1000; i++) await C().ConfigureAwait (false); } async Task C() { /*...*/ }
Asynchronous Streams in C# 8.0
Asynchronous Stream vs Task of IEnumerable
Console.WriteLine($"Starting async Task<IEnumerable<int>>. Data arrives in one group."); foreach (var data in await RangeTaskAsync(0, 10, 500)) Console.WriteLine (data); Console.WriteLine($"Starting async Task<IEnumerable<int>>. Data arrives as available."); await foreach (var number in RangeAsync (0, 10, 500)) Console.WriteLine (number); static async Task<IEnumerable<int>> RangeTaskAsync(int start, int count, int delay) { List<int> data = new List<int>(); for (int i = start; i < start + count; i++) { await Task.Delay (delay); data.Add (i); } return data; } async IAsyncEnumerable<int> RangeAsync ( int start, int count, int delay) { for (int i = start; i < start + count; i++) { await Task.Delay (delay); yield return i; } }
Asynchronous Streams and LINQ
async Task Main() { IAsyncEnumerable<int> query = from i in RangeAsync (0, 10, 500) where i % 2 == 0 // Even numbers only. select i * 10; // Multiply by 10. await foreach (var number in query) Console.WriteLine (number); query.Dump(); // in LINQPad, you can directly dump IAsyncEnumerable<T> } async IAsyncEnumerable<int> RangeAsync ( int start, int count, int delay) { for (int i = start; i < start + count; i++) { await Task.Delay (delay); yield return i; } }
Asynchronous Patterns
Cancellation
async void Main() { var token = new CancellationToken(); Task.Delay (5000).ContinueWith (ant => token.Cancel()); // Tell it to cancel in two seconds. await Foo (token); } // This is a simplified version of the CancellationToken type in System.Threading: class CancellationToken { public bool IsCancellationRequested { get; private set; } public void Cancel() { IsCancellationRequested = true; } public void ThrowIfCancellationRequested() { if (IsCancellationRequested) throw new OperationCanceledException(); } } async Task Foo (CancellationToken cancellationToken) { for (int i = 0; i < 10; i++) { Console.WriteLine (i); await Task.Delay (1000); cancellationToken.ThrowIfCancellationRequested(); } }
Using the real CancellationToken
async void Main() { var cancelSource = new CancellationTokenSource(); Task.Delay (5000).ContinueWith (ant => cancelSource.Cancel()); // Tell it to cancel in two seconds. await Foo (cancelSource.Token); } async Task Foo (CancellationToken cancellationToken) { for (int i = 0; i < 10; i++) { Console.WriteLine (i); await Task.Delay (1000); cancellationToken.ThrowIfCancellationRequested(); } }
Using the real CancellationToken - improved version
async void Main() { var cancelSource = new CancellationTokenSource (5000); // This tells it to cancel in 5 seconds await Foo (cancelSource.Token); } async Task Foo (CancellationToken cancellationToken) { for (int i = 0; i < 10; i++) { Console.WriteLine (i); await Task.Delay (1000, cancellationToken); // Cancellation tokens propagate nicely } }
Progress reporting - with a delegate
async void Main() { Action<int> progress = i => Console.WriteLine (i + " %"); await Foo (progress); } Task Foo (Action<int> onProgressPercentChanged) { return Task.Run (() => { for (int i = 0; i < 1000; i++) { if (i % 10 == 0) onProgressPercentChanged (i / 10); // Do something compute-bound... } }); }
Progress reporting - with IProgress
async void Main() { Action<int> progress = i => Console.WriteLine (i + " %"); await Foo (progress); } Task Foo (Action<int> onProgressPercentChanged) { return Task.Run (() => { for (int i = 0; i < 1000; i++) { if (i % 10 == 0) onProgressPercentChanged (i / 10); // Do something compute-bound... } }); }
Task combinators - WhenAny
async void Main() { Task<int> winningTask = await Task.WhenAny (Delay1(), Delay2(), Delay3()); Console.WriteLine ("Done"); Console.WriteLine (winningTask.Result); // 1 } async Task<int> Delay1() { await Task.Delay (1000); return 1; } async Task<int> Delay2() { await Task.Delay (2000); return 2; } async Task<int> Delay3() { await Task.Delay (3000); return 3; }
Task combinators - WhenAny - await winning task
async void Main() { Task<int> winningTask = await Task.WhenAny (Delay1(), Delay2(), Delay3()); Console.WriteLine ("Done"); Console.WriteLine (await winningTask); // 1 } async Task<int> Delay1() { await Task.Delay (1000); return 1; } async Task<int> Delay2() { await Task.Delay (2000); return 2; } async Task<int> Delay3() { await Task.Delay (3000); return 3; }
Task combinators - WhenAny - in one step
async void Main() { int answer = await await Task.WhenAny (Delay1(), Delay2(), Delay3()); answer.Dump(); } async Task<int> Delay1() { await Task.Delay (1000); return 1; } async Task<int> Delay2() { await Task.Delay (2000); return 2; } async Task<int> Delay3() { await Task.Delay (3000); return 3; }
Task combinators - WhenAny - timeouts
async void Main() { Task<string> task = SomeAsyncFunc(); Task winner = await (Task.WhenAny (task, Task.Delay(5000))); if (winner != task) throw new TimeoutException(); string result = await task; // Unwrap result/re-throw } async Task<string> SomeAsyncFunc() { await Task.Delay (10000); return "foo"; }
Task combinators - WhenAll
async void Main() { await Task.WhenAll (Delay1(), Delay2(), Delay3()); "Done".Dump(); } async Task<int> Delay1() { await Task.Delay (1000); return 1; } async Task<int> Delay2() { await Task.Delay (2000); return 2; } async Task<int> Delay3() { await Task.Delay (3000); return 3; }
Task combinators - WhenAll - exceptions
async void Main() { Task task1 = Task.Run (() => { throw null; } ); Task task2 = Task.Run (() => { throw null; } ); Task all = Task.WhenAll (task1, task2); try { await all; } catch { Console.WriteLine (all.Exception.InnerExceptions.Count); // 2 } }
Task combinators - WhenAll - return values
async void Main() { Task<int> task1 = Task.Run (() => 1); Task<int> task2 = Task.Run (() => 2); int[] results = await Task.WhenAll (task1, task2); // { 1, 2 } results.Dump(); }
Task combinators - WhenAll - web page downloads
async void Main() { int totalSize = await GetTotalSize ("http://www.linqpad.net http://www.albahari.com http://stackoverflow.com".Split()); totalSize.Dump(); } async Task<int> GetTotalSize (string[] uris) { IEnumerable<Task<byte[]>> downloadTasks = uris.Select (uri => new WebClient().DownloadDataTaskAsync (uri)); byte[][] contents = await Task.WhenAll (downloadTasks); return contents.Sum (c => c.Length); }
Task combinators - WhenAll - web page downloads improved
async void Main() { int totalSize = await GetTotalSize ("http://www.linqpad.net http://www.albahari.com http://stackoverflow.com".Split()); totalSize.Dump(); } async Task<int> GetTotalSize (string[] uris) { IEnumerable<Task<int>> downloadTasks = uris.Select (async uri => (await new WebClient().DownloadDataTaskAsync (uri)).Length); int[] contentLengths = await Task.WhenAll (downloadTasks); return contentLengths.Sum(); }
Custom combinators - WithTimeout
async void Main() { string result = await SomeAsyncFunc().WithTimeout (TimeSpan.FromSeconds (2)); result.Dump(); } async Task<string> SomeAsyncFunc() { await Task.Delay (10000); return "foo"; } public static class Extensions { public async static Task<TResult> WithTimeout<TResult> (this Task<TResult> task, TimeSpan timeout) { Task winner = await (Task.WhenAny (task, Task.Delay (timeout))); if (winner != task) throw new TimeoutException(); return await task; // Unwrap result/re-throw } }
Custom combinators - WithCancellation
async void Main() { var cts = new CancellationTokenSource (3000); // Cancel after 3 seconds string result = await SomeAsyncFunc().WithCancellation (cts.Token); result.Dump(); } async Task<string> SomeAsyncFunc() { await Task.Delay (10000); return "foo"; } public static class Extensions { public static Task<TResult> WithCancellation<TResult> (this Task<TResult> task, CancellationToken cancelToken) { var tcs = new TaskCompletionSource<TResult>(); var reg = cancelToken.Register (() => tcs.TrySetCanceled ()); task.ContinueWith (ant => { reg.Dispose(); if (ant.IsCanceled) tcs.TrySetCanceled(); else if (ant.IsFaulted) tcs.TrySetException (ant.Exception.InnerException); else tcs.TrySetResult (ant.Result); }); return tcs.Task; } }
Custom combinators - WhenAllOrError
// This will throw an exception immediately. async void Main() { Task<int> task1 = Task.Run (() => { throw null; return 42; } ); Task<int> task2 = Task.Delay (5000).ContinueWith (ant => 53); int[] results = await WhenAllOrError (task1, task2); } async Task<TResult[]> WhenAllOrError<TResult> (params Task<TResult>[] tasks) { var killJoy = new TaskCompletionSource<TResult[]>(); foreach (var task in tasks) task.ContinueWith (ant => { if (ant.IsCanceled) killJoy.TrySetCanceled(); else if (ant.IsFaulted) killJoy.TrySetException (ant.Exception.InnerException); }); return await await Task.WhenAny (killJoy.Task, Task.WhenAll (tasks)); }