日期:2020-06-06  浏览次数:1242 次

Chapter 14 - Concurrency and Asynchrony

Threading Basics

Creating a thread

static void Main()
{
  Thread t = new Thread (WriteY);          // Kick off a new thread
  t.Start();                               // running WriteY()

  // Simultaneously, do something on the main thread.
  for (int i = 0; i < 1000; i++) Console.Write ("x");
}

static void WriteY()
{
  for (int i = 0; i < 1000; i++) Console.Write ("y");
}

Join

static void Main()
{
  Thread t = new Thread (Go);
  t.Start();
  t.Join();
  Console.WriteLine ("Thread t has ended!");
}
 
static void Go() 
{
  for (int i = 0; i < 1000; i++) Console.Write ("y"); 
}

Sleep

Thread.Sleep (500);   // Sleep the current thread for 500 milliseconds

Local state

static void Main() 
{
  new Thread (Go).Start();      // Call Go() on a new thread
  Go();                         // Call Go() on the main thread
}
 
static void Go()
{
  // Declare and use a local variable - 'cycles'
  for (int cycles = 0; cycles < 5; cycles++) Console.Write ('?');
}

Shared state - unsafe

void Main()
{
  ThreadTest.Main();
}

class ThreadTest
{
  bool _done;
    
  public static void Main()
  {
    ThreadTest tt = new ThreadTest();   // Create a common instance
    new Thread (tt.Go).Start();
    tt.Go();
  }
    
  void Go()   // Note that this is an instance method
  {
    if (!_done) { _done = true; Console.WriteLine ("Done"); }
  }
}

Shared state with closure - unsafe

bool done = false;
ThreadStart action = () =>
{
  if (!done) { done = true; Console.WriteLine ("Done"); }
};
new Thread (action).Start();
action();

Shared state with statics - unsafe

void Main()
{
  ThreadTest.Main();
}

class ThreadTest 
{
  static bool _done;    // Static fields are shared between all threads
            // in the same application domain.
  public static void Main()
  {
    new Thread (Go).Start();
    Go();
  }
  
  static void Go()
  {
    if (!_done) { _done = true; Console.WriteLine ("Done"); }
  }
}

Shared state - safe

void Main()
{
  ThreadSafe.Main();
}

class ThreadSafe 
{
  static bool _done;
  static readonly object _locker = new object();
  
  public static void Main()
  {
    new Thread (Go).Start();
    Go();
  }
  
  static void Go()
  {
    lock (_locker)
    {
      if (!_done) { Console.WriteLine ("Done"); _done = true; }
    }
  }
}

Passing in data with a lambda expression

static void Main()
{
  Thread t = new Thread ( () => Print ("Hello from t!") );
  t.Start();
}

static void Print (string message) { Console.WriteLine (message); }

Multi-statement lambda

new Thread (() =>
{
  Console.WriteLine ("I'm running on another thread!");
  Console.WriteLine ("This is so easy!");
}).Start();

Lambdas and captured variables - unsafe

for (int i = 0; i < 10; i++)
  new Thread (() => Console.Write (i)).Start();

Lambdas and captured variables - safe

for (int i = 0; i < 10; i++)
{
  int temp = i;
  new Thread (() => Console.Write (temp)).Start();
}

Exception handling - wrong place

static void Main()
{
  try
  {
    new Thread (Go).Start();
  }
  catch (Exception ex)
  {
    // We'll never get here!
    Console.WriteLine ("Exception!");
  }
}

static void Go() { throw null; }   // Throws a NullReferenceException

Exception handling - right place

static void Main()
{
  new Thread (Go).Start();
}

static void Go()
{
  try
  {
    throw null;    // The NullReferenceException will get caught below
  }
  catch (Exception ex)
  {
    //Typically log the exception, and/or signal another thread
    // that we've come unstuck
    ex.Dump ("Caught!");
  }
}

Basic signaling

var signal = new ManualResetEvent (false);

new Thread (() =>
{
  Console.WriteLine ("Waiting for signal...");
  signal.WaitOne();
  signal.Dispose();
  Console.WriteLine ("Got signal!");
}).Start();

Thread.Sleep(2000);
signal.Set();        // “Open” the signal

Threading in rich-client apps

void Main()
{
  new MyWindow().ShowDialog();
}

partial class MyWindow : Window
{
  TextBox txtMessage;
  
  public MyWindow()
  {
    InitializeComponent();
    new Thread (Work).Start();
  }
  
