aboutsummaryrefslogtreecommitdiffstats
path: root/CommonLibs/Interthread.h
diff options
context:
space:
mode:
Diffstat (limited to 'CommonLibs/Interthread.h')
-rw-r--r--CommonLibs/Interthread.h156
1 files changed, 153 insertions, 3 deletions
diff --git a/CommonLibs/Interthread.h b/CommonLibs/Interthread.h
index 023ac14..42e6f7f 100644
--- a/CommonLibs/Interthread.h
+++ b/CommonLibs/Interthread.h
@@ -42,15 +42,21 @@
/** Pointer FIFO for interthread operations. */
-template <class T> class InterthreadQueue {
+// (pat) The elements in the queue are type T*, and
+// the Fifo class implements the underlying queue.
+// The default is class PointerFIFO, which does not place any restrictions on the type of T,
+// and is implemented by allocating auxilliary structures for the queue,
+// or SingleLinkedList, which implements the queue using an internal pointer in type T,
+// which must implement the functional interface of class SingleLinkListNode,
+// namely: functions T*next() and void setNext(T*).
+template <class T, class Fifo=PointerFIFO> class InterthreadQueue {
protected:
- PointerFIFO mQ;
+ Fifo mQ;
mutable Mutex mLock;
mutable Signal mWriteSignal;
-
public:
/** Delete contents. */
@@ -78,6 +84,12 @@ template <class T> class InterthreadQueue {
return mQ.size();
}
+ size_t totalSize() const // pat added
+ {
+ ScopedLock lock(mLock);
+ return mQ.totalSize();
+ }
+
/**
Blocking read.
@return Pointer to object (will not be NULL).
@@ -93,6 +105,13 @@ template <class T> class InterthreadQueue {
return retVal;
}
+ /** Non-blocking peek at the first element; returns NULL if empty. */
+ T* front()
+ {
+ ScopedLock lock(mLock);
+ return (T*) mQ.front();
+ }
+
/**
Blocking read with a timeout.
@param timeout The read timeout in ms.
@@ -127,7 +146,132 @@ template <class T> class InterthreadQueue {
mWriteSignal.signal();
}
+ /** Non-block write to the front of the queue. */
+ void write_front(T* val) // pat added
+ {
+ ScopedLock lock(mLock);
+ mQ.push_front(val);
+ mWriteSignal.signal();
+ }
+};
+
+// (pat) Identical to above but with the threading problem fixed.
+template <class T, class Fifo=PointerFIFO> class InterthreadQueue2 {
+
+ protected:
+
+ Fifo mQ;
+ mutable Mutex mLock;
+ mutable Signal mWriteSignal;
+
+ public:
+
+ /** Delete contents. */
+ void clear()
+ {
+ ScopedLock lock(mLock);
+ while (mQ.size()>0) delete (T*)mQ.get();
+ }
+
+ /** Empty the queue, but don't delete. */
+ void flushNoDelete()
+ {
+ ScopedLock lock(mLock);
+ while (mQ.size()>0) mQ.get();
+ }
+
+
+ ~InterthreadQueue2()
+ { clear(); }
+
+ size_t size() const
+ {
+ ScopedLock lock(mLock);
+ return mQ.size();
+ }
+
+ size_t totalSize() const // pat added
+ {
+ ScopedLock lock(mLock);
+ return mQ.totalSize();
+ }
+
+ /**
+ Blocking read.
+ @return Pointer to object (will not be NULL).
+ */
+ T* read()
+ {
+ ScopedLock lock(mLock);
+ T* retVal = (T*)mQ.get();
+ while (retVal==NULL) {
+ mWriteSignal.wait(mLock);
+ retVal = (T*)mQ.get();
+ }
+ return retVal;
+ }
+
+ /** Non-blocking peek at the first element; returns NULL if empty. */
+ T* front()
+ {
+ ScopedLock lock(mLock);
+ return (T*) mQ.front();
+ }
+
+ /**
+ Blocking read with a timeout.
+ @param timeout The read timeout in ms.
+ @return Pointer to object or NULL on timeout.
+ */
+ T* read(unsigned timeout)
+ {
+ if (timeout==0) return readNoBlock();
+ Timeval waitTime(timeout);
+ ScopedLock lock(mLock);
+ while ((mQ.size()==0) && (!waitTime.passed()))
+ mWriteSignal.wait(mLock,waitTime.remaining());
+ T* retVal = (T*)mQ.get();
+ return retVal;
+ }
+
+ /**
+ Non-blocking read.
+ @return Pointer to object or NULL if FIFO is empty.
+ */
+ T* readNoBlock()
+ {
+ ScopedLock lock(mLock);
+ return (T*)mQ.get();
+ }
+
+ /** Non-blocking write. */
+ void write(T* val)
+ {
+ // (pat) The Mutex mLock must be released before signaling the mWriteSignal condition.
+ // This is an implicit requirement of pthread_cond_wait() called from signal().
+ // If you do not do that, the InterthreadQueue read() function cannot start
+ // because the mutex is still locked by the thread calling the write(),
+ // so the read() thread yields its immediate execution opportunity.
+ // This recurs (and the InterthreadQueue fills up with data)
+ // until the read thread's accumulated temporary priority causes it to
+ // get a second pre-emptive activation over the writing thread,
+ // resulting in bursts of activity by the read thread.
+ { ScopedLock lock(mLock);
+ mQ.put(val);
+ }
+ mWriteSignal.signal();
+ }
+
+ /** Non-block write to the front of the queue. */
+ void write_front(T* val) // pat added
+ {
+ // (pat) See comments above.
+ { ScopedLock lock(mLock);
+ mQ.push_front(val);
+ }
+ mWriteSignal.signal();
+ }
};
@@ -214,12 +358,17 @@ template <class T> class InterthreadQueueWithWait {
/** Non-blocking write. */
void write(T* val)
{
+ // (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field.
ScopedLock lock(mLock);
mQ.put(val);
mWriteSignal.signal();
}
/** Wait until the queue falls below a low water mark. */
+ // (pat) This function suffers from the same problem as documented
+ // at InterthreadQueue.write(), but I am not fixing it because I cannot test it.
+ // The caller of this function will eventually get to run, just not immediately
+ // after the mReadSignal condition is fulfilled.
void wait(size_t sz=0)
{
ScopedLock lock(mLock);
@@ -484,6 +633,7 @@ template <class T, class C = std::vector<T*>, class Cmp = PointerCompare<T> > cl
/** Non-blocking write. */
void write(T* val)
{
+ // (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field.
ScopedLock lock(mLock);
mQ.push(val);
mWriteSignal.signal();