日期:2014-05-17  浏览次数:20666 次

Windows下一个并发阻塞队列(BlockingQueue)

Windows下一个带有大小限制的并发阻塞队列,基于一个互斥量和两个信号量,实现得比较简单。

#ifndef BLOCKINGQUEUE_H_
#define BLOCKINGQUEUE_H_

#include <deque>
#include <queue>
#include <stdexcept>
#include <windows.h>
using namespace std;

template <typename T>
class BoundedBlockingQueue 
{ 
public: 
    BoundedBlockingQueue(int size) : maxSize(size) 
    {
        _lock = CreateMutex(NULL,false,NULL);
        _rsem = CreateSemaphore(NULL,0,size,NULL);
        _wsem = CreateSemaphore(NULL,size,size,NULL);
    } 
    ~BoundedBlockingQueue() 
    { 
        CloseHandle(_lock);
        CloseHandle(_rsem);
        CloseHandle(_wsem);
    } 
    void push(const T& data);
    T pop();
    bool empty()
    {
        WaitForSingleObject(_lock,INFINITE);
        bool is_empty = _array.empty();
        ReleaseMutex(_lock);
        return is_empty;
    }
private: 
    deque<T> _array;
    int maxSize;
    HANDLE _lock;
    HANDLE _rsem, _wsem;
};

/**
 * Appends a copy of `value` to the back of the queue.
 *
 * Blocks until a free slot is available. If storing the element throws, the
 * mutex and the reserved slot are released and the exception is rethrown,
 * leaving the queue unchanged.
 *
 * @param value element to copy into the queue.
 */
template <typename T>
void BoundedBlockingQueue<T>::push(const T& value)
{
    WaitForSingleObject(_wsem, INFINITE);  // reserve a free slot
    WaitForSingleObject(_lock, INFINITE);
    try
    {
        _array.push_back(value);
    }
    catch (...)
    {
        // Nothing was stored: release the lock and return the reserved slot,
        // otherwise every other thread would block forever.
        ReleaseMutex(_lock);
        ReleaseSemaphore(_wsem, 1, NULL);
        throw;
    }
    ReleaseMutex(_lock);
    ReleaseSemaphore(_rsem, 1, NULL);      // announce one more filled slot
}

/**
 * Removes the oldest element and returns a copy of it.
 *
 * Blocks until an element is available. If copying the front element
 * throws, the element stays in the queue, the mutex and the filled-slot
 * count are restored, and the exception is rethrown.
 *
 * @return the element that was at the front of the queue.
 */
template <typename T>
T BoundedBlockingQueue<T>::pop()
{
    WaitForSingleObject(_rsem, INFINITE);  // claim a filled slot
    WaitForSingleObject(_lock, INFINITE);

    bool removed = false;
    try
    {
        T front_item(_array.front());
        _array.pop_front();
        removed = true;
        ReleaseMutex(_lock);
        ReleaseSemaphore(_wsem, 1, NULL);  // announce one more free slot
        return front_item;
    }
    catch (...)
    {
        if (!removed)
        {
            // The copy failed before the element was taken out: release the
            // lock and hand the filled slot back so another pop() can retry.
            ReleaseMutex(_lock);
            ReleaseSemaphore(_rsem, 1, NULL);
        }
        throw;
    }
}

#endif

主函数调用测试:一个生产者、两个消费者使用这个队列进行测试。

#include "BlockingQueue.h"
#include <windows.h>
#include <iostream>
using namespace std;

// Completion flag: produce() sets it to true once it has queued its last item.
// It is a plain bool, so accesses from different threads are not synchronized.
bool is_over = false;

// End-of-stream marker that travels through the queue after the real data.
// The producer only emits 1..50, so 0 can never be mistaken for an item.
static const int STOP_SENTINEL = 0;

/**
 * Producer thread entry point.
 *
 * Pushes the integers 1..50 into the queue, then marks the stream as
 * finished by setting is_over and pushing STOP_SENTINEL.
 *
 * @param lppara pointer to the shared BoundedBlockingQueue<int>.
 * @return always 0.
 */
DWORD WINAPI produce(LPVOID lppara)
{
    BoundedBlockingQueue<int> *queue = static_cast<BoundedBlockingQueue<int> *>(lppara);

    for(int i=1; i<=50; ++i)
    {
        queue->push(i);
        cout<<GetCurrentThreadId()<<" put a data: "<<i<<endl;
        Sleep(10); // pace the producer
    }

    is_over = true;
    // The sentinel is queued behind every real item, so consumers see it
    // only after all the data has been taken out.
    queue->push(STOP_SENTINEL);
    return 0;
}

/**
 * Consumer thread entry point.
 *
 * Pops and prints items until it receives STOP_SENTINEL. The sentinel is
 * pushed back before exiting so that every other consumer blocked in pop()
 * is woken up and stops as well. Checking a flag after pop() cannot do this:
 * a consumer that is already waiting inside pop() never gets to look at it.
 *
 * @param lppara pointer to the shared BoundedBlockingQueue<int>.
 * @return always 0.
 */
DWORD WINAPI consume(LPVOID lppara)
{
    BoundedBlockingQueue<int> *queue = static_cast<BoundedBlockingQueue<int> *>(lppara);

    for(;;)
    {
        int d = queue->pop();
        if(d == STOP_SENTINEL)
        {
            // The sentinel was the last element, so the queue has room and
            // this push does not block.
            queue->push(STOP_SENTINEL);
            cout<<"OVER!"<<endl;
            break;
        }
        cout<<GetCurrentThreadId()<<" get data: "<<d<<endl;
        Sleep(10); // pace the consumer
    }
    return 0;
}

int main()
{
    DWORD write_data;
    DWORD read_data;
    DWORD read_data1;

    BoundedBlockingQueue<int> queue(20);

    //一个生产者、两个消费者
    if(CreateThread(NULL,0,produce,&queue,0,&write_data)==NULL)
        return -1;
    if(CreateThread(NULL,0,consume,&queue,0,&read_data)==NULL)
        return -1;
    if(CreateThread(NULL,0,consume,&queue,0,&read_data1)==NULL)
        return -1;

    char ch;
    while(1)
    {
        ch = getchar(); //press "e" to exit
        if(ch == 'e') break;
    }

    printf("Program ends successfully\n");

    return 0;
}