  void Work()
  {
    Thread.Sleep (5000);           // Simulate time-consuming task
    UpdateMessage ("The answer");
  }
  
  void UpdateMessage (string message)
  {
    Action action = () => txtMessage.Text = message;
    Dispatcher.BeginInvoke (action);
  }
  
  void InitializeComponent()
  {
    SizeToContent = SizeToContent.WidthAndHeight;
    WindowStartupLocation = WindowStartupLocation.CenterScreen;
    Content = txtMessage = new TextBox { Width=250, Margin=new Thickness (10), Text="Ready" };
  }
}

Synchronization contexts

void Main()
{
  Util.CreateSynchronizationContext();
  new MyWindow().ShowDialog();
}

partial class MyWindow : Window
{
  TextBox txtMessage;
  SynchronizationContext _uiSyncContext;

  public MyWindow()
  {
    InitializeComponent();
    // Capture the synchronization context for the current UI thread:
    _uiSyncContext = SynchronizationContext.Current;
    new Thread (Work).Start();
  }
  
  void Work()
  {
    Thread.Sleep (5000);           // Simulate time-consuming task
    UpdateMessage ("The answer");
  }
  
  void UpdateMessage (string message)
  {
    // Marshal the delegate to the UI thread:
    _uiSyncContext.Post (_ => txtMessage.Text = message, null);
  }
  
  void InitializeComponent()
  {
    SizeToContent = SizeToContent.WidthAndHeight;
    WindowStartupLocation = WindowStartupLocation.CenterScreen;
    Content = txtMessage = new TextBox { Width=250, Margin=new Thickness (10), Text="Ready" };
  }
}

Entering the ThreadPool

// Task is in System.Threading.Tasks
Task.Run (() => Console.WriteLine ("Hello from the thread pool"));

// The old-school way:
ThreadPool.QueueUserWorkItem (notUsed => Console.WriteLine ("Hello, old-school"));
Tasks

Starting a Task

Task.Run (() => Console.WriteLine ("Foo"));

Wait

Task task = Task.Run (() =>
{
  Console.WriteLine ("Task started");
  Thread.Sleep (2000);
  Console.WriteLine ("Foo");
});
Console.WriteLine (task.IsCompleted);  // False
task.Wait();  // Blocks until task is complete

Long-running task

Task task = Task.Factory.StartNew (() =>
{
  Console.WriteLine ("Task started");
  Thread.Sleep (2000);
  Console.WriteLine ("Foo");
}, TaskCreationOptions.LongRunning);

task.Wait();  // Blocks until task is complete

Returning a value

Task<int> task = Task.Run (() => { Console.WriteLine ("Foo"); return 3; });

int result = task.Result;      // Blocks if not already finished
Console.WriteLine (result);    // 3

Count prime numbers

Task<int> primeNumberTask = Task.Run (() =>
  Enumerable.Range (2, 3000000).Count (n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0)));

Console.WriteLine ("Task running...");
Console.WriteLine ("The answer is " + primeNumberTask.Result);

Exceptions

// Start a Task that throws a NullReferenceException:
Task task = Task.Run (() => { throw null; });
try 
{
  task.Wait();
}
catch (AggregateException aex)
{
  if (aex.InnerException is NullReferenceException)
    Console.WriteLine ("Null!");
  else
    throw;
}

Continuations - GetAwaiter

Task<int> primeNumberTask = Task.Run (() =>
  Enumerable.Range (2, 3000000).Count (n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0)));

var awaiter = primeNumberTask.GetAwaiter();
awaiter.OnCompleted (() => 
{
  int result = awaiter.GetResult();
  Console.WriteLine (result);       // Writes result
});

Continuations - ContinueWith

// (See Chapter 23 for more on using ContinueWith.)

Task<int> primeNumberTask = Task.Run (() =>
  Enumerable.Range (2, 3000000).Count (n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0)));

primeNumberTask.ContinueWith (antecedent => 
{
  int result = antecedent.Result;
  Console.WriteLine (result);          // Writes 123
});

TaskCompletionSource - Print 42 after 5 seconds

var tcs = new TaskCompletionSource<int>();

new Thread (() => { Thread.Sleep (5000); tcs.SetResult (42); }).Start();

Task<int> task = tcs.Task;         // Our "slave" task.
Console.WriteLine (task.Result);   // 42

TaskCompletionSource - Our own Run method

void Main()
{
  Task<int> task = Run (() => { Thread.Sleep (5000); return 42; });
  task.Result.Dump();
}

