#pragma once #include #include #include "thread.h" #include "Utility.h" #define DEFAULT_CAPACITY 1000 template class CConcurrentQueue { public: CConcurrentQueue(size_t capacity = DEFAULT_CAPACITY) : m_capacity(capacity) { m_spaceAvailableEvent.Set(); } // Destroy the queue and all it's content ~CConcurrentQueue(void) { while (!m_queue.empty()) { m_queue.pop_front(); } } // Pushes object T when space is available // Throws std::exception on timeout or when interrupted bool Push(T item, int milliSeconds = INFINITE) { bool bPushedItem = false; CSingleLock lock(&m_mutex); while (!bPushedItem) { try { // Wait for space to be available m_spaceAvailableEvent.WaitEvtEx(milliSeconds); } catch (std::exception &e) { printf("Push failed: %s\n", e.what()); throw; } // wait for critical section to become available lock.Lock(); // inside critical section { size_t size = m_queue.size(); // Make sure that nobody else that was waiting pushed first if (size < m_capacity) { m_queue.push_back(item); bPushedItem = true; if (size == 0) { // Queue was empty, now there is one item m_dataAvailableEvent.Set(); } size++; if (size >= m_capacity) { // Queue is now full m_spaceAvailableEvent.Reset(); } } } lock.Unlock(); } return true; } // Pushes object T when space is available // Throws std::exception on timeout or when interrupted bool PushFront(T item, int milliSeconds = INFINITE) { bool bPushedItem = false; CSingleLock lock(&m_mutex); while (!bPushedItem) { try { // Wait for space to be available m_spaceAvailableEvent.WaitEvtEx(milliSeconds); } catch (std::exception &e) { printf("Push failed: %s\n", e.what()); throw; } // wait for critical section to become available lock.Lock(); // inside critical section { size_t size = m_queue.size(); // Make sure that nobody else that was waiting pushed first if (size < m_capacity) { m_queue.push_front(item); bPushedItem = true; if (size == 0) { // Queue was empty, now there is one item m_dataAvailableEvent.Set(); } size++; if (size >= m_capacity) { // Queue is now full m_spaceAvailableEvent.Reset(); } } } lock.Unlock(); } return true; } // Returns object T when available // Throws std::exception on timeout or when interrupted T Pop(int milliSeconds = INFINITE) { T item; PopEx(item, milliSeconds, true); return item; } // Returns object T when available // Throws std::exception on timeout or when interrupted bool Pop(T &item, int milliSeconds = INFINITE) { return PopEx(item, milliSeconds, false); } int Size() { int size; // wait for mutex to become available CSingleLock lock(&m_mutex, true); // inside critical section { size = m_queue.size(); } return size; } private: std::list m_queue; CMutex m_mutex; CEvt m_dataAvailableEvent; CEvt m_spaceAvailableEvent; size_t m_capacity; bool PopEx(T &item, int milliSeconds = INFINITE, bool bThrowException = true) { CHighResPerformanceCounter counter; counter.StartCnt(); long remaining = milliSeconds; CSingleLock lock(&m_mutex); while (true) { // wait for event to be signaled that there is data if (bThrowException) { m_dataAvailableEvent.WaitEvtEx(remaining); } else if (!m_dataAvailableEvent.WaitEvt(remaining)) { return false; } // wait for mutex to become available lock.Lock(); // inside critical section { // try to deque something - it’s possible that the queue is empty because another // thread pre-empted before us and got the last item in the queue size_t size = m_queue.size(); if (size > 0) { item = m_queue.front(); m_queue.pop_front(); if (size == 1) { m_dataAvailableEvent.Reset(); } if (size <= m_capacity) { // There was no space, now there is m_spaceAvailableEvent.Set(); } return true; } } lock.Unlock(); remaining = counter.GetTimeRemaining(milliSeconds); } } };