日期:2014-05-20 浏览次数:20810 次
package com.syc.examples.chapter8; public class Product { private String name; // 产品名 public Product(String name){ this.name = name; } public String toString(){ return "Product-" + name; } }
package com.syc.examples.chapter8; public class Producer extends Thread { private Warehouse warehouse; // 生产者存储产品的仓库 private static int produceName = 0; // 产品的名字 private boolean running = false; // 是否需要结束线程的标志位 public Producer(Warehouse use,String name){ super(name); this.warehouse = use; } public void start(){ this.running = true; super.start(); } public void run(){ Product product; try{ while(running){ // 生产并存储产品 product = new Product((++produceName)+""); this.warehouse.storageProduct(product); sleep(300); } }catch(InterruptedException e){ e.printStackTrace(); } } /** * 停止生产者线程 */ public void stopProducer(){ synchronized(warehouse){ this.running =false; // 通知等待仓库的线程 warehouse.notifyAll(); } } /** * 生产者线程是否在运行 * @return */public boolean isRunning(){ return running; } }
package com.syc.examples.chapter8; /** * 消费者,采用线程,模拟消费者行为 * @author Administrator * */ public class Consumer extends Thread{ private Warehouse warehouse; // 消费者获取产品的仓库 private boolean running = false; // 是否需要结束线程的标志位 public Consumer(Warehouse warehouse,String name){ super(name); this.warehouse = warehouse; } public void start(){ this.running = true; super.start(); } public void run(){ @SuppressWarnings("unused") Product product; try{ while(running){ // 从仓库中获取产品 product = warehouse.getProduct(); sleep(500); } }catch(InterruptedException e){ e.printStackTrace(); } } /** * 停止消费者线程 */ public void stopConsumer(){ synchronized(warehouse){ this.running = false; warehouse.notifyAll(); // 通知等待仓库的线程 } } /** * 消费者线程是否在运行 */ public boolean isRunning(){ return running; } }
package com.syc.examples.chapter8; /** * 产品的仓库类 * 内部采用数组来表示循环队列,以存放产品 * @author Administrator * */ public class Warehouse { private static int CAPACITY = 11;//仓库的容量 private Product[] products;// 仓库里的产品 //[front,rear]区间的产品是未被消费的 private int front = 0;// 当前仓库中第一个未被消费的产品的下标 private int rear = 0;// 库中最后一个未被消费的产品的下标加1 public Warehouse(){ this.products = new Product[CAPACITY]; } public Warehouse(int capacity){ this(); if(capacity > 0){ CAPACITY = capacity+1; this.products = new Product[CAPACITY]; } } /** * 从仓库获取一个产品 * @throws InterruptedException */ public Product getProduct() throws InterruptedException{ synchronized(this){ boolean consumerRunning = true; // 标志消费者线程是否还在运行 Thread currentThread = Thread.currentThread();// 获取当前线程 if(currentThread instanceof Consumer){ consumerRunning = ((Consumer)currentThread).isRunning(); }else{ return null; // 非消费者不能获取产品 } // 如果仓库中没有产品,而且消费者线程还在运行,则消费者线程继续等待 while((front==rear) && consumerRunning){ wait(); consumerRunning = ((Consumer)currentThread).isRunning(); } // 如果消费者线程已经没有运行了,则退出该方法,取消获取该产品 if(!consumerRunning){ return null; } // 取当前未被消费的第一个产品 Product product = products[front]; System.out.println("Consumer["+currentThread.getName()+"] getProduct: " + product); // 将当前未被消费产品的下标后移一位,如果到了数组末尾,则移动到首部 front = (front+1+CAPACITY)%CAPACITY; System.out.println("仓库中还没有被消费的产品数量:"+(rear+CAPACITY-front)%CAPACITY); // 通知其他等待线程 notify(); return product; } } /** * 向仓库存储一个产品 * @throws InterruptedException */ public void storageProduct(Product product) throws InterruptedException{ synchronized(this){ boolean producerRunning = true; // 标志生产者线程是否在运行 Thread currentThread = Thread.currentThread(); // 获取当前线程 if(currentThread instanceof Producer){ producerRunning = ((Producer)currentThread).isRunning(); }else{ return; } // 如果最后一个未被消费产品与第一个未被消费的产品的下标紧挨着,则说明没有 // 存储空间,如果没有存储空间而且生产者线程还在运行,则等待仓库释放产品 while(((rear+1)%CAPACITY == front) && producerRunning){ wait(); producerRunning = ((Producer)currentThread).isRunning(); } // 如果生产者线程已经停止了,则停止产品的存储。 if(!producerRunning){ return; } // 保存参数产品到仓库 products[rear] = product; System.out.println("Producer["+Thread.currentThread().getName()+"]storageProduct:"+product); // 将rear下标循环后移一位 rear = (rear+1)%CAPACITY; System.out.println("仓库中还没有被消费的产品数量:"+(rear+CAPACITY-front)%CAPACITY); notify(); } } }
package com.syc.examples.chapter8; public class TestProduct { public static void main(String[] args){ Warehouse warehouse = new Warehouse(10); // 建立一个仓库,容量为10 // 建立生产者和消费者 Producer producers1 = new Producer(warehouse,"producer-1"); Producer producers2 = new Producer(warehouse,"producer-2"); Producer producers3 = new Producer(warehouse,"producer-3"); Consumer consumers1 = new Consumer(warehouse,"consumer-1"); Consumer consumers2 = new Consumer(warehouse,"consumer-2"); Consumer consumers3 = new Consumer(warehouse,"consumer-3"); Consumer consumers4 = new Consumer(warehouse,"consumer-4"); // 启动生产者和消费者线程 producers1.start(); producers2.start(); consumers1.start(); producers3.start(); consumers2.start(); consumers3.start(); consumers4.start(); // 让生产者消费者程序预习1600ms try { Thread.sleep(1600); } catch (InterruptedException e) { e.printStackTrace(); } // 停止生产者和消费者的线程 producers1.stopProducer(); consumers1.stopConsumer(); producers2.stopProducer(); consumers2.stopConsumer(); producers3.stopProducer(); consumers3.stopConsumer(); consumers4.stopConsumer(); } }