日期:2014-05-20  浏览次数:20762 次

【J2SE】生成者、消费者问题
本文转自:《Java JDK 实例宝典》

解决生产者消费者问题的关键技术点如下:
1.用线程模拟生产者,在run方法中不断地往仓库中存放产品。
2.用线程模拟消费者,在run方法中不断地从仓库中获取产品。
3.仓库类保存产品,当产品数量为0时,调用wait方法,使得当前消费者线程进入等待状态,当有新产品存入时,调用nofify方法,唤醒等待的消费者线程。当仓库满时,调用wait方法,使得当前生产者线程进入等待状态,当有消费者获取产品时,调用notify方法,唤醒等待的生产者线程。
4.停止生产者和消费者线程都通过设置标志位running为false实现。

Product类:

package com.syc.examples.chapter8;

public class Product {

	private String name; // 产品名
	public Product(String name){
		this.name = name;
	}
	public String toString(){
		return "Product-" + name;
	}
}

Producer类:

package com.syc.examples.chapter8;

public class Producer extends Thread {

	private Warehouse warehouse; // 生产者存储产品的仓库
	private static int produceName = 0; // 产品的名字
	private boolean running = false; // 是否需要结束线程的标志位
	
	public Producer(Warehouse use,String name){
		super(name);
		this.warehouse = use;
	}
	
	public void start(){
		this.running = true;
		super.start();
	}
	
	public void run(){
		Product product;
		try{
			while(running){
				// 生产并存储产品
				product = new Product((++produceName)+"");
				this.warehouse.storageProduct(product);
				sleep(300);
			}
		}catch(InterruptedException e){
			e.printStackTrace();
		}
	}
	
	/**
	 * 停止生产者线程
	 */
	public void stopProducer(){
		synchronized(warehouse){
			this.running =false;
			// 通知等待仓库的线程
			warehouse.notifyAll();
		}
	}
	
	/**
	
	 * 生产者线程是否在运行
	 * @return
	 */public boolean isRunning(){
		return running;
	}
}

Consumer类:
package com.syc.examples.chapter8;

/**
 * 消费者,采用线程,模拟消费者行为
 * @author Administrator
 *
 */
public class Consumer extends Thread{

	private Warehouse warehouse; // 消费者获取产品的仓库
	private boolean running = false; // 是否需要结束线程的标志位
	
	public Consumer(Warehouse warehouse,String name){
		super(name);
		this.warehouse = warehouse;
	}
	
	public void start(){
		this.running = true;
		super.start();
	}
	
	public void run(){
		@SuppressWarnings("unused")
		Product product;
		try{
			while(running){
				// 从仓库中获取产品
				product = warehouse.getProduct();
				sleep(500);
			}
		}catch(InterruptedException e){
			e.printStackTrace();
		}
	}
	
	/**
	 * 停止消费者线程
	 */
	public void stopConsumer(){
		synchronized(warehouse){
			this.running = false;
			warehouse.notifyAll(); // 通知等待仓库的线程
		}
	}
	
	/**
	 * 消费者线程是否在运行
	 */
	public boolean isRunning(){
		return running;
	}
}

Warehouse类:

package com.syc.examples.chapter8;

/**
 * 产品的仓库类
 * 内部采用数组来表示循环队列,以存放产品
 * @author Administrator
 *
 */
public class Warehouse {

	private static int CAPACITY = 11;//仓库的容量
	private Product[] products;// 仓库里的产品
	
	//[front,rear]区间的产品是未被消费的
	private int front = 0;// 当前仓库中第一个未被消费的产品的下标
	private int rear = 0;// 库中最后一个未被消费的产品的下标加1
	
	public Warehouse(){
		this.products = new Product[CAPACITY];
	}
	
	public Warehouse(int capacity){
		this();
		if(capacity > 0){
			CAPACITY = capacity+1;
			this.products = new Product[CAPACITY];
		}
	}
	
	/**
	 * 从仓库获取一个产品
	 * @throws InterruptedException 
	 */
	public Product getProduct() throws InterruptedException{
		synchronized(this){
			boolean consumerRunning = true; // 标志消费者线程是否还在运行
			Thread currentThread = Thread.currentThread();// 获取当前线程
			if(currentThread instanceof Consumer){
				consumerRunning = ((Consumer)currentThread).isRunning();
			}else{
				return null; // 非消费者不能获取产品
			}
			// 如果仓库中没有产品,而且消费者线程还在运行,则消费者线程继续等待
			while((front==rear) && consumerRunning){
				wait();
				consumerRunning = ((Consumer)currentThread).isRunning();
			}
			// 如果消费者线程已经没有运行了,则退出该方法,取消获取该产品
			if(!consumerRunning){
				return null;
			}
			// 取当前未被消费的第一个产品
			Product product = products[front];
			System.out.println("Consumer["+currentThread.getName()+"] getProduct: " + product);
			// 将当前未被消费产品的下标后移一位,如果到了数组末尾,则移动到首部
			front = (front+1+CAPACITY)%CAPACITY;
			System.out.println("仓库中还没有被消费的产品数量:"+(rear+CAPACITY-front)%CAPACITY);
			// 通知其他等待线程
			notify();
			return product;
		}
	}
	
