上次在 .Net 多线程一例中,介绍了一种多线程模型,其实,这种模型的用处还是很多的。在我最近以临时工身份加入微软后,也遇到了这样的问题,所以,才考虑了这样一种模型,后来,发现它的用途应该还有很多,另外,经同事的一些修改,性能上也应该有了一些提高。所以,还是抽象出一个通用的类,用于此类问题的处理。
#region usings
using System;
using System.Collections;
using System.Threading;
#endregion
namespace org.hanzify.llf.util
{
public delegate void OutputDelegate(object o);
public class ThreadingQueue : IDisposable
{
public event OutputDelegate Output;
private Queue ShareQueue = new Queue();
private ArrayList Threads = new ArrayList();
private bool Running = true;
private AutoResetEvent HasIncoming = new AutoResetEvent(false);
private ManualResetEvent NeedDispose = new ManualResetEvent(false);
public ThreadingQueue(int ThreadNo)
{
for ( int i = 0; i < ThreadNo; i++ )
{
Thread t = new Thread(new ThreadStart(OutputThread));
Threads.Add(t);
t.Start();
}
}
public void Input(object o)
{
if ( Running )
{
lock(ShareQueue)
{
ShareQueue.Enqueue(o);
}
HasIncoming.Set();
}
}
private void OutputThread()
{
WaitHandle[] hs = new WaitHandle[] {NeedDispose, HasIncoming};
while(true)
{
object o = null;
lock(ShareQueue)
{
if ( ShareQueue.Count > 0 )
{
o = (int)ShareQueue.Dequeue();
}
}
if (o != null)
{
if ( Output != null )
{
Output(o);
}
}
else
{
if ( WaitHandle.WaitAny(hs) == 0 ) { break; }
}
}
}
public void Dispose()
{
if ( Running )
{
Running = false;
NeedDispose.Set();
foreach ( Thread t in Threads )
{
while ( t.IsAlive )
{
Thread.Sleep(0);
}
}
}
}
~ ThreadingQueue()
{
Dispose();
}
}
}
要使用这个类,也很简单:
#region usings
using System;
using System.Collections;
using System.Threading;
#endregion
namespace org.hanzify.llf.util
{
public class ThreadingQueueTest
{
public ThreadingQueueTest() {}
public void Run()
{
using ( ThreadingQueue qt = new ThreadingQueue(6) )
{
qt.Output += new OutputDelegate(qt_Output);
for ( int i = 40; i <= 60; i++ )
{
Console.WriteLine("写入{0}", i);
qt.Input(i);
}
}
Console.WriteLine("<<结束>>");
Console.ReadLine();
}
private void qt_Output(object o)
{
Console.WriteLine("取出:{0}", o);
Thread.Sleep(1000); //模拟运行花费。
}
}
}