diff options
author | kurtis.heimerl <kurtis.heimerl@19bc5d8c-e614-43d4-8b26-e1612bc8e597> | 2013-05-31 21:47:25 +0000 |
---|---|---|
committer | kurtis.heimerl <kurtis.heimerl@19bc5d8c-e614-43d4-8b26-e1612bc8e597> | 2013-05-31 21:47:25 +0000 |
commit | 5a87247fdf2768a6408e0b87c210cebda85bc996 (patch) | |
tree | b538e7e42f8a7ba6c53e1b0bc22bfb359b1e0ef9 /CommonLibs/Interthread.h | |
parent | bec41039bf2ec07c04a6e8b0b586b085ab9cd74c (diff) |
syncing commonlibs with Many thanks to Michael Iedema for these patches, makes config a lot better.
git-svn-id: http://wush.net/svn/range/software/public/openbts/trunk@5655 19bc5d8c-e614-43d4-8b26-e1612bc8e597
Diffstat (limited to 'CommonLibs/Interthread.h')
-rw-r--r-- | CommonLibs/Interthread.h | 156 |
1 files changed, 153 insertions, 3 deletions
diff --git a/CommonLibs/Interthread.h b/CommonLibs/Interthread.h index 023ac14..42e6f7f 100644 --- a/CommonLibs/Interthread.h +++ b/CommonLibs/Interthread.h @@ -42,15 +42,21 @@ /** Pointer FIFO for interthread operations. */ -template <class T> class InterthreadQueue { +// (pat) The elements in the queue are type T*, and +// the Fifo class implements the underlying queue. +// The default is class PointerFIFO, which does not place any restrictions on the type of T, +// and is implemented by allocating auxilliary structures for the queue, +// or SingleLinkedList, which implements the queue using an internal pointer in type T, +// which must implement the functional interface of class SingleLinkListNode, +// namely: functions T*next() and void setNext(T*). +template <class T, class Fifo=PointerFIFO> class InterthreadQueue { protected: - PointerFIFO mQ; + Fifo mQ; mutable Mutex mLock; mutable Signal mWriteSignal; - public: /** Delete contents. */ @@ -78,6 +84,12 @@ template <class T> class InterthreadQueue { return mQ.size(); } + size_t totalSize() const // pat added + { + ScopedLock lock(mLock); + return mQ.totalSize(); + } + /** Blocking read. @return Pointer to object (will not be NULL). @@ -93,6 +105,13 @@ template <class T> class InterthreadQueue { return retVal; } + /** Non-blocking peek at the first element; returns NULL if empty. */ + T* front() + { + ScopedLock lock(mLock); + return (T*) mQ.front(); + } + /** Blocking read with a timeout. @param timeout The read timeout in ms. @@ -127,7 +146,132 @@ template <class T> class InterthreadQueue { mWriteSignal.signal(); } + /** Non-block write to the front of the queue. */ + void write_front(T* val) // pat added + { + ScopedLock lock(mLock); + mQ.push_front(val); + mWriteSignal.signal(); + } +}; + +// (pat) Identical to above but with the threading problem fixed. +template <class T, class Fifo=PointerFIFO> class InterthreadQueue2 { + + protected: + + Fifo mQ; + mutable Mutex mLock; + mutable Signal mWriteSignal; + + public: + + /** Delete contents. */ + void clear() + { + ScopedLock lock(mLock); + while (mQ.size()>0) delete (T*)mQ.get(); + } + + /** Empty the queue, but don't delete. */ + void flushNoDelete() + { + ScopedLock lock(mLock); + while (mQ.size()>0) mQ.get(); + } + + + ~InterthreadQueue2() + { clear(); } + + size_t size() const + { + ScopedLock lock(mLock); + return mQ.size(); + } + + size_t totalSize() const // pat added + { + ScopedLock lock(mLock); + return mQ.totalSize(); + } + + /** + Blocking read. + @return Pointer to object (will not be NULL). + */ + T* read() + { + ScopedLock lock(mLock); + T* retVal = (T*)mQ.get(); + while (retVal==NULL) { + mWriteSignal.wait(mLock); + retVal = (T*)mQ.get(); + } + return retVal; + } + + /** Non-blocking peek at the first element; returns NULL if empty. */ + T* front() + { + ScopedLock lock(mLock); + return (T*) mQ.front(); + } + + /** + Blocking read with a timeout. + @param timeout The read timeout in ms. + @return Pointer to object or NULL on timeout. + */ + T* read(unsigned timeout) + { + if (timeout==0) return readNoBlock(); + Timeval waitTime(timeout); + ScopedLock lock(mLock); + while ((mQ.size()==0) && (!waitTime.passed())) + mWriteSignal.wait(mLock,waitTime.remaining()); + T* retVal = (T*)mQ.get(); + return retVal; + } + + /** + Non-blocking read. + @return Pointer to object or NULL if FIFO is empty. + */ + T* readNoBlock() + { + ScopedLock lock(mLock); + return (T*)mQ.get(); + } + + /** Non-blocking write. */ + void write(T* val) + { + // (pat) The Mutex mLock must be released before signaling the mWriteSignal condition. + // This is an implicit requirement of pthread_cond_wait() called from signal(). + // If you do not do that, the InterthreadQueue read() function cannot start + // because the mutex is still locked by the thread calling the write(), + // so the read() thread yields its immediate execution opportunity. + // This recurs (and the InterthreadQueue fills up with data) + // until the read thread's accumulated temporary priority causes it to + // get a second pre-emptive activation over the writing thread, + // resulting in bursts of activity by the read thread. + { ScopedLock lock(mLock); + mQ.put(val); + } + mWriteSignal.signal(); + } + + /** Non-block write to the front of the queue. */ + void write_front(T* val) // pat added + { + // (pat) See comments above. + { ScopedLock lock(mLock); + mQ.push_front(val); + } + mWriteSignal.signal(); + } }; @@ -214,12 +358,17 @@ template <class T> class InterthreadQueueWithWait { /** Non-blocking write. */ void write(T* val) { + // (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field. ScopedLock lock(mLock); mQ.put(val); mWriteSignal.signal(); } /** Wait until the queue falls below a low water mark. */ + // (pat) This function suffers from the same problem as documented + // at InterthreadQueue.write(), but I am not fixing it because I cannot test it. + // The caller of this function will eventually get to run, just not immediately + // after the mReadSignal condition is fulfilled. void wait(size_t sz=0) { ScopedLock lock(mLock); @@ -484,6 +633,7 @@ template <class T, class C = std::vector<T*>, class Cmp = PointerCompare<T> > cl /** Non-blocking write. */ void write(T* val) { + // (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field. ScopedLock lock(mLock); mQ.push(val); mWriteSignal.signal(); |