日期:2014-05-17  浏览次数:20805 次

PHP AMQP 扩展 应用
周末休息 ,闲来研究下AMQP 的PHP 扩展,花了一天时间才调试好

高级消息队列协议(AMQP)是一个异步消息传递所使用的应用层协议规范。作为线路层协议,而不是API(例如JMS),AMQP 客户端能够无视消息的来源任意发送和接受信息。现在,已经有相当一部分不同平台的服务器和客户端可以投入使用

我的AMQP服务器是使用RabbitMQ ,RabbitMQ 的安装网上很多。

重点说下 PHP 的扩展 php-amqp,我用的是最新的 amqp-1.0.1

文档是旧的 新的掺杂在一起 因此弄了好久才完成。

首先说下流程 即 Client - AMQP server - Client

左边的Client向右边的Client发送消息,流程:

1,  获取Conection

2,  获取Channel

3,  定义Exchange,Queue

4,  使用一个RoutingKey将Queue Binding到一个Exchange上

5,  通过指定一个Exchange和一个RoutingKey来将消息发送到对应的Queue上,

6,  接收方在接收时也是获取connection,接着获取channel,然后指定一个Queue直接到它关心的Queue上取消息,它对Exchange,RoutingKey及如何binding都不关心,到对应的Queue上去取消息就OK了

以下是PHP 的实现:

生产消息:
<?php

//设置你的连接
$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
$conn = new AMQPConnection($conn_args);
if ($conn->connect()) {
    echo "Established a connection to the broker \n";
}
else {
    echo "Cannot connect to the broker \n ";
}
//你的消息
$message = json_encode(array('Hello World!','php','c++'));
//创建channel
$channel = new AMQPChannel($conn);
//创建exchange 
$ex = new AMQPExchange($channel);
$ex->setName('exchange');//创建名字
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
echo "exchange status:".$ex->declare();
echo "\n";
//创建队列 
$q = new AMQPQueue($channel);
//设置队列名字 如果不存在则添加
$q->setName('queue');
$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
echo "queue status: ".$q->declare();
echo "\n";
echo 'queue bind: '.$q->bind('exchange','route.key');//将你的队列绑定到routingKey
echo "\n";

$channel->startTransaction();
echo "send: ".$ex->publish($message, 'route.key'); //将你的消息通过制定routingKey发送
$channel->commitTransaction();
$conn->disconnect();

?>


接收方 消费方

<?php
$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'
                    ,'vhost'=>'/');
$conn = new AMQPConnection($conn_args);
$conn->connect();
$channel = new AMQPChannel($conn);
$q = new AMQPQueue($channel);
$q->setName('queue2');
$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
echo "queue status: ".$q->declare();
echo "==========\n";
     
$messages = $q->get(AMQP_AUTOACK);
print_r($messages->getBody());
echo "\n";
// disconnect
$conn->disconnect();

?>


我是初学 消息队列这方面知识。
不知道 linux的消息队列 和AMQP 的 优缺点
我用php 的sysvmsg 函数 执行while 循环 可以挂起在服务器上,实时监听消息队列的消息,有消息则取,内存不会升高

而用amqp while 循环 监听 即使无消息  内存也会越来越高,最后导致PHP 使用内存不足而 报错。

看来这方面得多交流 多学习