/* * Copyright 2008, 2011 Free Software Foundation, Inc. * * This software is distributed under the terms of the GNU Affero Public License. * See the COPYING file in the main directory for details. * * This use of this software may be subject to additional restrictions. * See the LEGAL file in the main directory for details. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ #ifndef INTERTHREAD_H #define INTERTHREAD_H #include "Timeval.h" #include "Threads.h" #include "LinkedLists.h" #include #include #include /**@defgroup Templates for interthread mechanisms. */ //@{ /** Pointer FIFO for interthread operations. */ // (pat) The elements in the queue are type T*, and // the Fifo class implements the underlying queue. // The default is class PointerFIFO, which does not place any restrictions on the type of T, // and is implemented by allocating auxilliary structures for the queue, // or SingleLinkedList, which implements the queue using an internal pointer in type T, // which must implement the functional interface of class SingleLinkListNode, // namely: functions T*next() and void setNext(T*). template class InterthreadQueue { protected: Fifo mQ; mutable Mutex mLock; mutable Signal mWriteSignal; public: /** Delete contents. */ void clear() { ScopedLock lock(mLock); while (mQ.size()>0) delete (T*)mQ.get(); } /** Empty the queue, but don't delete. */ void flushNoDelete() { ScopedLock lock(mLock); while (mQ.size()>0) mQ.get(); } ~InterthreadQueue() { clear(); } size_t size() const { ScopedLock lock(mLock); return mQ.size(); } size_t totalSize() const // pat added { ScopedLock lock(mLock); return mQ.totalSize(); } /** Blocking read. @return Pointer to object (will not be NULL). */ T* read() { ScopedLock lock(mLock); T* retVal = (T*)mQ.get(); while (retVal==NULL) { mWriteSignal.wait(mLock); retVal = (T*)mQ.get(); } return retVal; } /** Non-blocking peek at the first element; returns NULL if empty. */ T* front() { ScopedLock lock(mLock); return (T*) mQ.front(); } /** Blocking read with a timeout. @param timeout The read timeout in ms. @return Pointer to object or NULL on timeout. */ T* read(unsigned timeout) { if (timeout==0) return readNoBlock(); Timeval waitTime(timeout); ScopedLock lock(mLock); while ((mQ.size()==0) && (!waitTime.passed())) mWriteSignal.wait(mLock,waitTime.remaining()); T* retVal = (T*)mQ.get(); return retVal; } /** Non-blocking read. @return Pointer to object or NULL if FIFO is empty. */ T* readNoBlock() { ScopedLock lock(mLock); return (T*)mQ.get(); } /** Non-blocking write. */ void write(T* val) { ScopedLock lock(mLock); mQ.put(val); mWriteSignal.signal(); } /** Non-block write to the front of the queue. */ void write_front(T* val) // pat added { ScopedLock lock(mLock); mQ.push_front(val); mWriteSignal.signal(); } }; // (pat) Identical to above but with the threading problem fixed. template class InterthreadQueue2 { protected: Fifo mQ; mutable Mutex mLock; mutable Signal mWriteSignal; public: /** Delete contents. */ void clear() { ScopedLock lock(mLock); while (mQ.size()>0) delete (T*)mQ.get(); } /** Empty the queue, but don't delete. */ void flushNoDelete() { ScopedLock lock(mLock); while (mQ.size()>0) mQ.get(); } ~InterthreadQueue2() { clear(); } size_t size() const { ScopedLock lock(mLock); return mQ.size(); } size_t totalSize() const // pat added { ScopedLock lock(mLock); return mQ.totalSize(); } /** Blocking read. @return Pointer to object (will not be NULL). */ T* read() { ScopedLock lock(mLock); T* retVal = (T*)mQ.get(); while (retVal==NULL) { mWriteSignal.wait(mLock); retVal = (T*)mQ.get(); } return retVal; } /** Non-blocking peek at the first element; returns NULL if empty. */ T* front() { ScopedLock lock(mLock); return (T*) mQ.front(); } /** Blocking read with a timeout. @param timeout The read timeout in ms. @return Pointer to object or NULL on timeout. */ T* read(unsigned timeout) { if (timeout==0) return readNoBlock(); Timeval waitTime(timeout); ScopedLock lock(mLock); while ((mQ.size()==0) && (!waitTime.passed())) mWriteSignal.wait(mLock,waitTime.remaining()); T* retVal = (T*)mQ.get(); return retVal; } /** Non-blocking read. @return Pointer to object or NULL if FIFO is empty. */ T* readNoBlock() { ScopedLock lock(mLock); return (T*)mQ.get(); } /** Non-blocking write. */ void write(T* val) { // (pat) The Mutex mLock must be released before signaling the mWriteSignal condition. // This is an implicit requirement of pthread_cond_wait() called from signal(). // If you do not do that, the InterthreadQueue read() function cannot start // because the mutex is still locked by the thread calling the write(), // so the read() thread yields its immediate execution opportunity. // This recurs (and the InterthreadQueue fills up with data) // until the read thread's accumulated temporary priority causes it to // get a second pre-emptive activation over the writing thread, // resulting in bursts of activity by the read thread. { ScopedLock lock(mLock); mQ.put(val); } mWriteSignal.signal(); } /** Non-block write to the front of the queue. */ void write_front(T* val) // pat added { // (pat) See comments above. { ScopedLock lock(mLock); mQ.push_front(val); } mWriteSignal.signal(); } }; /** Pointer FIFO for interthread operations. */ template class InterthreadQueueWithWait { protected: PointerFIFO mQ; mutable Mutex mLock; mutable Signal mWriteSignal; mutable Signal mReadSignal; virtual void freeElement(T* element) const { delete element; }; public: /** Delete contents. */ void clear() { ScopedLock lock(mLock); while (mQ.size()>0) freeElement((T*)mQ.get()); mReadSignal.signal(); } virtual ~InterthreadQueueWithWait() { clear(); } size_t size() const { ScopedLock lock(mLock); return mQ.size(); } /** Blocking read. @return Pointer to object (will not be NULL). */ T* read() { ScopedLock lock(mLock); T* retVal = (T*)mQ.get(); while (retVal==NULL) { mWriteSignal.wait(mLock); retVal = (T*)mQ.get(); } mReadSignal.signal(); return retVal; } /** Blocking read with a timeout. @param timeout The read timeout in ms. @return Pointer to object or NULL on timeout. */ T* read(unsigned timeout) { if (timeout==0) return readNoBlock(); Timeval waitTime(timeout); ScopedLock lock(mLock); while ((mQ.size()==0) && (!waitTime.passed())) mWriteSignal.wait(mLock,waitTime.remaining()); T* retVal = (T*)mQ.get(); if (retVal!=NULL) mReadSignal.signal(); return retVal; } /** Non-blocking read. @return Pointer to object or NULL if FIFO is empty. */ T* readNoBlock() { ScopedLock lock(mLock); T* retVal = (T*)mQ.get(); if (retVal!=NULL) mReadSignal.signal(); return retVal; } /** Non-blocking write. */ void write(T* val) { // (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field. ScopedLock lock(mLock); mQ.put(val); mWriteSignal.signal(); } /** Wait until the queue falls below a low water mark. */ // (pat) This function suffers from the same problem as documented // at InterthreadQueue.write(), but I am not fixing it because I cannot test it. // The caller of this function will eventually get to run, just not immediately // after the mReadSignal condition is fulfilled. void wait(size_t sz=0) { ScopedLock lock(mLock); while (mQ.size()>sz) mReadSignal.wait(mLock); } }; /** Thread-safe map of pointers to class D, keyed by class K. */ template class InterthreadMap { protected: typedef std::map Map; Map mMap; mutable Mutex mLock; Signal mWriteSignal; public: void clear() { // Delete everything in the map. ScopedLock lock(mLock); typename Map::iterator iter = mMap.begin(); while (iter != mMap.end()) { delete iter->second; ++iter; } mMap.clear(); } ~InterthreadMap() { clear(); } /** Non-blocking write. @param key The index to write to. @param wData Pointer to data, not to be deleted until removed from the map. */ void write(const K &key, D * wData) { ScopedLock lock(mLock); typename Map::iterator iter = mMap.find(key); if (iter!=mMap.end()) { delete iter->second; iter->second = wData; } else { mMap[key] = wData; } mWriteSignal.broadcast(); } /** Non-blocking read with element removal. @param key Key to read from. @return Pointer at key or NULL if key not found, to be deleted by caller. */ D* getNoBlock(const K& key) { ScopedLock lock(mLock); typename Map::iterator iter = mMap.find(key); if (iter==mMap.end()) return NULL; D* retVal = iter->second; mMap.erase(iter); return retVal; } /** Blocking read with a timeout and element removal. @param key The key to read from. @param timeout The blocking timeout in ms. @return Pointer at key or NULL on timeout, to be deleted by caller. */ D* get(const K &key, unsigned timeout) { if (timeout==0) return getNoBlock(key); Timeval waitTime(timeout); ScopedLock lock(mLock); typename Map::iterator iter = mMap.find(key); while ((iter==mMap.end()) && (!waitTime.passed())) { mWriteSignal.wait(mLock,waitTime.remaining()); iter = mMap.find(key); } if (iter==mMap.end()) return NULL; D* retVal = iter->second; mMap.erase(iter); return retVal; } /** Blocking read with and element removal. @param key The key to read from. @return Pointer at key, to be deleted by caller. */ D* get(const K &key) { ScopedLock lock(mLock); typename Map::iterator iter = mMap.find(key); while (iter==mMap.end()) { mWriteSignal.wait(mLock); iter = mMap.find(key); } D* retVal = iter->second; mMap.erase(iter); return retVal; } /** Remove an entry and delete it. @param key The key of the entry to delete. @return True if it was actually found and deleted. */ bool remove(const K &key ) { D* val = getNoBlock(key); if (!val) return false; delete val; return true; } /** Non-blocking read. @param key Key to read from. @return Pointer at key or NULL if key not found. */ D* readNoBlock(const K& key) const { D* retVal=NULL; ScopedLock lock(mLock); typename Map::const_iterator iter = mMap.find(key); if (iter!=mMap.end()) retVal = iter->second; return retVal; } /** Blocking read with a timeout. @param key The key to read from. @param timeout The blocking timeout in ms. @return Pointer at key or NULL on timeout. */ D* read(const K &key, unsigned timeout) const { if (timeout==0) return readNoBlock(key); ScopedLock lock(mLock); Timeval waitTime(timeout); typename Map::const_iterator iter = mMap.find(key); while ((iter==mMap.end()) && (!waitTime.passed())) { mWriteSignal.wait(mLock,waitTime.remaining()); iter = mMap.find(key); } if (iter==mMap.end()) return NULL; D* retVal = iter->second; return retVal; } /** Blocking read. @param key The key to read from. @return Pointer at key. */ D* read(const K &key) const { ScopedLock lock(mLock); typename Map::const_iterator iter = mMap.find(key); while (iter==mMap.end()) { mWriteSignal.wait(mLock); iter = mMap.find(key); } D* retVal = iter->second; return retVal; } }; /** This class is used to provide pointer-based comparison in priority_queues. */ template class PointerCompare { public: /** Compare the objects pointed to, not the pointers themselves. */ bool operator()(const T *v1, const T *v2) { return (*v1)>(*v2); } }; /** Priority queue for interthread operations. Passes pointers to objects. */ template , class Cmp = PointerCompare > class InterthreadPriorityQueue { protected: std::priority_queue mQ; mutable Mutex mLock; mutable Signal mWriteSignal; public: /** Clear the FIFO. */ void clear() { ScopedLock lock(mLock); while (mQ.size()>0) { T* ptr = mQ.top(); mQ.pop(); delete ptr; } } ~InterthreadPriorityQueue() { clear(); } size_t size() const { ScopedLock lock(mLock); return mQ.size(); } /** Non-blocking read. */ T* readNoBlock() { ScopedLock lock(mLock); T* retVal = NULL; if (mQ.size()!=0) { retVal = mQ.top(); mQ.pop(); } return retVal; } /** Blocking read. */ T* read() { ScopedLock lock(mLock); T* retVal; while (mQ.size()==0) mWriteSignal.wait(mLock); retVal = mQ.top(); mQ.pop(); return retVal; } /** Non-blocking write. */ void write(T* val) { // (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field. ScopedLock lock(mLock); mQ.push(val); mWriteSignal.signal(); } }; class Semaphore { private: bool mFlag; Signal mSignal; mutable Mutex mLock; public: Semaphore() :mFlag(false) { } void post() { ScopedLock lock(mLock); mFlag=true; mSignal.signal(); } void get() { ScopedLock lock(mLock); while (!mFlag) mSignal.wait(mLock); mFlag=false; } bool semtry() { ScopedLock lock(mLock); bool retVal = mFlag; mFlag = false; return retVal; } }; //@} #endif // vim: ts=4 sw=4