日期:2014-05-17 浏览次数:20498 次
php-通过共享内存实现消息队列和进程通信的两个类
实现消息队列,可以使用比较专业的工具,例如:Apache ActiveMQ、memcacheq…..,下面是两个基本简单的实现方式:
使用memcache方法来实现
<?php /* * @Copyright (c) 2007,上海友邻信息科技有限公司 * @All rights reserved. * * 这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性。如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作。 * * @filename MemcacheQueue.class.php */ /** * Class and Function List: * Function list: * - __construct() * - singleton() * - init() * - __get() * - __set() * - isEmpty() * - isFull() * - enQueue() * - deQueue() * - getTop() * - getAll() * - getPage() * - makeEmpty() * - getAllKeys() * - add() * - increment() * - decrement() * - set() * - get() * - delete() * - getKeyByPos() * Classes list: * - Yl_MemcacheQueue */ class Yl_MemcacheQueue { private static $instance; private $memcache; private $name; private $prefix; private $maxSize; private function __construct() { } static function singleton() { if (! (self::$instance instanceof self)) { self::$instance = new Yl_MemcacheQueue (); } return self::$instance; } public function init($max_size, $name, $prefix = "__queue__") { $max_size = 1000; $name = '_war_'; $prefix = '_queue'; $this->memcache = Yl_Memcache::singleton (); $this->name = $name; $this->prefix = $prefix; $this->maxSize = $max_size; $this->add ( 'front', 0 ); $this->add ( 'rear', 0 ); $this->add ( 'size', 0 ); } function isEmpty() { return $this->get ( 'size' ) == 0; } function isFull() { return $this->get ( 'size' ) >= $this->maxSize; } function enQueue($data) { if ($this->isFull ()) { throw new Exception ( "Queue is Full" ); } $size = $this->increment ( 'size' ); $rear = $this->increment ( 'rear' ); $this->set ( ($rear - 1) % $this->maxSize, $data ); return $this; } function deQueue() { if ($this->isEmpty ()) { throw new Exception ( "Queue is Empty" ); } $this->decrement ( 'size' ); $front = $this->increment ( 'front' ); $this->delete ( ($front - 1) % $this->maxSize ); return $this; } function getTop() { return $this->get ( $this->get ( 'front' ) % $this->maxSize ); } function getAll() { return $this->getPage (); } function getPage($offset = 0, $limit = 0) { $size = $this->get ( 'size' ); if (0 == $size || $size < $offset) { return null; } $front = $this->get ( 'front' ) % $this->maxSize; $rear = $this->get ( 'rear' ) % $this->maxSize; $keys [] = $this->getKeyByPos ( ($front + $offset) % $this->maxSize ); $num = 1; for($pos = ($front + $offset + 1) % $this->maxSize; $pos != $rear; $pos = ($pos + 1) % $this->maxSize) { $keys [] = $this->getKeyByPos ( $pos ); $num ++; if ($limit > 0 && $limit == $num) { break; } } return array_values ( $this->memcache->get ( $keys ) ); } function makeEmpty() { $keys = $this->getAllKeys (); foreach ( $keys as $value ) { $this->delete ( $value ); } $this->delete ( "rear" ); $this->delete ( "front" ); $this->delete ( 'size' ); $this->delete ( "maxSize" ); } private function getAllKeys() { if ($this->isEmpty ()) { return array (); } $keys [] = $this->get ( 'front' ); for($pos = ($this->get ( 'front' ) % $this->maxSize + 1) % $this->maxSize; $pos != $this->get ( 'rear' ) % $this->maxSize; $pos = ($pos + 1) % $this->maxSize) { $keys [] = $pos; } return $keys; } private function add($pos, $data) { $this->memcache->add ( $this->getKeyByPos ( $pos ), $data ); return $this; } private function increment($pos) { return $this->memcache->increment ( $this->getKeyByPos ( $pos ) ); } private function decrement($pos) { $this->memcache->decrement ( $this->getKey