日期:2014-05-17  浏览次数:21526 次

C#下实现RabbitMQ。

1.首先需要下载.net下的驱动类。rabbitmq.client.dll.安装后可以使用提供的文档。当然也可以直接从别的地方只下载rabbitmq.client.dll使用。

下载地址:http://www.rabbitmq.com/dotnet.html

?文档和安装程序都有了。

2.然后建立项目导入引用

???????? 一、首先建立一个消息的发送者类Sender

?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Content;
using System.Collections;

namespace Sender
{

    /// <summary>
    /// 向队列中写入一个消息
    /// </summary>
    public class ProduceMQ
    {
          static void Main(string[] args)
       
        {
           //服务器所在的主机ip
           ?Uri uri = new Uri("amqp://192.168.1.99:8688/");
            string exchange = "routing";//路由
            string exchangeType = "direct";//交换模式
            string routingKey = "rk";//路由关键字
            //是否对消息队列持久化保存
            bool persistMode = true;
            ConnectionFactory cf = new ConnectionFactory();

            cf.UserName = "gyg";//某个vhost下的用户
            cf.Password = "123456";
            cf.VirtualHost = "gyg001";//vhost
            cf.RequestedHeartbeat = 0;
            cf.Endpoint = new AmqpTcpEndpoint(uri);
?                           //创建一个连接到具体总结点的连接
           using (IConnection conn = cf.CreateConnection())
            {             //创建并返回一个新连接到具体节点的通道
                using (IModel ch = conn.CreateModel())
                {
                    if (exchangeType != null)
                    {//声明一个路由
                        ch.ExchangeDeclare(exchange, exchangeType);
                       //声明一个队列
                        ch.QueueDeclare("q", true, false, false, null);
                      //将一个队列和一个路由绑定起来。并制定路由关键字  
                      ch.QueueBind("q1", exchange, routingKey);
                    }
                    ///构造消息实体对象并发布到消息队列上
                    IMapMessageBuilder b = new MapMessageBuilder(ch);
                    IDictionary target = b.Headers;
                    target["header"] = "hello world";
                    IDictionary targerBody = b.Body;
                    targerBody["body"] = "hello world";//这个才是具体的发送内容
                    if (persistMode)
                    {
                        ((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2;
                               //设定传输模式
                    }
                    //写入
                    ch.BasicPublish(exchange, routingKey, (IBasicProperties)b.GetContentHeader(), b.GetContentBody());
                    Console.WriteLine("写入成功");
                }

            }
        }


    }
}

?二、创建一个接受者:receiver

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Util;

namespace receiver
{
    public class receiver
    {
        static void Main(string[] args)
        {
           // Uri uri = new Uri("amqp://127.0.0.1:8688/");
            
            string exchange = "routing";
            string exchangeType = "direct";
            string routingKey = "rk";

            string serverAddress ="amqp://127.0.0.1:8688/";

            ConnectionFactory cf = new ConnectionFactory();
            cf.Uri =  serverAddress;
            
            cf.UserName = "gyg";
            cf.Password = "123456";
            cf.VirtualHost = "gyg001";
            cf.RequestedHeartbeat = 0;
            //cf.Endpoint = new AmqpTcpEndpoint(uri);
            using (IConnection conn = cf.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
                {
                    //普通使用方式BasicGet
                    //noAck = true,不需要回复,接收到消息后,queue上的消息就会清除
                    //noAck = false,需要回复,接收到消息后,queue上的消息不会被清除,
                    //直到调用channel.basicAck(deliveryTag, false); 
                    //queue上的消息才会被清除 而且,在当前连接断开以前,其它客户端将不能收到此queue上的消息

                    BasicGetResult res = c