日期:2014-05-17 浏览次数:20553 次
php-通过共享内存实现消息队列和进程通信的两个类
实现消息队列,可以使用比较专业的工具,例如:Apache ActiveMQ、memcacheq…..,下面是两个基本简单的实现方式:
使用memcache方法来实现
<?php
/*
* @Copyright (c) 2007,上海友邻信息科技有限公司
* @All rights reserved.
*
* 这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性。如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作。
*
* @filename MemcacheQueue.class.php
*/
/**
* Class and Function List:
* Function list:
* - __construct()
* - singleton()
* - init()
* - __get()
* - __set()
* - isEmpty()
* - isFull()
* - enQueue()
* - deQueue()
* - getTop()
* - getAll()
* - getPage()
* - makeEmpty()
* - getAllKeys()
* - add()
* - increment()
* - decrement()
* - set()
* - get()
* - delete()
* - getKeyByPos()
* Classes list:
* - Yl_MemcacheQueue
*/
class Yl_MemcacheQueue {
private static $instance;
private $memcache;
private $name;
private $prefix;
private $maxSize;
private function __construct() {
}
static function singleton() {
if (! (self::$instance instanceof self)) {
self::$instance = new Yl_MemcacheQueue ();
}
return self::$instance;
}
public function init($max_size, $name, $prefix = "__queue__") {
$max_size = 1000;
$name = '_war_';
$prefix = '_queue';
$this->memcache = Yl_Memcache::singleton ();
$this->name = $name;
$this->prefix = $prefix;
$this->maxSize = $max_size;
$this->add ( 'front', 0 );
$this->add ( 'rear', 0 );
$this->add ( 'size', 0 );
}
function isEmpty() {
return $this->get ( 'size' ) == 0;
}
function isFull() {
return $this->get ( 'size' ) >= $this->maxSize;
}
function enQueue($data) {
if ($this->isFull ()) {
throw new Exception ( "Queue is Full" );
}
$size = $this->increment ( 'size' );
$rear = $this->increment ( 'rear' );
$this->set ( ($rear - 1) % $this->maxSize, $data );
return $this;
}
function deQueue() {
if ($this->isEmpty ()) {
throw new Exception ( "Queue is Empty" );
}
$this->decrement ( 'size' );
$front = $this->increment ( 'front' );
$this->delete ( ($front - 1) % $this->maxSize );
return $this;
}
function getTop() {
return $this->get ( $this->get ( 'front' ) % $this->maxSize );
}
function getAll() {
return $this->getPage ();
}
function getPage($offset = 0, $limit = 0) {
$size = $this->get ( 'size' );
if (0 == $size || $size < $offset) {
return null;
}
$front = $this->get ( 'front' ) % $this->maxSize;
$rear = $this->get ( 'rear' ) % $this->maxSize;
$keys [] = $this->getKeyByPos ( ($front + $offset) % $this->maxSize );
$num = 1;
for($pos = ($front + $offset + 1) % $this->maxSize; $pos != $rear; $pos = ($pos + 1) % $this->maxSize) {
$keys [] = $this->getKeyByPos ( $pos );
$num ++;
if ($limit > 0 && $limit == $num) {
break;
}
}
return array_values ( $this->memcache->get ( $keys ) );
}
function makeEmpty() {
$keys = $this->getAllKeys ();
foreach ( $keys as $value ) {
$this->delete ( $value );
}
$this->delete ( "rear" );
$this->delete ( "front" );
$this->delete ( 'size' );
$this->delete ( "maxSize" );
}
private function getAllKeys() {
if ($this->isEmpty ()) {
return array ();
}
$keys [] = $this->get ( 'front' );
for($pos = ($this->get ( 'front' ) % $this->maxSize + 1) % $this->maxSize; $pos != $this->get ( 'rear' ) % $this->maxSize; $pos = ($pos + 1) % $this->maxSize) {
$keys [] = $pos;
}
return $keys;
}
private function add($pos, $data) {
$this->memcache->add ( $this->getKeyByPos ( $pos ), $data );
return $this;
}
private function increment($pos) {
return $this->memcache->increment ( $this->getKeyByPos ( $pos ) );
}
private function decrement($pos) {
$this->memcache->decrement ( $this->getKey