日期:2014-05-20  浏览次数:20644 次

问一个关于多线程的问题
场景是这样的:
需要监听一个任务队列,多个线程往一个集合(考虑到同步问题,使用了java.util.concurrent.ConcurrentLinkedQueue)里面放置任务编号,有另外一个线程去监听这个集合,拿到这些任务编号去做相关的业务。

现在监听这儿有点疑问,目前采用的是死循环不断地poll任务编号,一旦有几次为null,则sleep一定时间,以降低CPU使用率,然后再起来重复取。。
实际应用中,并发量会比较大,但可能偶尔也会出现系统闲置而睡眠的情况吧。

想问下大家,这个监听功能还有什么更高效的办法来实现吗?wait和notify在这种情况下使用靠谱吗?

谢谢!

------解决方案--------------------
不知道具体的需求是什么,至少“死循环不断地poll任务编号”是不可取的,想办法用消息机制取代。
生产消费模型中,可以主动消费,也可以被动消费,看具体情况了。
------解决方案--------------------
使用消息机制比较靠谱
------解决方案--------------------
应该使用 java.util.concurrent.ArrayBlockingQueue/LinkedBlockingQueue来实现


多个线程往一个集合放入东西时只需要queue.put(id);


另外一个线程去监听这个集合只需要
while(true) {
String id = queue.take();
//do something
}

就可以了,这两个类都已经非常好的实现了wait/notify机制,无须你自己处理了
------解决方案--------------------
4楼正解,jdk1.5提供了比较强大的线程功能,blockqueue极大提高了生产、消费模式开发效率。楼主可以试试
------解决方案--------------------
使用生产者/消费者模型,也就是wait和notify实现同步
------解决方案--------------------
mark一下
------解决方案--------------------
新来乍到 顶一下
------解决方案--------------------
实现多线程池,和监听模式完成.那样的话不用死循环去查看, 速度上好了.
------解决方案--------------------
实际生产关键是考虑队列堵塞的问题!

并发不是问题!!一个队列不行,可以用多个队列嘛!!
一台机器不行,可以用多台机器嘛!!
负载均衡,反向代理 解决这种并发问题是小试牛刀!!

重中之重还是队列堵塞的问题!
前面堵住,后面出不来!!或队列中积累的数量线性增长,内存消耗完系统就当机了!!
堵塞带来的联动悲观结果,就是处理器不停的扫描队列,造成资源消耗尽,灯火熄灭!!
同时竞争扫描带来线程死锁!!
总之,恶性循环!!一塌糊涂!!!哈哈哈

因为堵塞不只是简单的PUT,OUT所能解决的,是关系到整个分布式事务的大问题,而且是生产环境无法逾越的,必须要解决的问题!!
谁欺骗它,生产上就会带来恶劣的报复给你!!

这是非常高级的技术问题,预知道详细问题解答,请联系QQ:871933435


------解决方案--------------------
顶一哈,加油~~
------解决方案--------------------
我做J2ME 手机UI开发的时候,用到线程池,就自己写一个来,也就是用到了wait和notify实现同步。
------解决方案--------------------
顶下
------解决方案--------------------
嗯,死循环肯定是需要的,但是需要退出开关。
并且,可以在每次循环开始的时候现取一个编号,没必要生成一大堆编号扔到队列中吧。
------解决方案--------------------
up
------解决方案--------------------
顶一下
------解决方案--------------------
使用生产者/消费者模型,也就是wait和notify实现同步
------解决方案--------------------
UP
------解决方案--------------------
探讨
4楼正解,jdk1.5提供了比较强大的线程功能,blockqueue极大提高了生产、消费模式开发效率。楼主可以试试

------解决方案--------------------
现在监听这儿有点疑问,目前采用的是死循环不断地poll任务编号,一旦有几次为null,则sleep一定时间,以降低CPU使用率,然后再起来重复取。。 
实际应用中,并发量会比较大,但可能偶尔也会出现系统闲置而睡眠的情况吧。 


我一般用的是wait这样才是真正的节约cpu,然后用notify或者notifyAll来实现
------解决方案--------------------
Java code

package chuangsi_0510.thread.timer;

//生产者消费者问题
public class TestPC {

    /**
     * @param args
     */
    public static void main(String[] args) {
         Queue queue = new Queue(20) ;
         
         Producer p1 = new Producer(queue) ;
         Producer p2 = new Producer(queue) ;
         Producer p3 = new Producer(queue) ;

         Consumer c1 = new Consumer(queue) ;
         Consumer c2 = new Consumer(queue) ;
         Consumer c3 = new Consumer(queue) ;
         Consumer c4 = new Consumer(queue) ;
         
         
         Thread tp1 = new Thread(p1) ;
         Thread tp2 = new Thread(p2) ;
         Thread tp3 = new Thread(p3) ;
    
         Thread tc1 = new Thread(c1) ;
         Thread tc2 = new Thread(c1) ;
         Thread tc3 = new Thread(c1) ;
         Thread tc4 = new Thread(c1) ;
    
         tp1.start() ;
         tp2.start() ;
         tp3.start() ;
         tc1.start() ;
         tc2.start() ;
         tc3.start() ;
         tc4.start() ; 
    }

}


class Item {
    int id ;
    String name; 
    public Item(int id , String name ) {
        this.id = id ;
        this.name = name ;
    }
    
}

class Queue {
    
    public Object[] items ;
    
    private int capacity ;
    private int in ; //下一个可以放商品的位置
    private int out ;//下一个可以取商品的位置
    
    private int size ;
    
    public Queue(int capacity) {
        this.capacity = capacity ;
        items = new Object[capacity] ;
        size = 0 ;
        in = out = 0 ;
    }
    
    public synchronized void insert(Object item) {
        while (size == capacity) {
            System.out.println("等待消费者把商品消费掉!");
            
             try {                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      
                this.wait() ;
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
        }
        this.notify() ;
        items[in] = item ;
        in = (in+1) % capacity ;
        size++ ;
        
        
    }
    
    public synchronized   Object remove() {
        while (size == 0 ) {
            System.out.println("等待生产者生产产品!");
            try {
                this.wait() ;
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        
        this.notify() ;
        
        Object o = items[out] ;
        out = (out + 1) % capacity ;
        size-- ;
        return o ;
    }
    
}


class Producer implements Runnable {
    
    private Queue queue ;
    
    public Producer(Queue queue) {
        this.queue = queue ;
    }
    
    public void run() {
        
        for (int i = 0 ; i < 30 ; i++ ) {
            Item item = new Item(i,Thread.currentThread().getName() + "生产的商品" +i) ;
            queue.insert(item) ;
            
            try {
                Thread.sleep(50) ;
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
            
        }
        
    }

}

class Consumer implements Runnable {
    private Queue queue ;
    
    public Consumer(Queue queue) {
        this.queue = queue ;
    }
    
    public void run() {
        
        for (int i = 0 ; i < 30 ; i++ ) {
            Item item = (Item)queue.remove() ;
            try {
                Thread.sleep( 20) ;
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
            System.out.println(Thread.currentThread().getName() + "消费了" + item.name) ;
        }
    }    
}