日期:2014-05-16 浏览次数:20493 次
import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.RandomStringUtils; /** * @author zhaoqilong * @version 创建时间:2012-6-7 上午9:16:56 * */ public class Test { private static LinkedBlockingQueue<String> queue =new LinkedBlockingQueue<String>(); // 线程控制开关 private final CountDownLatch latch = new CountDownLatch(1); //的线程池 private final ExecutorService pool; //AtomicLong 计数 生产数量 private final AtomicLong output = new AtomicLong(0); //AtomicLong 计数 销售数量 private final AtomicLong sales = new AtomicLong(0); //是否停止线程 private final boolean clear; public Test(boolean clear){ this.pool = Executors.newCachedThreadPool(); this.clear=clear; } public void service() throws InterruptedException{ Saler a=new Saler(queue, sales, latch, clear); pool.submit(a); Worker w=new Worker(queue, output, latch); pool.submit(w); latch.countDown(); } public static void main(String[] args) { Test t=new Test(false); try { t.service(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } class Saler implements Runnable{ private final LinkedBlockingQueue<String> queue; private final AtomicLong sales; private final CountDownLatch latch; private final boolean clear; public Saler(LinkedBlockingQueue<String> queue, AtomicLong sales, CountDownLatch latch, boolean clear){ this.queue = queue; this.sales = sales; this.latch = latch; this.clear = clear; } public void run() { try { latch.await(); // 放闸之前老实的等待着 for (;;) { sale(); Thread.sleep(500); } }catch (InterruptedException e) { if(clear) { // 响应中断请求后,如果有要求则销售完队列的产品后再终止线程 cleanWarehouse(); } else { System.out.println("Seller Thread will be interrupted..."); } } } public void sale(){ System.out.println("==取take="); try { String item = queue.poll(50, TimeUnit.MILLISECONDS); System.out.println(item); if(item!=null){ sales.incrementAndGet(); // 可以声明long型的参数获得返回值,作为日志的参数 } } catch (InterruptedException e) { e.printStackTrace(); } } /** * 销售完队列剩余的产品 */ private void cleanWarehouse() { try { while (queue.size() > 0) { sale(); } } catch (Exception ex) { System.out.println("Seller Thread will be interrupted..."); } } } /** * 生产者 * @author Administrator * */ class Worker implements Runnable{ private LinkedBlockingQueue<String> queue; private CountDownLatch latch; private AtomicLong output; public Worker(){ } public Worker(LinkedBlockingQueue<String> queue, AtomicLong output,CountDownLatch latch){ this.queue=queue; this.latch=latch; this.output=output; } public void run() { try { latch.await(); // 线程等待 for (;;) { work(); Thread.sleep(100); } }catch (InterruptedException e) { System.out.println("Worker thread will be interrupted..."); } } /** * 工作 */ public void work(){ try { String product=RandomStringUtils.randomAscii(3); boolean success=queue.offer(product, 100, TimeUnit.MILLISECONDS); if(success){ output.incrementAndGet();// 可以声明long型的参数获得返回值,作为日志的参数 } } catch (InterruptedException e) { e.printStac