Chapter 22 - Advanced Threading
Exclusive Locking
Simple use of lock
static readonly object _locker = new object(); static int _val1, _val2; static void Main() { for (int i = 1; i <= 1000; i++) { if (i % 100 == 0) Console.WriteLine ($"Tried {i} times to get DivideByZeroException"); var t1 = new Thread (Go); t1.Start(); var t2 = new Thread (Go); t2.Start(); var t3 = new Thread (Go); t3.Start(); t1.Join(); t2.Join(); t3.Join(); } } static void Go() { lock (_locker) // Threadsafe: will never get DivideByZeroException { if (_val2 != 0) Console.WriteLine (_val1 / _val2); _val2 = 0; } }
Nested locking
static readonly object _locker = new object(); static void Main() { lock (_locker) { AnotherMethod(); // We still have the lock - because locks are reentrant. } } static void AnotherMethod() { lock (_locker) { Console.WriteLine ("Another method"); } }
Deadlocks
object locker1 = new object(); object locker2 = new object(); new Thread (() => { lock (locker1) { Thread.Sleep (1000); lock (locker2) { } // Deadlock } }).Start(); lock (locker2) { Thread.Sleep (1000); lock (locker1) { } // Deadlock }
Mutex
// To test this in LINQPad, run the query then clone the query (Shift+Ctrl+C) and run the copy at the same time. static void Main() { // Naming a Mutex makes it available computer-wide. Use a name that's // unique to your company and application (e.g., include your URL). using (var mutex = new Mutex (false, "oreilly.com OneAtATimeDemo")) { // Wait a few seconds if contended, in case another instance // of the program is still in the process of shutting down. if (!mutex.WaitOne (TimeSpan.FromSeconds (3), false)) { Console.WriteLine ("Another instance of the app is running. Bye!"); return; } RunProgram(); } } static void RunProgram() { Console.WriteLine ("Running. Press Enter to exit"); Console.ReadLine(); }
Locking and Thread Safety
Thread safety and Framework types
static List <string> _list = new List <string>(); static void Main() { new Thread (AddItem).Start(); new Thread (AddItem).Start(); } static void AddItem() { lock (_list) _list.Add ("Item " + _list.Count); string[] items; lock (_list) items = _list.ToArray(); foreach (string s in items) Console.WriteLine (s); } // Note: In LINQPad, press Shift+F5 to clear static variables.
Thread safety in application servers
void Main() { new Thread (() => UserCache.GetUser (1).Dump()).Start(); new Thread (() => UserCache.GetUser (1).Dump()).Start(); new Thread (() => UserCache.GetUser (1).Dump()).Start(); } static class UserCache { static Dictionary <int, User> _users = new Dictionary <int, User>(); internal static User GetUser (int id) { User u = null; lock (_users) if (_users.TryGetValue (id, out u)) return u; u = RetrieveUser (id); // Method to retrieve from database; lock (_users) _users [id] = u; return u; } static User RetrieveUser (int id) { Thread.Sleep(1000); // simulate a time-consuming operation return new User { ID = id }; } } class User { public int ID; }
Thread safety in application servers - enhanced
async Task Main() { new Thread (() => UserCache.GetUserAsync (1).Dump()).Start(); new Thread (() => UserCache.GetUserAsync (1).Dump()).Start(); new Thread (() => UserCache.GetUserAsync (1).Dump()).Start(); // You can also await this method: User user = await UserCache.GetUserAsync (1); user.Dump(); } static class UserCache { static Dictionary <int, Task<User>> _userTasks = new Dictionary <int, Task<User>>(); internal static Task<User> GetUserAsync (int id) { lock (_userTasks) if (_userTasks.TryGetValue (id, out var userTask)) return userTask; else return _userTasks [id] = Task.Run (() => RetrieveUser (id)); } static User RetrieveUser (int id) { Thread.Sleep(1000); // simulate a time-consuming operation return new User { ID = id }; } } class User { public int ID; }
Non-exclusive Locking
Semaphore
static SemaphoreSlim _sem = new SemaphoreSlim (3); // Capacity of 3 static void Main() { for (int i = 1; i <= 5; i++) new Thread (Enter).Start (i); } static void Enter (object id) { Console.WriteLine (id + " wants to enter"); _sem.Wait(); Console.WriteLine (id + " is in!"); // Only three threads Thread.Sleep (1000 * (int) id); // can be here at Console.WriteLine (id + " is leaving"); // a time. _sem.Release(); }
Async semaphores and locks
SemaphoreSlim _semaphore = new SemaphoreSlim (4); // 4 downloads at a time void Main() { Util.AutoScrollResults = true; for (int i = 0; i < 50; i++) { int local = i; DownloadWithSemaphoreAsync ("http://someinvaliduri/" + i) .ContinueWith (c => ("Finished download " + local).Dump()); } } async Task<byte[]> DownloadWithSemaphoreAsync (string uri) { using (await _semaphore.EnterAsync()) return await new WebClient().DownloadDataTaskAsync (uri); } static class Extensions { public static async Task<IDisposable> EnterAsync (this SemaphoreSlim ss) { await ss.WaitAsync().ConfigureAwait (false); return Disposable.Create (() => ss.Release()); } }
ReaderWriterLockSlim
static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim(); static List<int> _items = new List<int>(); static Random _rand = new Random(); static void Main() { new Thread (Read).Start(); new Thread (Read).Start(); new Thread (Read).Start(); new Thread (Write).Start ("A"); new Thread (Write).Start ("B"); } static void Read() { while (true) { _rw.EnterReadLock(); foreach (int i in _items) Thread.Sleep (10); _rw.ExitReadLock(); } } static void Write (object threadID) { while (true) { int newNumber = GetRandNum (100); _rw.EnterWriteLock(); _items.Add (newNumber); _rw.ExitWriteLock(); Console.WriteLine ("Thread " + threadID + " added " + newNumber); Thread.Sleep (100); } } static int GetRandNum (int max) { lock (_rand) return _rand.Next(max); }
ReaderWriterLockSlim - upgradeable locks
static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim(); static List<int> _items = new List<int>(); static Random _rand = new Random(); static void Main() { new Thread (Read).Start(); new Thread (Read).Start(); new Thread (Read).Start(); new Thread (Write).Start ("A"); new Thread (Write).Start ("B"); } static void Read() { while (true) { _rw.EnterReadLock(); foreach (int i in _items) Thread.Sleep (10); _rw.ExitReadLock(); } } static void Write (object threadID) { while (true) { int newNumber = GetRandNum (100); _rw.EnterUpgradeableReadLock(); if (!_items.Contains (newNumber)) { _rw.EnterWriteLock(); _items.Add (newNumber); _rw.ExitWriteLock(); Console.WriteLine ("Thread " + threadID + " added " + newNumber); } _rw.ExitUpgradeableReadLock(); Thread.Sleep (100); } } static int GetRandNum (int max) { lock (_rand) return _rand.Next(max); }
ReaderWriterLockSlim - lock recursion
var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion); rw.EnterReadLock(); rw.EnterReadLock(); rw.ExitReadLock(); rw.ExitReadLock(); rw.EnterWriteLock(); rw.EnterReadLock(); Console.WriteLine (rw.IsReadLockHeld); // True Console.WriteLine (rw.IsWriteLockHeld); // True rw.ExitReadLock(); rw.ExitWriteLock();
Signaling with Event Wait Handles
AutoResetEvent
static EventWaitHandle _waitHandle = new AutoResetEvent (false); static void Main() { new Thread (Waiter).Start(); Thread.Sleep (1000); // Pause for a second... _waitHandle.Set(); // Wake up the Waiter. } static void Waiter() { Console.WriteLine ("Waiting..."); _waitHandle.WaitOne(); // Wait for notification Console.WriteLine ("Notified"); }
Two-way signaling
static EventWaitHandle _ready = new AutoResetEvent (false); static EventWaitHandle _go = new AutoResetEvent (false); static readonly object _locker = new object(); static string _message; static void Main() { new Thread (Work).Start(); _ready.WaitOne(); // First wait until worker is ready lock (_locker) _message = "ooo"; _go.Set(); // Tell worker to go _ready.WaitOne(); lock (_locker) _message = "ahhh"; // Give the worker another message _go.Set(); _ready.WaitOne(); lock (_locker) _message = null; // Signal the worker to exit _go.Set(); } static void Work() { while (true) { _ready.Set(); // Indicate that we're ready _go.WaitOne(); // Wait to be kicked off... lock (_locker) { if (_message == null) return; // Gracefully exit Console.WriteLine (_message); } } }
CountdownEvent
static CountdownEvent _countdown = new CountdownEvent (3); static void Main() { new Thread (SaySomething).Start ("I am thread 1"); new Thread (SaySomething).Start ("I am thread 2"); new Thread (SaySomething).Start ("I am thread 3"); _countdown.Wait(); // Blocks until Signal has been called 3 times Console.WriteLine ("All threads have finished speaking!"); } static void SaySomething (object thing) { Thread.Sleep (1000); Console.WriteLine (thing); _countdown.Signal(); }
Wait Handles and continuations
static ManualResetEvent _starter = new ManualResetEvent (false); public static void Main() { RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject (_starter, Go, "Some Data", -1, true); Thread.Sleep (5000); Console.WriteLine ("Signaling worker..."); _starter.Set(); Console.ReadLine(); reg.Unregister (_starter); // Clean up when we’re done. } public static void Go (object data, bool timedOut) { Console.WriteLine ("Started - " + data); // Perform task... }
The Barrier Class
Barrier
static Barrier _barrier = new Barrier (3); static void Main() { new Thread (Speak).Start(); new Thread (Speak).Start(); new Thread (Speak).Start(); } static void Speak() { for (int i = 0; i < 5; i++) { Console.Write (i + " "); _barrier.SignalAndWait(); } }
Barrier - post-phase action
static Barrier _barrier = new Barrier (3, barrier => Console.WriteLine()); static void Main() { new Thread (Speak).Start(); new Thread (Speak).Start(); new Thread (Speak).Start(); } static void Speak() { for (int i = 0; i < 5; i++) { Console.Write (i + " "); _barrier.SignalAndWait(); } }
Lazy Initialization
Intro
void Main() { new Foo().Expensive.Dump(); } class Foo { Expensive _expensive; public Expensive Expensive // Lazily instantiate Expensive { get { if (_expensive == null) _expensive = new Expensive(); return _expensive; } } } class Expensive { /* Suppose this is expensive to construct */ }
Intro - with lock
void Main() { new Foo().Expensive.Dump(); } class Foo { Expensive _expensive; readonly object _expenseLock = new object(); public Expensive Expensive { get { lock (_expenseLock) { if (_expensive == null) _expensive = new Expensive(); return _expensive; } } } } class Expensive { /* Suppose this is expensive to construct */ }
Lazy of T
void Main() { new Foo().Expensive.Dump(); } class Foo { Lazy<Expensive> _expensive = new Lazy<Expensive> (() => new Expensive(), true); public Expensive Expensive { get { return _expensive.Value; } } } class Expensive { /* Suppose this is expensive to construct */ }
LazyInitializer
void Main() { new Foo().Expensive.Dump(); } class Foo { Expensive _expensive; public Expensive Expensive { // Implement double-checked locking get { LazyInitializer.EnsureInitialized (ref _expensive, () => new Expensive()); return _expensive; } } } class Expensive { /* Suppose this is expensive to construct */ }
Thread-local Storage
ThreadStatic
[ThreadStatic] static int _x; void Main() { new Thread (() => { Thread.Sleep(1000); _x++; _x.Dump(); }).Start(); new Thread (() => { Thread.Sleep(2000); _x++; _x.Dump(); }).Start(); new Thread (() => { Thread.Sleep(3000); _x++; _x.Dump(); }).Start(); }
ThreadLocal
static ThreadLocal<int> _x = new ThreadLocal<int> (() => 3); void Main() { new Thread (() => { Thread.Sleep(1000); _x.Value++; _x.Dump(); }).Start(); new Thread (() => { Thread.Sleep(2000); _x.Value++; _x.Dump(); }).Start(); new Thread (() => { Thread.Sleep(3000); _x.Value++; _x.Dump(); }).Start(); }
GetData and SetData
void Main() { var test = new Test(); new Thread (() => { Thread.Sleep(1000); test.SecurityLevel++; test.SecurityLevel.Dump(); }).Start(); new Thread (() => { Thread.Sleep(2000); test.SecurityLevel++; test.SecurityLevel.Dump(); }).Start(); new Thread (() => { Thread.Sleep(3000); test.SecurityLevel++; test.SecurityLevel.Dump(); }).Start(); } class Test { // The same LocalDataStoreSlot object can be used across all threads. LocalDataStoreSlot _secSlot = Thread.GetNamedDataSlot ("securityLevel"); // This property has a separate value on each thread. public int SecurityLevel { get { object data = Thread.GetData (_secSlot); return data == null ? 0 : (int) data; // null == uninitialized } set { Thread.SetData (_secSlot, value); } } }
AsyncLocal
static AsyncLocal<string> _asyncLocalTest = new AsyncLocal<string>(); async Task Main() { Thread.CurrentThread.ManagedThreadId.Dump ("Current Thread ID"); _asyncLocalTest.Value = "test"; await Task.Delay (1000); Thread.CurrentThread.ManagedThreadId.Dump ("Current Thread ID"); Console.WriteLine (_asyncLocalTest.Value); }
AsyncLocal - concurrent
static AsyncLocal<string> _asyncLocalTest = new AsyncLocal<string>(); void Main() { new Thread (() => Test ("one")).Start(); new Thread (() => Test ("two")).Start(); } async void Test (string value) { _asyncLocalTest.Value = value; await Task.Delay (1000); Console.WriteLine (value + " " + _asyncLocalTest.Value); }
AsyncLocal - inherited value
static AsyncLocal<string> _asyncLocalTest = new AsyncLocal<string>(); void Main() { _asyncLocalTest.Value = "test"; new Thread (AnotherMethod).Start(); } void AnotherMethod() => Console.WriteLine (_asyncLocalTest.Value); // test
AsyncLocal - inherited value - copy
static AsyncLocal<string> _asyncLocalTest = new AsyncLocal<string>(); void Main() { _asyncLocalTest.Value = "test"; var t = new Thread (AnotherMethod); t.Start(); t.Join(); Console.WriteLine (_asyncLocalTest.Value); // test (not ha-ha!) } void AnotherMethod() => _asyncLocalTest.Value = "ha-ha!";
AsyncLocal - inherited value - copy - limitation
static AsyncLocal<StringBuilder> _asyncLocalTest = new AsyncLocal<StringBuilder>(); void Main() { _asyncLocalTest.Value = new StringBuilder ("test"); var t = new Thread (AnotherMethod); t.Start(); t.Join(); Console.WriteLine (_asyncLocalTest.Value.ToString()); // test haha! } void AnotherMethod() => _asyncLocalTest.Value.Append (" ha-ha!");
Timers
Multithreaded timers - Threading Timer
static void Main() { // First interval = 5000ms; subsequent intervals = 1000ms Timer tmr = new Timer (Tick, "tick...", 5000, 1000); Console.WriteLine ("Press Enter to stop"); Console.ReadLine(); tmr.Dispose(); // This both stops the timer and cleans up. } static void Tick (object data) { // This runs on a pooled thread Console.WriteLine (data); // Writes "tick..." }
Multithreaded timers - System.Timer
static void Main() { var tmr = new System.Timers.Timer(); // Doesn't require any args tmr.Interval = 500; tmr.Elapsed += tmr_Elapsed; // Uses an event instead of a delegate tmr.Start(); // Start the timer Console.ReadLine(); tmr.Stop(); // Stop the timer Console.ReadLine(); tmr.Start(); // Restart the timer Console.ReadLine(); tmr.Dispose(); // Permanently stop the timer } static void tmr_Elapsed (object sender, EventArgs e) { Console.WriteLine ("Tick"); }
EXTRA - Wait and Pulse
Signaling with Wait and Pulse
// See http://www.albahari.com/threading/part4.aspx ("Signaling with Wait and Pulse") for the accompanying text. static readonly object _locker = new object(); static bool _go; static void Main() { // The new thread will block because _go==false. new Thread (Work).Start(); Console.WriteLine ("Press Enter to signal"); Console.ReadLine(); // Wait for user to hit Enter lock (_locker) // Let's now wake up the thread by { // setting _go=true and pulsing. _go = true; Monitor.Pulse (_locker); } } static void Work() { lock (_locker) while (!_go) Monitor.Wait (_locker); // Lock is released while we’re waiting Console.WriteLine ("Woken!!!"); }
Now not to use Wait and Pulse
// Non-deterministic! static readonly object _locker = new object(); static void Main() { new Thread (Work).Start(); lock (_locker) Monitor.Pulse (_locker); } static void Work() { lock (_locker) Monitor.Wait (_locker); Console.WriteLine ("Woken!!!"); }
Producer-consumer queue
static void Main() { PCQueue q = new PCQueue (2); Console.WriteLine ("Enqueuing 10 items..."); for (int i = 0; i < 10; i++) { int itemNumber = i; // To avoid the captured variable trap q.EnqueueItem (() => { Thread.Sleep (1000); // Simulate time-consuming work Console.Write (" Task" + itemNumber); }); } q.Shutdown (true); Console.WriteLine(); Console.WriteLine ("Workers complete!"); } public class PCQueue { readonly object _locker = new object(); Thread[] _workers; Queue<Action> _itemQ = new Queue<Action>(); public PCQueue (int workerCount) { _workers = new Thread [workerCount]; // Create and start a separate thread for each worker for (int i = 0; i < workerCount; i++) (_workers [i] = new Thread (Consume)).Start(); } public void Shutdown (bool waitForWorkers) { // Enqueue one null item per worker to make each exit. foreach (Thread worker in _workers) EnqueueItem (null); // Wait for workers to finish if (waitForWorkers) foreach (Thread worker in _workers) worker.Join(); } public void EnqueueItem (Action item) { lock (_locker) { _itemQ.Enqueue (item); // We must pulse because we're Monitor.Pulse (_locker); // changing a blocking condition. } } void Consume() { while (true) // Keep consuming until { // told otherwise. Action item; lock (_locker) { while (_itemQ.Count == 0) Monitor.Wait (_locker); item = _itemQ.Dequeue(); } if (item == null) return; // This signals our exit. item(); // Execute item. } } }
Two-way signaling and races
static readonly object _locker = new object(); static bool _go; static void Main() { new Thread (SaySomething).Start(); for (int i = 0; i < 5; i++) lock (_locker) { _go = true; Monitor.PulseAll (_locker); } } static void SaySomething() { for (int i = 0; i < 5; i++) lock (_locker) { while (!_go) Monitor.Wait (_locker); _go = false; Console.WriteLine ("Wassup?"); } }
Two-way signaling and races - solution
static readonly object _locker = new object(); static bool _ready, _go; static void Main() { new Thread (SaySomething).Start(); for (int i = 0; i < 5; i++) lock (_locker) { while (!_ready) Monitor.Wait (_locker); _ready = false; _go = true; Monitor.PulseAll (_locker); } } static void SaySomething() { for (int i = 0; i < 5; i++) lock (_locker) { _ready = true; Monitor.PulseAll (_locker); // Remember that calling while (!_go) Monitor.Wait (_locker); // Monitor.Wait releases _go = false; // and reacquires the lock. Console.WriteLine ("Wassup?"); } }
Simulating a ManualResetEvent
void Main() { new Thread (() => { Thread.Sleep (2000); Set(); }).Start(); Console.WriteLine ("Waiting..."); WaitOne(); Console.WriteLine ("Signaled"); } readonly object _locker = new object(); bool _signal; void WaitOne() { lock (_locker) { while (!_signal) Monitor.Wait (_locker); } } void Set() { lock (_locker) { _signal = true; Monitor.PulseAll (_locker); } } void Reset() { lock (_locker) _signal = false; }
Writing a CountdownEvent
void Main() { var cd = new Countdown(5); new Thread (() => { for (int i = 0; i < 5; i++) { Thread.Sleep(1000); cd.Signal(); Console.WriteLine ("Signal " + i); } }).Start(); Console.WriteLine ("Waiting"); cd.Wait(); Console.WriteLine ("Unblocked"); } public class Countdown { object _locker = new object (); int _value; public Countdown() { } public Countdown (int initialCount) { _value = initialCount; } public void Signal() { AddCount (-1); } public void AddCount (int amount) { lock (_locker) { _value += amount; if (_value <= 0) Monitor.PulseAll (_locker); } } public void Wait() { lock (_locker) while (_value > 0) Monitor.Wait (_locker); } }
Thread rendezvous
static object _locker = new object(); static CountdownEvent _countdown = new CountdownEvent(2); public static void Main() { // Get each thread to sleep a random amount of time. Random r = new Random(); new Thread (Mate).Start (r.Next (10000)); Thread.Sleep (r.Next (10000)); _countdown.Signal(); _countdown.Wait(); Console.Write ("Mate! "); } static void Mate (object delay) { Thread.Sleep ((int) delay); _countdown.Signal(); _countdown.Wait(); Console.Write ("Mate! "); }