Task<TResult> Run<TResult> (Func<TResult> function)
{
  var tcs = new TaskCompletionSource<TResult>();
  new Thread (() => 
  {
    try { tcs.SetResult (function()); }
    catch (Exception ex) { tcs.SetException (ex); }
  }).Start();
  return tcs.Task;
}

TaskCompletionSource - GetAnswerToLife

void Main()
{
  var awaiter = GetAnswerToLife().GetAwaiter();
  awaiter.OnCompleted (() => Console.WriteLine (awaiter.GetResult()));
}

Task<int> GetAnswerToLife()
{
  var tcs = new TaskCompletionSource<int>();
  // Create a timer that fires once in 5000 ms:
  var timer = new System.Timers.Timer (5000) { AutoReset = false };
  timer.Elapsed += delegate { timer.Dispose(); tcs.SetResult (42); };
  timer.Start();
  return tcs.Task;
}

Writing Delay method

void Main()
{
  Delay (5000).GetAwaiter().OnCompleted (() => Console.WriteLine (42));
}

Task Delay (int milliseconds)
{
  var tcs = new TaskCompletionSource<object>();
  var timer = new System.Timers.Timer (milliseconds) { AutoReset = false };
  timer.Elapsed += delegate { timer.Dispose(); tcs.SetResult (null); };
  timer.Start();
  return tcs.Task;
}

Delay times 10000

void Main()
{
  for (int i = 0; i < 10000; i++)
    Delay (5000).GetAwaiter().OnCompleted (() => Console.WriteLine (42));
}

Task Delay (int milliseconds)
{
  var tcs = new TaskCompletionSource<object>();
  var timer = new System.Timers.Timer (milliseconds) { AutoReset = false };
  timer.Elapsed += delegate { timer.Dispose(); tcs.SetResult (null); };
  timer.Start();
  return tcs.Task;
}

Task.Delay

Task.Delay (5000).GetAwaiter().OnCompleted (() => Console.WriteLine (42));

// Another way to attach a continuation:
Task.Delay (5000).ContinueWith (ant => Console.WriteLine (42));

ValueTask

async Task Main(string[] args)
{
  var vt1 = AnswerQuestionAsync ("What's the answer to life?");
  var vt2 = AnswerQuestionAsync ("Is the sun shining?");

  Console.WriteLine ($"vt1.IsCompleted: {vt1.IsCompleted}"); // True
  Console.WriteLine ($"vt2.IsCompleted: {vt2.IsCompleted}"); // False

  var a1 = await vt1;
  Console.WriteLine ($"a1: {a1}"); // Immediate

  var a2 = await vt2;
  Console.WriteLine ($"a2: {a2}"); // Takes 5 seconds to appear
}


async ValueTask<string> AnswerQuestionAsync (string question)
{
  if (question == "What's the answer to life?")
    return "42"; // ValueTask<string>
    
  return await AskCortanaAsync(question); // ValueTask<Task<string>>
}

async Task<string> AskCortanaAsync (string question)
{
  await Task.Delay(5000);
  return "I don't know.";
}
Principles of Asynchrony

GetPrimesCount

void Main()
{
  DisplayPrimeCounts();
}

void DisplayPrimeCounts()
{
  for (int i = 0; i < 10; i++)
    Console.WriteLine (GetPrimesCount (i*1000000 + 2, 1000000) +
      " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1));
  
  Console.WriteLine ("Done!");
}

int GetPrimesCount (int start, int count)
{
  return
    ParallelEnumerable.Range (start, count).Count (n => 
      Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0));
}

Course-grained asynchrony

void Main()
{
  Task.Run (() => DisplayPrimeCounts());
}

void DisplayPrimeCounts()
{
  for (int i = 0; i < 10; i++)
    Console.WriteLine (GetPrimesCount (i*1000000 + 2, 1000000) +
      " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1));
  
  Console.WriteLine ("Done!");
}

int GetPrimesCount (int start, int count)
{
  return 
    ParallelEnumerable.Range (start, count).Count (n => 
      Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0));
}

Fine-grained asynchrony

void Main()
{
  DisplayPrimeCounts();
}

void DisplayPrimeCounts()
{
  DisplayPrimeCountsFrom (0);
}

void DisplayPrimeCountsFrom (int i)    // This is starting to get awkward!
{
  var awaiter = GetPrimesCountAsync (i*1000000 + 2, 1000000).GetAwaiter();
  awaiter.OnCompleted (() => 
  {
    Console.WriteLine (awaiter.GetResult() + " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1));
    if (i++ < 10) DisplayPrimeCountsFrom (i);
    else Console.WriteLine ("Done");
  });
}

Task<int> GetPrimesCountAsync (int start, int count)
{
  return Task.Run (() => 
    ParallelEnumerable.Range (start, count).Count (n => 
      Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0)));
}

