You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mitlib.pub/MITLib/Utility/ConcurrentQueue.h

216 lines
6.0 KiB
C++

#pragma once
#include <afx.h>
#include <list>
#include "thread.h"
#include "Utility.h"
// Capacity used by CConcurrentQueue when the caller does not pass one to the constructor.
#define DEFAULT_CAPACITY 1000
// Bounded blocking queue of T.
//
// Access to the underlying std::list is serialized with m_mutex. Two events
// are used to park callers outside the mutex:
//   - m_dataAvailableEvent  : signaled while the queue holds at least one item
//   - m_spaceAvailableEvent : signaled while the queue holds fewer than
//                             m_capacity items
// Both events are explicitly Set()/Reset() by this class.
// NOTE(review): this relies on CEvt being a manual-reset event that starts
// non-signaled - confirm against thread.h.
template <typename T> class CConcurrentQueue
{
public:
    // Creates an empty queue that accepts at most `capacity` items.
    // A capacity of 0 yields a queue that never has space: pushes wait on the
    // space event until their timeout instead of spinning on a signaled event.
    CConcurrentQueue(size_t capacity = DEFAULT_CAPACITY) : m_capacity(capacity)
    {
        if (m_capacity > 0)
        {
            m_spaceAvailableEvent.Set();
        }
    }

    // Destroys the queue and all its content
    ~CConcurrentQueue(void)
    {
        m_queue.clear();
    }

    // Appends `item` at the back of the queue once space is available.
    // milliSeconds is the total time budget for the call (INFINITE by default).
    // Returns true when the item was queued; it never returns false.
    // Exceptions raised by the wait (timeout / interruption, derived from
    // std::exception) are logged with printf and rethrown.
    bool Push(T item, int milliSeconds = INFINITE)
    {
        return PushEx(item, milliSeconds, false);
    }

    // Same as Push, but inserts `item` at the front of the queue so that it is
    // the next item returned by Pop.
    bool PushFront(T item, int milliSeconds = INFINITE)
    {
        return PushEx(item, milliSeconds, true);
    }

    // Removes and returns the front item once one is available.
    // Exceptions raised by the wait (timeout / interruption) propagate to the
    // caller. Requires T to be default-constructible and assignable.
    T Pop(int milliSeconds = INFINITE)
    {
        T item;
        PopEx(item, milliSeconds, true);
        return item;
    }

    // Removes the front item into `item` once one is available.
    // Returns true when an item was dequeued, false when the wait for data
    // failed (WaitEvt returned false - presumably a timeout); `item` is left
    // untouched in that case. This overload does not use the throwing wait.
    bool Pop(T &item, int milliSeconds = INFINITE)
    {
        return PopEx(item, milliSeconds, false);
    }

    // Returns the number of items currently queued (snapshot taken under the
    // mutex; it may be stale by the time the caller looks at it).
    int Size()
    {
        // Lock on construction; released when `lock` goes out of scope.
        CSingleLock lock(&m_mutex, true);
        return static_cast<int>(m_queue.size());
    }

private:
    std::list<T> m_queue;
    CMutex m_mutex;
    CEvt m_dataAvailableEvent;
    CEvt m_spaceAvailableEvent;
    size_t m_capacity;

    // Shared implementation of Push / PushFront.
    // bFront selects push_front (true) or push_back (false).
    // The time budget is tracked across retries the same way PopEx does, so a
    // caller that repeatedly loses the race for the last free slot cannot wait
    // longer than milliSeconds in total.
    bool PushEx(const T &item, int milliSeconds, bool bFront)
    {
        CHighResPerformanceCounter counter;
        counter.StartCnt();
        long remaining = milliSeconds;
        CSingleLock lock(&m_mutex);
        bool bPushedItem = false;
        while (!bPushedItem)
        {
            try
            {
                // Wait for space to be available
                m_spaceAvailableEvent.WaitEvtEx(remaining);
            }
            catch (const std::exception &e)
            {
                printf("Push failed: %s\n", e.what());
                throw;
            }
            // wait for critical section to become available
            lock.Lock();
            // inside critical section
            {
                const size_t size = m_queue.size();
                // Make sure that nobody else that was waiting pushed first
                if (size < m_capacity)
                {
                    if (bFront)
                    {
                        m_queue.push_front(item);
                    }
                    else
                    {
                        m_queue.push_back(item);
                    }
                    bPushedItem = true;
                    if (size == 0)
                    {
                        // Queue was empty, now there is one item
                        m_dataAvailableEvent.Set();
                    }
                    if (size + 1 >= m_capacity)
                    {
                        // Queue is now full
                        m_spaceAvailableEvent.Reset();
                    }
                }
            }
            lock.Unlock();
            if (!bPushedItem)
            {
                // Another producer took the slot; retry with what is left of
                // the caller's time budget.
                remaining = counter.GetTimeRemaining(milliSeconds);
            }
        }
        return true;
    }

    // Shared implementation of both Pop overloads.
    // bThrowException == true  : wait with WaitEvtEx, whose exceptions propagate.
    // bThrowException == false : wait with WaitEvt and return false when it
    //                            reports failure.
    // Returns true once an item has been moved into `item`.
    bool PopEx(T &item, int milliSeconds = INFINITE, bool bThrowException = true)
    {
        CHighResPerformanceCounter counter;
        counter.StartCnt();
        long remaining = milliSeconds;
        CSingleLock lock(&m_mutex);
        while (true)
        {
            // wait for event to be signaled that there is data
            if (bThrowException)
            {
                m_dataAvailableEvent.WaitEvtEx(remaining);
            }
            else if (!m_dataAvailableEvent.WaitEvt(remaining))
            {
                return false;
            }
            // wait for mutex to become available
            lock.Lock();
            // inside critical section
            {
                // try to dequeue something - it's possible that the queue is empty because
                // another thread pre-empted us and got the last item in the queue
                const size_t size = m_queue.size();
                if (size > 0)
                {
                    item = m_queue.front();
                    m_queue.pop_front();
                    if (size == 1)
                    {
                        // That was the last item; make consumers wait again.
                        m_dataAvailableEvent.Reset();
                    }
                    // An item was removed, so there is room for at least one more.
                    m_spaceAvailableEvent.Set();
                    // `lock` is still held here; it is released when it goes
                    // out of scope on return.
                    return true;
                }
            }
            lock.Unlock();
            // Lost the race for the last item; retry with the remaining budget.
            remaining = counter.GetTimeRemaining(milliSeconds);
        }
    }
};