aboutsummaryrefslogtreecommitdiffstats
path: root/CommonLibs/Interthread.h
diff options
context:
space:
mode:
authordburgess <dburgess@19bc5d8c-e614-43d4-8b26-e1612bc8e597>2011-10-07 02:40:51 +0000
committerdburgess <dburgess@19bc5d8c-e614-43d4-8b26-e1612bc8e597>2011-10-07 02:40:51 +0000
commit82c46ff7aeb793d70f11fcaf22b2813b7dc0f78b (patch)
tree25cf269a01eaf3ae2ca0cece6f1a22ca61308e6d /CommonLibs/Interthread.h
parentcd8381743f1fe458f10e96020ad4626f4ae3d7bd (diff)
Putting the actual OpenBTS P2.8 source code into the public SVN branch.
git-svn-id: http://wush.net/svn/range/software/public/openbts/trunk@2242 19bc5d8c-e614-43d4-8b26-e1612bc8e597
Diffstat (limited to 'CommonLibs/Interthread.h')
-rw-r--r--CommonLibs/Interthread.h546
1 files changed, 546 insertions, 0 deletions
diff --git a/CommonLibs/Interthread.h b/CommonLibs/Interthread.h
new file mode 100644
index 0000000..023ac14
--- /dev/null
+++ b/CommonLibs/Interthread.h
@@ -0,0 +1,546 @@
+/*
+* Copyright 2008, 2011 Free Software Foundation, Inc.
+*
+* This software is distributed under the terms of the GNU Affero Public License.
+* See the COPYING file in the main directory for details.
+*
+* This use of this software may be subject to additional restrictions.
+* See the LEGAL file in the main directory for details.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+
+#ifndef INTERTHREAD_H
+#define INTERTHREAD_H
+
+#include "Timeval.h"
+#include "Threads.h"
+#include "LinkedLists.h"
+#include <map>
+#include <vector>
+#include <queue>
+
+
+
+
+
+/**@defgroup Templates for interthread mechanisms. */
+//@{
+
+
+/** Pointer FIFO for interthread operations. */
+template <class T> class InterthreadQueue {
+
+ protected:
+
+ PointerFIFO mQ;
+ mutable Mutex mLock;
+ mutable Signal mWriteSignal;
+
+
+ public:
+
+ /** Delete contents. */
+ void clear()
+ {
+ ScopedLock lock(mLock);
+ while (mQ.size()>0) delete (T*)mQ.get();
+ }
+
+ /** Empty the queue, but don't delete. */
+ void flushNoDelete()
+ {
+ ScopedLock lock(mLock);
+ while (mQ.size()>0) mQ.get();
+ }
+
+
+ ~InterthreadQueue()
+ { clear(); }
+
+
+ size_t size() const
+ {
+ ScopedLock lock(mLock);
+ return mQ.size();
+ }
+
+ /**
+ Blocking read.
+ @return Pointer to object (will not be NULL).
+ */
+ T* read()
+ {
+ ScopedLock lock(mLock);
+ T* retVal = (T*)mQ.get();
+ while (retVal==NULL) {
+ mWriteSignal.wait(mLock);
+ retVal = (T*)mQ.get();
+ }
+ return retVal;
+ }
+
+ /**
+ Blocking read with a timeout.
+ @param timeout The read timeout in ms.
+ @return Pointer to object or NULL on timeout.
+ */
+ T* read(unsigned timeout)
+ {
+ if (timeout==0) return readNoBlock();
+ Timeval waitTime(timeout);
+ ScopedLock lock(mLock);
+ while ((mQ.size()==0) && (!waitTime.passed()))
+ mWriteSignal.wait(mLock,waitTime.remaining());
+ T* retVal = (T*)mQ.get();
+ return retVal;
+ }
+
+ /**
+ Non-blocking read.
+ @return Pointer to object or NULL if FIFO is empty.
+ */
+ T* readNoBlock()
+ {
+ ScopedLock lock(mLock);
+ return (T*)mQ.get();
+ }
+
+ /** Non-blocking write. */
+ void write(T* val)
+ {
+ ScopedLock lock(mLock);
+ mQ.put(val);
+ mWriteSignal.signal();
+ }
+
+
+};
+
+
+
+/** Pointer FIFO for interthread operations. */
+template <class T> class InterthreadQueueWithWait {
+
+ protected:
+
+ PointerFIFO mQ;
+ mutable Mutex mLock;
+ mutable Signal mWriteSignal;
+ mutable Signal mReadSignal;
+
+ virtual void freeElement(T* element) const { delete element; };
+
+ public:
+
+ /** Delete contents. */
+ void clear()
+ {
+ ScopedLock lock(mLock);
+ while (mQ.size()>0) freeElement((T*)mQ.get());
+ mReadSignal.signal();
+ }
+
+
+
+ virtual ~InterthreadQueueWithWait()
+ { clear(); }
+
+
+ size_t size() const
+ {
+ ScopedLock lock(mLock);
+ return mQ.size();
+ }
+
+ /**
+ Blocking read.
+ @return Pointer to object (will not be NULL).
+ */
+ T* read()
+ {
+ ScopedLock lock(mLock);
+ T* retVal = (T*)mQ.get();
+ while (retVal==NULL) {
+ mWriteSignal.wait(mLock);
+ retVal = (T*)mQ.get();
+ }
+ mReadSignal.signal();
+ return retVal;
+ }
+
+ /**
+ Blocking read with a timeout.
+ @param timeout The read timeout in ms.
+ @return Pointer to object or NULL on timeout.
+ */
+ T* read(unsigned timeout)
+ {
+ if (timeout==0) return readNoBlock();
+ Timeval waitTime(timeout);
+ ScopedLock lock(mLock);
+ while ((mQ.size()==0) && (!waitTime.passed()))
+ mWriteSignal.wait(mLock,waitTime.remaining());
+ T* retVal = (T*)mQ.get();
+ if (retVal!=NULL) mReadSignal.signal();
+ return retVal;
+ }
+
+ /**
+ Non-blocking read.
+ @return Pointer to object or NULL if FIFO is empty.
+ */
+ T* readNoBlock()
+ {
+ ScopedLock lock(mLock);
+ T* retVal = (T*)mQ.get();
+ if (retVal!=NULL) mReadSignal.signal();
+ return retVal;
+ }
+
+ /** Non-blocking write. */
+ void write(T* val)
+ {
+ ScopedLock lock(mLock);
+ mQ.put(val);
+ mWriteSignal.signal();
+ }
+
+ /** Wait until the queue falls below a low water mark. */
+ void wait(size_t sz=0)
+ {
+ ScopedLock lock(mLock);
+ while (mQ.size()>sz) mReadSignal.wait(mLock);
+ }
+
+};
+
+
+
+
+
+/** Thread-safe map of pointers to class D, keyed by class K. */
+template <class K, class D > class InterthreadMap {
+
+protected:
+
+ typedef std::map<K,D*> Map;
+ Map mMap;
+ mutable Mutex mLock;
+ Signal mWriteSignal;
+
+public:
+
+ void clear()
+ {
+ // Delete everything in the map.
+ ScopedLock lock(mLock);
+ typename Map::iterator iter = mMap.begin();
+ while (iter != mMap.end()) {
+ delete iter->second;
+ ++iter;
+ }
+ mMap.clear();
+ }
+
+ ~InterthreadMap() { clear(); }
+
+ /**
+ Non-blocking write.
+ @param key The index to write to.
+ @param wData Pointer to data, not to be deleted until removed from the map.
+ */
+ void write(const K &key, D * wData)
+ {
+ ScopedLock lock(mLock);
+ typename Map::iterator iter = mMap.find(key);
+ if (iter!=mMap.end()) {
+ delete iter->second;
+ iter->second = wData;
+ } else {
+ mMap[key] = wData;
+ }
+ mWriteSignal.broadcast();
+ }
+
+ /**
+ Non-blocking read with element removal.
+ @param key Key to read from.
+ @return Pointer at key or NULL if key not found, to be deleted by caller.
+ */
+ D* getNoBlock(const K& key)
+ {
+ ScopedLock lock(mLock);
+ typename Map::iterator iter = mMap.find(key);
+ if (iter==mMap.end()) return NULL;
+ D* retVal = iter->second;
+ mMap.erase(iter);
+ return retVal;
+ }
+
+ /**
+ Blocking read with a timeout and element removal.
+ @param key The key to read from.
+ @param timeout The blocking timeout in ms.
+ @return Pointer at key or NULL on timeout, to be deleted by caller.
+ */
+ D* get(const K &key, unsigned timeout)
+ {
+ if (timeout==0) return getNoBlock(key);
+ Timeval waitTime(timeout);
+ ScopedLock lock(mLock);
+ typename Map::iterator iter = mMap.find(key);
+ while ((iter==mMap.end()) && (!waitTime.passed())) {
+ mWriteSignal.wait(mLock,waitTime.remaining());
+ iter = mMap.find(key);
+ }
+ if (iter==mMap.end()) return NULL;
+ D* retVal = iter->second;
+ mMap.erase(iter);
+ return retVal;
+ }
+
+ /**
+ Blocking read with and element removal.
+ @param key The key to read from.
+ @return Pointer at key, to be deleted by caller.
+ */
+ D* get(const K &key)
+ {
+ ScopedLock lock(mLock);
+ typename Map::iterator iter = mMap.find(key);
+ while (iter==mMap.end()) {
+ mWriteSignal.wait(mLock);
+ iter = mMap.find(key);
+ }
+ D* retVal = iter->second;
+ mMap.erase(iter);
+ return retVal;
+ }
+
+
+ /**
+ Remove an entry and delete it.
+ @param key The key of the entry to delete.
+ @return True if it was actually found and deleted.
+ */
+ bool remove(const K &key )
+ {
+ D* val = getNoBlock(key);
+ if (!val) return false;
+ delete val;
+ return true;
+ }
+
+
+ /**
+ Non-blocking read.
+ @param key Key to read from.
+ @return Pointer at key or NULL if key not found.
+ */
+ D* readNoBlock(const K& key) const
+ {
+ D* retVal=NULL;
+ ScopedLock lock(mLock);
+ typename Map::const_iterator iter = mMap.find(key);
+ if (iter!=mMap.end()) retVal = iter->second;
+ return retVal;
+ }
+
+ /**
+ Blocking read with a timeout.
+ @param key The key to read from.
+ @param timeout The blocking timeout in ms.
+ @return Pointer at key or NULL on timeout.
+ */
+ D* read(const K &key, unsigned timeout) const
+ {
+ if (timeout==0) return readNoBlock(key);
+ ScopedLock lock(mLock);
+ Timeval waitTime(timeout);
+ typename Map::const_iterator iter = mMap.find(key);
+ while ((iter==mMap.end()) && (!waitTime.passed())) {
+ mWriteSignal.wait(mLock,waitTime.remaining());
+ iter = mMap.find(key);
+ }
+ if (iter==mMap.end()) return NULL;
+ D* retVal = iter->second;
+ return retVal;
+ }
+
+ /**
+ Blocking read.
+ @param key The key to read from.
+ @return Pointer at key.
+ */
+ D* read(const K &key) const
+ {
+ ScopedLock lock(mLock);
+ typename Map::const_iterator iter = mMap.find(key);
+ while (iter==mMap.end()) {
+ mWriteSignal.wait(mLock);
+ iter = mMap.find(key);
+ }
+ D* retVal = iter->second;
+ return retVal;
+ }
+
+};
+
+
+
+
+
+
+
+/** This class is used to provide pointer-based comparison in priority_queues. */
+template <class T> class PointerCompare {
+
+ public:
+
+ /** Compare the objects pointed to, not the pointers themselves. */
+ bool operator()(const T *v1, const T *v2)
+ { return (*v1)>(*v2); }
+
+};
+
+
+
+/**
+ Priority queue for interthread operations.
+ Passes pointers to objects.
+*/
+template <class T, class C = std::vector<T*>, class Cmp = PointerCompare<T> > class InterthreadPriorityQueue {
+
+ protected:
+
+ std::priority_queue<T*,C,Cmp> mQ;
+ mutable Mutex mLock;
+ mutable Signal mWriteSignal;
+
+ public:
+
+
+ /** Clear the FIFO. */
+ void clear()
+ {
+ ScopedLock lock(mLock);
+ while (mQ.size()>0) {
+ T* ptr = mQ.top();
+ mQ.pop();
+ delete ptr;
+ }
+ }
+
+
+ ~InterthreadPriorityQueue()
+ {
+ clear();
+ }
+
+ size_t size() const
+ {
+ ScopedLock lock(mLock);
+ return mQ.size();
+ }
+
+
+ /** Non-blocking read. */
+ T* readNoBlock()
+ {
+ ScopedLock lock(mLock);
+ T* retVal = NULL;
+ if (mQ.size()!=0) {
+ retVal = mQ.top();
+ mQ.pop();
+ }
+ return retVal;
+ }
+
+ /** Blocking read. */
+ T* read()
+ {
+ ScopedLock lock(mLock);
+ T* retVal;
+ while (mQ.size()==0) mWriteSignal.wait(mLock);
+ retVal = mQ.top();
+ mQ.pop();
+ return retVal;
+ }
+
+ /** Non-blocking write. */
+ void write(T* val)
+ {
+ ScopedLock lock(mLock);
+ mQ.push(val);
+ mWriteSignal.signal();
+ }
+
+};
+
+
+
+
+
+class Semaphore {
+
+ private:
+
+ bool mFlag;
+ Signal mSignal;
+ mutable Mutex mLock;
+
+ public:
+
+ Semaphore()
+ :mFlag(false)
+ { }
+
+ void post()
+ {
+ ScopedLock lock(mLock);
+ mFlag=true;
+ mSignal.signal();
+ }
+
+ void get()
+ {
+ ScopedLock lock(mLock);
+ while (!mFlag) mSignal.wait(mLock);
+ mFlag=false;
+ }
+
+ bool semtry()
+ {
+ ScopedLock lock(mLock);
+ bool retVal = mFlag;
+ mFlag = false;
+ return retVal;
+ }
+
+};
+
+
+
+
+
+//@}
+
+
+
+
+#endif
+// vim: ts=4 sw=4