Making DisplayPrimesCount asynchronous

void Main()
{
  DisplayPrimeCountsAsync();
}

Task DisplayPrimeCountsAsync()
{
  var machine = new PrimesStateMachine();
  machine.DisplayPrimeCountsFrom (0);
  return machine.Task;
}

class PrimesStateMachine    // Even more awkward!!
{
  TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>();
  public Task Task { get { return _tcs.Task; } }
  
  public void DisplayPrimeCountsFrom (int i)
  {
    var awaiter = GetPrimesCountAsync (i*1000000+2, 1000000).GetAwaiter();
    awaiter.OnCompleted (() => 
    {
      Console.WriteLine (awaiter.GetResult());
      if (i++ < 10) DisplayPrimeCountsFrom (i);
      else { Console.WriteLine ("Done"); _tcs.SetResult (null); }
    });
  }
  
  Task<int> GetPrimesCountAsync (int start, int count)
  {
    return Task.Run (() => 
      ParallelEnumerable.Range (start, count).Count (n => 
        Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0)));
  }
}

Asynchronous functions to the rescue

void Main()
{
  DisplayPrimeCountsAsync();
}

async Task DisplayPrimeCountsAsync()
{
  for (int i = 0; i < 10; i++)
    Console.WriteLine (await GetPrimesCountAsync (i*1000000 + 2, 1000000) +
      " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1));
  
  Console.WriteLine ("Done!");
}

Task<int> GetPrimesCountAsync (int start, int count)
{
  return Task.Run (() => 
    ParallelEnumerable.Range (start, count).Count (n => 
      Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0)));
}
Asynchronous Functions in C# 5.0

Awaiting

void Main()
{
  DisplayPrimesCount();
}

async void DisplayPrimesCount()
{
  int result = await GetPrimesCountAsync (2, 1000000);
  Console.WriteLine (result);
}

Task<int> GetPrimesCountAsync (int start, int count)
{
  return Task.Run (() =>
    ParallelEnumerable.Range (start, count).Count (n => 
      Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0)));
}

Capturing local state

void Main()
{
  DisplayPrimeCounts();
}

async void DisplayPrimeCounts()
{
  for (int i = 0; i < 10; i++)
    Console.WriteLine (await GetPrimesCountAsync (i*1000000+2, 1000000));
}

Task<int> GetPrimesCountAsync (int start, int count)
{
  return Task.Run (() =>
  ParallelEnumerable.Range (start, count).Count (n => 
    Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0)));
}

Awaiting in a UI - synchronous

void Main()
{
  new TestUI().ShowDialog();
}

class TestUI : Window    // Notice how the window becomes unresponsive while working
{
  Button _button = new Button { Content = "Go" };
  TextBlock _results = new TextBlock();
    
  public TestUI()
  {
    var panel = new StackPanel();
    panel.Children.Add (_button);
    panel.Children.Add (_results);
    Content = panel;
    _button.Click += (sender, args) => Go();
  }
    
  void Go()
  {
    for (int i = 1; i < 5; i++)
    _results.Text += GetPrimesCount (i * 1000000, 1000000) +
      " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1) + Environment.NewLine;
  }
    
  int GetPrimesCount (int start, int count)
  {
    return ParallelEnumerable.Range (start, count).Count (n => 
    Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0));
  }
}

Awaiting in a UI - asynchronous

void Main()
{
  new TestUI().ShowDialog();
}

class TestUI : Window    // Notice how the window becomes unresponsive while working
{
  Button _button = new Button { Content = "Go" };
  TextBlock _results = new TextBlock();
    
  public TestUI()
  {
    var panel = new StackPanel();
    panel.Children.Add (_button);
    panel.Children.Add (_results);
    Content = panel;
    _button.Click += (sender, args) => Go();
  }
    
  async void Go()
  {
    _button.IsEnabled = false;
    
    for (int i = 1; i < 5; i++)
      _results.Text += await GetPrimesCountAsync (i * 1000000, 1000000) +
        " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1) + Environment.NewLine;

    _button.IsEnabled = true;
  }
    
  Task<int> GetPrimesCountAsync (int start, int count)
  {
    return Task.Run (() =>
      ParallelEnumerable.Range (start, count).Count (n => 
        Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0)));
  }
}

