日期:2009-05-19  浏览次数:20552 次

  一般来说,单线程的程序因为没有系统调度的开销,速度上比多线程的程序快,不过处理有些时间花费比较大的过程将会延迟其它时间花费较低的过程。另外,对于多 CPU 的服务器来说,多线程才能利用其多 CPU 的优势。


  按照 Exchange Server 开发组的测试,一台服务器上,每个 CPU 开两个线程比较能达到最大效能。此数据可以作为经验数据,不过在实际中,还是应该令线程数量可配置,也可以根据服务器的实际情况测试最佳效能的线程数量。

  对于类似 Exchange Server 这种应用来说,需要一个主线程,并把邮件推入一个队列,而另外有N个线程,进行邮件处理的工作。

  不过,这个模型中,如果每一个线程都是使用死循环检测的方式,将会总是占用 100% 的 CPU 时间,这在用户使用非高峰期是不划算的。另外,预计在用户使用率比较低的时候运行的程序,也会被这些线程干扰,造成运行效率非常低下。

  所以,需要一种方法,能使得 CPU 占用率和用户使用率成正比。另外,应该也需要有一种方法,在程序退出的时候,保证正在运行的解释线程完成未完的工作。

  对此,我考虑可以有三种解决方案:

  1.接收的线程,在发现队列为空的时候,使用共享锁将队列锁定,从而造成其它使用队列的线程的阻塞,等到接收到邮件后,再将其加入队列,并且解锁;处理线程只需要使用共享锁访问队列就可以了。

  2.处理线程除了使用共享锁访问队列之外,在检测到队列为空的时候,自行调用暂停线程的函数;接收邮件的线程,等到接收到邮件后,将其加入队列,并且呼叫所有暂停的线程恢复工作。

  3.处理线程除了使用共享锁访问队列之外,在检测到队列为空的时候,调用Sleep()函数,这是最简单的共享方法,其它部分不需要做任何改动。不过,对偶发邮件的反应时间不是很灵敏。

  下面是我写的演示了使用第三种方案的情况的代码:



#region usings

using System;
using System.Collections;
using System.Threading;

#endregion

namespace ThreadTest
{
  public class ThreadID
  {
    public static Queue q;

    static ThreadID()
    {
      q = new Queue(new int[]{1,2,3,4,5,6,7,8,9});
    }

    public static int GetTid()
    {
      return (int)q.Dequeue();
    }
  }

  public class Rand
  {
    private static Random r = new Random();

    public static int Next(int MaxValue)
    {
      return r.Next(MaxValue);
    }

    public static int Next(int MinValue, int MaxValue)
    {
      return r.Next(MinValue, MaxValue);
    }
  }

  public class MyThread
  {
    private Queue ShareQueue = new Queue();

    public void Run()
    {
      Thread t1 = new Thread(new ThreadStart(InputThread));
      Thread t2 = new Thread(new ThreadStart(OutputThread));
      Thread t3 = new Thread(new ThreadStart(OutputThread));

      t1.Start();
      t2.Start();
      t3.Start();

      t2.Join();
      t3.Join();

      Console.WriteLine("<<结束>>");
      Console.ReadLine();
    }

    public void InputThread()
    {
      for(int i=40; i<60; i++)
      {
        Thread.Sleep(Rand.Next(100, 600)); //模拟接收速度。
        lock(ShareQueue)
        {
          ShareQueue.Enqueue(i);
        }
        Console.WriteLine("写入");
      }
    }

    public void OutputThread()
    {
      int tid = ThreadID.GetTid(); // 显示需要
      for(int i=0; i<10;)
      {
        int n = -1;
        lock(ShareQueue)
        {
          if ( ShareQueue.Count > 0 )
          {
            i++;
            n = (int)ShareQueue.Dequeue();
          }
          else
          {
            Console.Write("#{0}#", tid);
          }
        }
        if (n > 0)
        {
          Console.WriteLine("({0})取出:{1}", tid, n);
          Thread.Sleep(1000); //模拟运行花费。
        }
        else
        {
          Thread.Sleep(200); //第三种方案。
        }
      }
    }
  }

  #region System Code

  class Program
  {
    [STAThread]
    static void Main(string[] args)
    {
      (new MyThread()).Run();
    }
  }

  #endregion
}


  当然,我更看好第一种方案,不过它的实现有一些难度,而且,如果程序编写有问题的话,还容易造成永久的死锁。所以如果要有效而简单的话,第三种方案是一个比较好的备选方案。