	/**
	 * 向仓库存储一个产品
	 * @throws InterruptedException 
	 */
	public void storageProduct(Product product) throws InterruptedException{
		synchronized(this){
			boolean producerRunning = true; // 标志生产者线程是否在运行
			Thread currentThread = Thread.currentThread(); // 获取当前线程
			if(currentThread instanceof Producer){
				producerRunning = ((Producer)currentThread).isRunning();
			}else{
				return;
			}
			
			// 如果最后一个未被消费产品与第一个未被消费的产品的下标紧挨着,则说明没有
			// 存储空间,如果没有存储空间而且生产者线程还在运行,则等待仓库释放产品
			while(((rear+1)%CAPACITY == front) && producerRunning){
				wait();
				producerRunning = ((Producer)currentThread).isRunning();
			}
			
			// 如果生产者线程已经停止了,则停止产品的存储。
			if(!producerRunning){
				return;
			}
			
			// 保存参数产品到仓库
			products[rear] = product;
			System.out.println("Producer["+Thread.currentThread().getName()+"]storageProduct:"+product);
			// 将rear下标循环后移一位
			rear = (rear+1)%CAPACITY;
			System.out.println("仓库中还没有被消费的产品数量:"+(rear+CAPACITY-front)%CAPACITY);
			notify();
		}
	}
}

TestProduct类:
package com.syc.examples.chapter8;

public class TestProduct {
	
	public static void main(String[] args){
		Warehouse warehouse = new Warehouse(10); // 建立一个仓库,容量为10
		// 建立生产者和消费者
		Producer producers1 = new Producer(warehouse,"producer-1");
		Producer producers2 = new Producer(warehouse,"producer-2");
		Producer producers3 = new Producer(warehouse,"producer-3");
		Consumer consumers1 = new Consumer(warehouse,"consumer-1");
		Consumer consumers2 = new Consumer(warehouse,"consumer-2");
		Consumer consumers3 = new Consumer(warehouse,"consumer-3");
		Consumer consumers4 = new Consumer(warehouse,"consumer-4");
		// 启动生产者和消费者线程
		producers1.start();
		producers2.start();
		consumers1.start();
		producers3.start();
		consumers2.start();
		consumers3.start();
		consumers4.start();
		
		// 让生产者消费者程序预习1600ms
		try {
			Thread.sleep(1600);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		// 停止生产者和消费者的线程
		producers1.stopProducer();
		consumers1.stopConsumer();
		producers2.stopProducer();
		consumers2.stopConsumer();
		producers3.stopProducer();
		consumers3.stopConsumer();
		consumers4.stopConsumer();
	}
}

引用
Producer[producer-1]storageProduct:Product-1
仓库中还没有被消费的产品数量:1
Producer[producer-2]storageProduct:Product-2
仓库中还没有被消费的产品数量:2
Consumer[consumer-1] getProduct: Product-1
仓库中还没有被消费的产品数量:1
Producer[producer-3]storageProduct:Product-3
仓库中还没有被消费的产品数量:2
Consumer[consumer-2] getProduct: Product-2
仓库中还没有被消费的产品数量:1
Consumer[consumer-3] getProduct: Product-3
仓库中还没有被消费的产品数量:0
Producer[producer-1]storageProduct:Product-4
仓库中还没有被消费的产品数量:1
Consumer[consumer-4] getProduct: Product-4
仓库中还没有被消费的产品数量:0
Producer[producer-2]storageProduct:Product-5
仓库中还没有被消费的产品数量:1
Producer[producer-3]storageProduct:Product-6
仓库中还没有被消费的产品数量:2
Consumer[consumer-1] getProduct: Product-5
仓库中还没有被消费的产品数量:1
Consumer[consumer-2] getProduct: Product-6
仓库中还没有被消费的产品数量:0
Producer[producer-1]storageProduct:Product-7
仓库中还没有被消费的产品数量:1
Consumer[consumer-3] getProduct: Product-7
仓库中还没有被消费的产品数量:0
Producer[producer-2]storageProduct:Product-8
仓库中还没有被消费的产品数量:1
Producer[producer-3]storageProduct:Product-9
仓库中还没有被消费的产品数量:2
Consumer[consumer-4] getProduct: Product-8
仓库中还没有被消费的产品数量:1
Producer[producer-1]storageProduct:Product-10
仓库中还没有被消费的产品数量:2
Producer[producer-2]storageProduct:Product-11
仓库中还没有被消费的产品数量:3
Producer[producer-3]storageProduct:Product-12
仓库中还没有被消费的产品数量:4
Consumer[consumer-1] getProduct: Product-9
仓库中还没有被消费的产品数量:3
Consumer[consumer-2] getProduct: Product-10
仓库中还没有被消费的产品数量:2
Consumer[consumer-3] getProduct: Product-11
仓库中还没有被消费的产品数量:1
Producer[producer-1]storageProduct:Product-13
仓库中还没有被消费的产品数量:2
Producer[producer-2]storageProduct:Product-14
仓库中还没有被消费的产品数量:3
Producer[producer-3]storageProduct:Product-15
仓库中还没有被消费的产品数量:4
Consumer[consumer-4] getProduct: Product-12
仓库中还没有被消费的产品数量:3
Consumer[consumer-1] getProduct: Product-13
仓库中还没有被消费的产品数量:2
Consumer[consumer-2] getProduct: Product-14
仓库中还没有被消费的产品数量:1
Producer[producer-1]storageProduct:Product-16
仓库中还没有被消费的产品数量:2
Producer[producer-2]storageProduct:Product-17
仓库中还没有被消费的产品数量:3
Producer[producer-3]storageProduct:Product-18
仓库中还没有被消费的产品数量:4
Consumer[consumer-3] getProduct: Product-15
仓库中还没有被消费的产品数量:3