日期:2009-08-22  浏览次数:20455 次

  上次在 .Net 多线程一例中,介绍了一种多线程模型,其实,这种模型的用处还是很多的。在我最近以临时工身份加入微软后,也遇到了这样的问题,所以,才考虑了这样一种模型,后来,发现它的用途应该还有很多,另外,经同事的一些修改,性能上也应该有了一些提高。所以,还是抽象出一个通用的类,用于此类问题的处理。


#region usings

using System;
using System.Collections;
using System.Threading;

#endregion

namespace org.hanzify.llf.util
{
  public delegate void OutputDelegate(object o);

  public class ThreadingQueue : IDisposable
  {
    public event OutputDelegate Output;

    private Queue ShareQueue = new Queue();
    private ArrayList Threads = new ArrayList();
    private bool Running = true;
    private AutoResetEvent HasIncoming = new AutoResetEvent(false);
    private ManualResetEvent NeedDispose = new ManualResetEvent(false);

    public ThreadingQueue(int ThreadNo)
    {
      for ( int i = 0; i < ThreadNo; i++ )
      {
        Thread t = new Thread(new ThreadStart(OutputThread));
        Threads.Add(t);
        t.Start();
      }
    }

    public void Input(object o)
    {
      if ( Running )
      {
        lock(ShareQueue)
        {
          ShareQueue.Enqueue(o);
        }
        HasIncoming.Set();
      }
    }

    private void OutputThread()
    {
      WaitHandle[] hs = new WaitHandle[] {NeedDispose, HasIncoming};
      while(true)
      {
        object o = null;
        lock(ShareQueue)
        {
          if ( ShareQueue.Count > 0 )
          {
            o = (int)ShareQueue.Dequeue();
          }
        }
        if (o != null)
        {
          if ( Output != null )
          {
            Output(o);
          }
        }
        else
        {
          if ( WaitHandle.WaitAny(hs) == 0 ) { break; }
        }
      }
    }

    public void Dispose()
    {
      if ( Running )
      {
        Running = false;
        NeedDispose.Set();
        foreach ( Thread t in Threads )
        {
          while ( t.IsAlive )
          {
            Thread.Sleep(0);
          }
        }
      }
    }

    ~ ThreadingQueue()
    {
      Dispose();
    }
  }
}

  要使用这个类,也很简单:

#region usings

using System;
using System.Collections;
using System.Threading;

#endregion

namespace org.hanzify.llf.util
{
  public class ThreadingQueueTest
  {
    public ThreadingQueueTest() {}

    public void Run()
    {
      using ( ThreadingQueue qt = new ThreadingQueue(6) )
      {
        qt.Output += new OutputDelegate(qt_Output);
        for ( int i = 40; i <= 60; i++ )
        {
          Console.WriteLine("写入{0}", i);
          qt.Input(i);
        }
      }
      Console.WriteLine("<<结束>>");
      Console.ReadLine();
    }

    private void qt_Output(object o)
    {
      Console.WriteLine("取出:{0}", o);
      Thread.Sleep(1000); //模拟运行花费。
    }
  }
}