Awaiting in a UI - IO-bound

void Main()
{
  new TestUI().ShowDialog();
}

class TestUI : Window    // Notice how the window becomes unresponsive while working
{
  Button _button = new Button { Content = "Go" };
  TextBlock _results = new TextBlock();
    
  public TestUI()
  {
    var panel = new StackPanel();
    panel.Children.Add (_button);
    panel.Children.Add (_results);
    Content = panel;
    _button.Click += (sender, args) => Go();
  }
    
  async void Go()
  {
    _button.IsEnabled = false;
    string[] urls = "www.albahari.com www.oreilly.com www.linqpad.net".Split();
    int totalLength = 0;
    try
    {
      foreach (string url in urls)
      {
        var uri = new Uri ("http://" + url);
        byte[] data = await new WebClient().DownloadDataTaskAsync (uri);
        _results.Text += "Length of " + url + " is " + data.Length + Environment.NewLine;
        totalLength += data.Length;
      }
      _results.Text += "Total length: " + totalLength;
    }
    catch (WebException ex)
    {
      _results.Text += "Error: " + ex.Message;
    }
    finally { _button.IsEnabled = true; }
  }
    
  Task<int> GetPrimesCountAsync (int start, int count)
  {
    return Task.Run (() =>
      ParallelEnumerable.Range (start, count).Count (n => 
        Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0)));
  }
}

Awaiting in a UI - Comparison to course-grained concurrency

void Main()
{
  new TestUI().ShowDialog();
}

class TestUI : Window    // Notice how the window becomes unresponsive while working
{
  Button _button = new Button { Content = "Go" };
  TextBlock _results = new TextBlock();
    
  public TestUI()
  {
    var panel = new StackPanel();
    panel.Children.Add (_button);
    panel.Children.Add (_results);
    Content = panel;
    _button.Click += (sender, args) =>
    {
      _button.IsEnabled = false;
      Task.Run (() => Go());
    };
  }
    
