日期:2014-05-17  浏览次数:20498 次

php消息队列

php-通过共享内存实现消息队列和进程通信的两个类

实现消息队列,可以使用比较专业的工具,例如:Apache ActiveMQ、memcacheq…..,下面是两个基本简单的实现方式:

使用memcache方法来实现

<?php
/*
 * @Copyright (c) 2007,上海友邻信息科技有限公司
 * @All	rights reserved.
 *
 * 这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性。如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作。
 *
 * @filename   MemcacheQueue.class.php
 */
/**
 * Class and Function List:
 * Function list:
 * - __construct()
 * - singleton()
 * - init()
 * - __get()
 * - __set()
 * - isEmpty()
 * - isFull()
 * - enQueue()
 * - deQueue()
 * - getTop()
 * - getAll()
 * - getPage()
 * - makeEmpty()
 * - getAllKeys()
 * - add()
 * - increment()
 * - decrement()
 * - set()
 * - get()
 * - delete()
 * - getKeyByPos()
 * Classes list:
 * - Yl_MemcacheQueue
 */

class Yl_MemcacheQueue {
	private static $instance;
	private $memcache;
	private $name;
	private $prefix;
	private $maxSize;
	
	private function __construct() {
	}
	
	static function singleton() {
		
		if (! (self::$instance instanceof self)) {
			self::$instance = new Yl_MemcacheQueue ();
		
		}
		
		return self::$instance;
	}
	
	public function init($max_size, $name, $prefix = "__queue__") {
		$max_size = 1000;
		$name = '_war_';
		$prefix = '_queue';
		
		$this->memcache = Yl_Memcache::singleton ();
		$this->name = $name;
		$this->prefix = $prefix;
		$this->maxSize = $max_size;
		
		$this->add ( 'front', 0 );
		$this->add ( 'rear', 0 );
		$this->add ( 'size', 0 );
	}
	
	function isEmpty() {
		return $this->get ( 'size' ) == 0;
	}
	
	function isFull() {
		return $this->get ( 'size' ) >= $this->maxSize;
	}
	
	function enQueue($data) {
		if ($this->isFull ()) {
			throw new Exception ( "Queue is Full" );
		}
		
		$size = $this->increment ( 'size' );
		$rear = $this->increment ( 'rear' );
		
		$this->set ( ($rear - 1) % $this->maxSize, $data );
		
		return $this;
	}
	
	function deQueue() {
		if ($this->isEmpty ()) {
			throw new Exception ( "Queue is Empty" );
		}
		
		$this->decrement ( 'size' );
		$front = $this->increment ( 'front' );
		$this->delete ( ($front - 1) % $this->maxSize );
		
		return $this;
	}
	
	function getTop() {
		return $this->get ( $this->get ( 'front' ) % $this->maxSize );
	}
	
	function getAll() {
		return $this->getPage ();
	}
	
	function getPage($offset = 0, $limit = 0) {
		$size = $this->get ( 'size' );
		
		if (0 == $size || $size < $offset) {
			return null;
		}
		
		$front = $this->get ( 'front' ) % $this->maxSize;
		$rear = $this->get ( 'rear' ) % $this->maxSize;
		
		$keys [] = $this->getKeyByPos ( ($front + $offset) % $this->maxSize );
		$num = 1;
		
		for($pos = ($front + $offset + 1) % $this->maxSize; $pos != $rear; $pos = ($pos + 1) % $this->maxSize) {
			$keys [] = $this->getKeyByPos ( $pos );
			$num ++;
			
			if ($limit > 0 && $limit == $num) {
				break;
			}
		}
		
		return array_values ( $this->memcache->get ( $keys ) );
	}
	
	function makeEmpty() {
		$keys = $this->getAllKeys ();
		
		foreach ( $keys as $value ) {
			$this->delete ( $value );
		}
		
		$this->delete ( "rear" );
		$this->delete ( "front" );
		$this->delete ( 'size' );
		$this->delete ( "maxSize" );
	}
	
	private function getAllKeys() {
		if ($this->isEmpty ()) {
			return array ();
		}
		
		$keys [] = $this->get ( 'front' );
		
		for($pos = ($this->get ( 'front' ) % $this->maxSize + 1) % $this->maxSize; $pos != $this->get ( 'rear' ) % $this->maxSize; $pos = ($pos + 1) % $this->maxSize) {
			$keys [] = $pos;
		}
		
		return $keys;
	}
	
	private function add($pos, $data) {
		$this->memcache->add ( $this->getKeyByPos ( $pos ), $data );
		
		return $this;
	}
	
	private function increment($pos) {
		
		return $this->memcache->increment ( $this->getKeyByPos ( $pos ) );
	}
	
	private function decrement($pos) {
		$this->memcache->decrement ( $this->getKey