  void Go()
  {
    // Notice the race condition (run it and look at what's wrong with the results):
    for (int i = 1; i < 5; i++)
    {
      int result = GetPrimesCount (i * 1000000, 1000000);
      Dispatcher.BeginInvoke (new Action (() =>
        _results.Text += result + " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1) + Environment.NewLine));
    }
    Dispatcher.BeginInvoke (new Action (() => _button.IsEnabled = true));
  }
    
  int GetPrimesCount (int start, int count)
  {
    return
      ParallelEnumerable.Range (start, count).Count (n => 
        Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0));
  }
}

Writing asynchronous functions

void Main()
{
  Go();
}

async Task Go()
{
  await PrintAnswerToLife();
  Console.WriteLine ("Done");
}

async Task PrintAnswerToLife()   // We can return Task instead of void
{
  await Task.Delay (5000);
  int answer = 21 * 2;
  Console.WriteLine (answer);  
}

Returning Task of TResult

void Main()
{
  Go();
}

async Task Go()
{
  await PrintAnswerToLife();
  Console.WriteLine ("Done");
}

async Task PrintAnswerToLife()
{
  int answer = await GetAnswerToLife();
  Console.WriteLine (answer);
}

async Task<int> GetAnswerToLife()
{
  await Task.Delay (5000);
  int answer = 21 * 2;
  return answer;
}

Blocking versions of the above

void Main()
{
  Go();
}

void Go()
{
  PrintAnswerToLife();
  Console.WriteLine ("Done");
}

void PrintAnswerToLife()
{
  int answer = GetAnswerToLife();
  Console.WriteLine (answer);
}

int GetAnswerToLife()
{
  Thread.Sleep (5000);
  int answer = 21 * 2;
  return answer;
}

Parallelism

void Main()
{
  Go();
}

async Task Go()
{
  var task1 = PrintAnswerToLife();
  var task2 = PrintAnswerToLife();
  await task1; await task2;
  Console.WriteLine ("Done");
}

async Task PrintAnswerToLife()
{
  int answer = await GetAnswerToLife();
  Console.WriteLine (answer);
}

async Task<int> GetAnswerToLife()
{
  await Task.Delay (5000);
  int answer = 21 * 2;
  return answer;
}

Asynchronous lambda expressions

// Named asynchronous method:
async Task NamedMethod()
{
  await Task.Delay (1000);
  Console.WriteLine ("Foo");
}

async void Main()
{
  // Unnamed asynchronous method:
  Func<Task> unnamed = async () =>
  {
    await Task.Delay (1000);
    Console.WriteLine ("Foo");
  };

  // We can call the two in the same way:
  await NamedMethod();
  await unnamed();
}

Asynchronous lambda expressions - event handlers

var myButton = new Button { Height = 30, Content = "Wait..." };

myButton.Click += async (sender, args) =>
{
  await Task.Delay (1000);
  myButton.Content = "Done";
};

myButton.Dump();

Asynchronous lambda expressions - returning Task of TResult

Func<Task<int>> unnamed = async () =>
{
  await Task.Delay (1000);
  return 123;
};

int answer = await unnamed();
answer.Dump();

Optimizations - Completing synchronously

async void Main()
{
  string html = await GetWebPageAsync ("http://www.linqpad.net");
  html.Length.Dump ("Characters downloaded");
  
  // Let's try again. It should be instant this time:
  html = await GetWebPageAsync ("http://www.linqpad.net");
  html.Length.Dump ("Characters downloaded");
}

static Dictionary<string,string> _cache = new Dictionary<string,string>();

async Task<string> GetWebPageAsync (string uri)
{
  string html;
  if (_cache.TryGetValue (uri, out html)) return html;
  return _cache [uri] = await new WebClient().DownloadStringTaskAsync (uri);
}

Optimizations - Caching Tasks

async void Main()
{
  string html = await GetWebPageAsync ("http://www.linqpad.net");
  html.Length.Dump ("Characters downloaded");
  
  // Let's try again. It should be instant this time:
  html = await GetWebPageAsync ("http://www.linqpad.net");
  html.Length.Dump ("Characters downloaded");
}

static Dictionary<string,Task<string>> _cache = 
   new Dictionary<string,Task<string>>();

Task<string> GetWebPageAsync (string uri)
{
  Task<string> downloadTask;
  if (_cache.TryGetValue (uri, out downloadTask)) return downloadTask;
  return _cache [uri] = new WebClient().DownloadStringTaskAsync (uri);
}

Optimizations - Caching Tasks fully threadsafe

async void Main()
{
  string html = await GetWebPageAsync ("http://www.linqpad.net");
  html.Length.Dump ("Characters downloaded");
  
  // Let's try again. It should be instant this time:
  html = await GetWebPageAsync ("http://www.linqpad.net");
  html.Length.Dump ("Characters downloaded");
}

static Dictionary<string,Task<string>> _cache = 
   new Dictionary<string,Task<string>>();

Task<string> GetWebPageAsync (string uri)
{
  lock (_cache)
  {
    Task<string> downloadTask;
    if (_cache.TryGetValue (uri, out downloadTask)) return downloadTask;
    return _cache [uri] = new WebClient().DownloadStringTaskAsync (uri);
  }
}

Optimizations - Avoiding excessive bouncing

void Main()
{
  A();
}

async void A()
{
  await B(); 
}

async Task B()
{
  for (int i = 0; i < 1000; i++)
    await C().ConfigureAwait (false);
}

async Task C() { /*...*/ }
Asynchronous Streams in C# 8.0

Asynchronous Stream vs Task of IEnumerable

Console.WriteLine($"Starting async Task<IEnumerable<int>>. Data arrives in one group.");

foreach (var data in await RangeTaskAsync(0, 10, 500))
  Console.WriteLine (data);

Console.WriteLine($"Starting async Task<IEnumerable<int>>. Data arrives as available.");

await foreach (var number in RangeAsync (0, 10, 500))
  Console.WriteLine (number);

static async Task<IEnumerable<int>> RangeTaskAsync(int start, int count, int delay)
{
  List<int> data = new List<int>();
  for (int i = start; i < start + count; i++)
  {
    await Task.Delay (delay);
    data.Add (i);
  }

  return data;
}


async IAsyncEnumerable<int> RangeAsync (
  int start, int count, int delay)
{
  for (int i = start; i < start + count; i++)
  {
    await Task.Delay (delay);
    yield return i;
  }
}

Asynchronous Streams and LINQ

async Task Main()
{
  IAsyncEnumerable<int> query =
    from i in RangeAsync (0, 10, 500)
    where i % 2 == 0   // Even numbers only.
    select i * 10;     // Multiply by 10.

  await foreach (var number in query)
    Console.WriteLine (number);
    
  query.Dump();   // in LINQPad, you can directly dump IAsyncEnumerable<T>
}

async IAsyncEnumerable<int> RangeAsync (
  int start, int count, int delay)
{
  for (int i = start; i < start + count; i++)
  {
    await Task.Delay (delay);
    yield return i;
  }
}
Asynchronous Patterns

Cancellation

async void Main()
{
  var token = new CancellationToken();
  Task.Delay (5000).ContinueWith (ant => token.Cancel());   // Tell it to cancel in two seconds.
  await Foo (token);
}

// This is a simplified version of the CancellationToken type in System.Threading:
class CancellationToken
{
  public bool IsCancellationRequested { get; private set; }
  public void Cancel() { IsCancellationRequested = true; }
  public void ThrowIfCancellationRequested()
  {
    if (IsCancellationRequested) throw new OperationCanceledException();
  }
}

async Task Foo (CancellationToken cancellationToken)
{
  for (int i = 0; i < 10; i++)
  {
    Console.WriteLine (i);
    await Task.Delay (1000);
    cancellationToken.ThrowIfCancellationRequested();
  }
}

Using the real CancellationToken

async void Main()
{
  var cancelSource = new CancellationTokenSource();
  Task.Delay (5000).ContinueWith (ant => cancelSource.Cancel());   // Tell it to cancel in two seconds.
  await Foo (cancelSource.Token);
}

async Task Foo (CancellationToken cancellationToken)
{
  for (int i = 0; i < 10; i++)
  {
    Console.WriteLine (i);
    await Task.Delay (1000);
    cancellationToken.ThrowIfCancellationRequested();
  }
}

Using the real CancellationToken - improved version

async void Main()
{
  var cancelSource = new CancellationTokenSource (5000);  // This tells it to cancel in 5 seconds
  await Foo (cancelSource.Token);
}

async Task Foo (CancellationToken cancellationToken)
{
  for (int i = 0; i < 10; i++)
  {
    Console.WriteLine (i);
    await Task.Delay (1000, cancellationToken);  // Cancellation tokens propagate nicely
  }
}

Progress reporting - with a delegate

async void Main()
{
  Action<int> progress = i => Console.WriteLine (i + " %");
  await Foo (progress);
}

Task Foo (Action<int> onProgressPercentChanged)
{
  return Task.Run (() =>
  {
    for (int i = 0; i < 1000; i++)
    {
      if (i % 10 == 0) onProgressPercentChanged (i / 10);
      // Do something compute-bound...
    }
  });
}

Progress reporting - with IProgress

async void Main()
{
  Action<int> progress = i => Console.WriteLine (i + " %");
  await Foo (progress);
}

Task Foo (Action<int> onProgressPercentChanged)
{
  return Task.Run (() =>
  {
    for (int i = 0; i < 1000; i++)
    {
      if (i % 10 == 0) onProgressPercentChanged (i / 10);
      // Do something compute-bound...
    }
  });
}

Task combinators - WhenAny

async void Main()
{
  Task<int> winningTask = await Task.WhenAny (Delay1(), Delay2(), Delay3());
  Console.WriteLine ("Done");
  Console.WriteLine (winningTask.Result);   // 1
}

async Task<int> Delay1() { await Task.Delay (1000); return 1; }
async Task<int> Delay2() { await Task.Delay (2000); return 2; }
async Task<int> Delay3() { await Task.Delay (3000); return 3; }

Task combinators - WhenAny - await winning task

async void Main()
{
  Task<int> winningTask = await Task.WhenAny (Delay1(), Delay2(), Delay3());
  Console.WriteLine ("Done");
  Console.WriteLine (await winningTask);   // 1
}

async Task<int> Delay1() { await Task.Delay (1000); return 1; }
async Task<int> Delay2() { await Task.Delay (2000); return 2; }
async Task<int> Delay3() { await Task.Delay (3000); return 3; }

Task combinators - WhenAny - in one step

async void Main()
{
  int answer = await await Task.WhenAny (Delay1(), Delay2(), Delay3());
  answer.Dump();
}

async Task<int> Delay1() { await Task.Delay (1000); return 1; }
async Task<int> Delay2() { await Task.Delay (2000); return 2; }
async Task<int> Delay3() { await Task.Delay (3000); return 3; }

Task combinators - WhenAny - timeouts

async void Main()
{
  Task<string> task = SomeAsyncFunc();
  Task winner = await (Task.WhenAny (task, Task.Delay(5000)));
  if (winner != task) throw new TimeoutException();
  string result = await task;   // Unwrap result/re-throw
}

async Task<string> SomeAsyncFunc()
{
  await Task.Delay (10000);
  return "foo";
}

Task combinators - WhenAll

async void Main()
{
  await Task.WhenAll (Delay1(), Delay2(), Delay3());
  "Done".Dump();
}

async Task<int> Delay1() { await Task.Delay (1000); return 1; }
async Task<int> Delay2() { await Task.Delay (2000); return 2; }
async Task<int> Delay3() { await Task.Delay (3000); return 3; }

Task combinators - WhenAll - exceptions

async void Main()
{
  Task task1 = Task.Run (() => { throw null; } );
  Task task2 = Task.Run (() => { throw null; } );
  Task all = Task.WhenAll (task1, task2);
  try { await all; }
  catch
  {
    Console.WriteLine (all.Exception.InnerExceptions.Count);   // 2 
  }    
}

Task combinators - WhenAll - return values

async void Main()
{
  Task<int> task1 = Task.Run (() => 1);
  Task<int> task2 = Task.Run (() => 2);
  int[] results = await Task.WhenAll (task1, task2);   // { 1, 2 }  
  results.Dump();
}

Task combinators - WhenAll - web page downloads

async void Main()
{
  int totalSize = await GetTotalSize ("http://www.linqpad.net http://www.albahari.com http://stackoverflow.com".Split());
  totalSize.Dump();
}

async Task<int> GetTotalSize (string[] uris)
{
  IEnumerable<Task<byte[]>> downloadTasks = uris.Select (uri => new WebClient().DownloadDataTaskAsync (uri));  
  byte[][] contents = await Task.WhenAll (downloadTasks);
  return contents.Sum (c => c.Length);
}

Task combinators - WhenAll - web page downloads improved

async void Main()
{
  int totalSize = await GetTotalSize ("http://www.linqpad.net http://www.albahari.com http://stackoverflow.com".Split());
  totalSize.Dump();
}

async Task<int> GetTotalSize (string[] uris)
{
  IEnumerable<Task<int>> downloadTasks = uris.Select (async uri =>
    (await new WebClient().DownloadDataTaskAsync (uri)).Length);
    
  int[] contentLengths = await Task.WhenAll (downloadTasks);
  return contentLengths.Sum();
}

Custom combinators - WithTimeout

async void Main()
{
  string result = await SomeAsyncFunc().WithTimeout (TimeSpan.FromSeconds (2));
  result.Dump();
}

async Task<string> SomeAsyncFunc()
{
  await Task.Delay (10000);
  return "foo";
}

public static class Extensions
{
  public async static Task<TResult> WithTimeout<TResult> (this Task<TResult> task, TimeSpan timeout)
  {
    Task winner = await (Task.WhenAny (task, Task.Delay (timeout)));
    if (winner != task) throw new TimeoutException();
    return await task;   // Unwrap result/re-throw
  }
}

Custom combinators - WithCancellation

async void Main()
{
  var cts = new CancellationTokenSource (3000);  // Cancel after 3 seconds
  string result = await SomeAsyncFunc().WithCancellation (cts.Token);
  result.Dump();
}

async Task<string> SomeAsyncFunc()
{
  await Task.Delay (10000);
  return "foo";
}

public static class Extensions
{
  public static Task<TResult> WithCancellation<TResult> (this Task<TResult> task, CancellationToken cancelToken)
  {
    var tcs = new TaskCompletionSource<TResult>();
    var reg = cancelToken.Register (() => tcs.TrySetCanceled ());
    task.ContinueWith (ant => 
    {
      reg.Dispose();    
      if (ant.IsCanceled)
        tcs.TrySetCanceled();
      else if (ant.IsFaulted)
        tcs.TrySetException (ant.Exception.InnerException);
      else
        tcs.TrySetResult (ant.Result);
    });
    return tcs.Task;  
  }
}

Custom combinators - WhenAllOrError

// This will throw an exception immediately.

async void Main()
{
  Task<int> task1 = Task.Run (() => { throw null; return 42; } );
  Task<int> task2 = Task.Delay (5000).ContinueWith (ant => 53);
  
  int[] results = await WhenAllOrError (task1, task2);
}

async Task<TResult[]> WhenAllOrError<TResult> (params Task<TResult>[] tasks)
{
  var killJoy = new TaskCompletionSource<TResult[]>();
  
  foreach (var task in tasks)
    task.ContinueWith (ant =>
    {
      if (ant.IsCanceled) 
        killJoy.TrySetCanceled();
      else if (ant.IsFaulted)
        killJoy.TrySetException (ant.Exception.InnerException);
    });
    
  return await await Task.WhenAny (killJoy.Task, Task.WhenAll (tasks));    
}