aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Chemeris <Alexander.Chemeris@gmail.com>2013-07-14 12:38:25 +0400
committerAlexander Chemeris <Alexander.Chemeris@gmail.com>2013-07-14 23:11:54 +0400
commite279124660a2add686e6c9266d18a9bb0cf43258 (patch)
treeaf047fe4e3cec8a8f7bb3e38556b9722217eb150
parent302a9198df85e4f771368f24d8445dec7c8b2203 (diff)
CommonLibs: Rewrite Threads class to be predictable during start/stop.
Current implementation strictly synchronize thread startup and shutdown to make sure a thread is actually started on startThread() exit and that it's stopped on stopThread() exit (or an error is returned). I also changed the Thread class to be used as a parent class for an actual thread. This way we could provide much better logical separation between variables used by different threads. Which will (hopefully) reduce number of issues with inter-thread communications.
-rw-r--r--CommonLibs/Threads.cpp244
-rw-r--r--CommonLibs/Threads.h131
2 files changed, 329 insertions, 46 deletions
diff --git a/CommonLibs/Threads.cpp b/CommonLibs/Threads.cpp
index dce7eda..f94bb7d 100644
--- a/CommonLibs/Threads.cpp
+++ b/CommonLibs/Threads.cpp
@@ -1,5 +1,6 @@
/*
* Copyright 2008 Free Software Foundation, Inc.
+* Copyright 2013 Alexander Chemeris <Alexander.Chemeris@fairwaves.ru>
*
*
* This software is distributed under the terms of the GNU Affero Public License.
@@ -29,11 +30,26 @@
#include "Threads.h"
#include "Timeval.h"
+#include "Logger.h"
+#include <pthread.h>
+#include <sys/types.h>
+#include <errno.h> // for ETIMEDOUT
+#include <sys/syscall.h> // for SYS_gettid
+#include <sys/prctl.h> // Linux specific, for prctl(PR_SET_NAME)
+
+// Make sure we get MCL_CURRENT and MCL_FUTURE (for mlockall) on OS X 10.3
+#define _P1003_1B_VISIBLE
+#include <sys/mman.h>
+#undef _P1003_1B_VISIBLE
using namespace std;
+#define POSIX_OK 0
+#define POSIX_NO_WAIT 0
+#define POSIX_WAIT_FOREVER (-1)
+static inline int gettid() {return syscall(SYS_gettid);}
Mutex gStreamLock; ///< Global lock to control access to cout and cerr.
@@ -102,20 +118,228 @@ int Signal::wait(Mutex& wMutex, unsigned timeout) const
return pthread_cond_timedwait(&mSignal,&wMutex.mMutex,&waitTime);
}
+Thread::Thread(const string &name, size_t stackSize)
+: mThreadId((pthread_t)0)
+, mThreadName(name)
+, mStackSize(stackSize)
+, mThreadState(THREAD_STATE_IDLE)
+, mThreadData(NULL)
+{
+}
-void Thread::start(void *(*task)(void*), void *arg)
+Thread::~Thread()
{
- assert(mThread==((pthread_t)0));
- bool res;
- // (pat) Moved initialization to constructor to avoid crash in destructor.
- //res = pthread_attr_init(&mAttrib);
- //assert(!res);
- res = pthread_attr_setstacksize(&mAttrib, mStackSize);
- assert(!res);
- res = pthread_create(&mThread, &mAttrib, task, arg);
- assert(!res);
+ stopThread();
}
+void *Thread::threadAdaptor(void *data)
+{
+ Thread *pThread = (Thread*)data;
+
+ // If we ever receive a thread cancel request, it means that the Thread
+ // object is in the process of being destroyed. To avoid the situation
+ // where a thread attempts to run after its containing Thread object has
+ // been freed, we set the thread up so that the cancel takes effect
+ // immediately (as opposed to waiting until the next thread cancellation
+ // point).
+ pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
+
+ // =====================================================================
+ // Synchronize with the start() in the parent thread.
+ {
+ // 1. Lock synchronization mutex.
+ ScopedLock lock(pThread->mThreadStartupMutex);
+
+ // 2. Btw, set the thread name, while we're inside the mutex.
+ // FIXME: This works on Linux with glibc >= 2.12. Under *BSD and MacOS X
+ // this function has different arguments.
+// pthread_setname_np(pThread->mThreadId, pThread->mThreadName.c_str());
+ // FIXME: For some reason the previous call doesn't work on my Ubuntu 12.04,
+ // so we use this one which works.
+ prctl(PR_SET_NAME, pThread->mThreadName.c_str());
+
+ // 3. Signal that we've started.
+ pThread->mThreadStartStopEvent.signal();
+
+ // 4. Wait until start() finishes its initialization.
+ //
+ // The actual thread is created and started with pthread_create(), then
+ // start() does its housekeeping and sets mThreadState=THREAD_STATE_RUNNING.
+ // If we allow Thread::run() to start before this initialization completes,
+ // callers might think (among other things) that the thread is not started
+ // while it's actually started.
+ pThread->mThreadInitializedEvent.wait(pThread->mThreadStartupMutex);
+ }
+ // Synchronization with the parent thread is finished.
+ // =====================================================================
+
+ // Log Thread ID for debugging purposes
+ LOG(INFO) << "Thread started: " << pThread->mThreadName
+ << " with lwp=" << gettid() << ", pid=" << getpid();
+
+ // Keep all memory locked into physical mem, to guarantee realtime-behaviour
+ int res = mlockall(MCL_CURRENT|MCL_FUTURE);
+ if (res != POSIX_OK) {
+ LOG(WARNING) << "Failed to lock memory for thread: " << pThread->mThreadName;
+ }
+
+ // Run the actual code
+ pThread->runThread();
+
+ // Huh, we're done. Signal to a (potentially) waiting stop()'s.
+ {
+ ScopedLock lock(pThread->mThreadStateMutex);
+ pThread->mThreadState = THREAD_STATE_IDLE;
+ pThread->mThreadStartStopEvent.broadcast();
+ }
+
+ return NULL;
+}
+
+Thread::ReturnStatus Thread::startThread(void *data)
+{
+ pthread_attr_t attrib;
+// timeval threadStartTime;
+// timespec threadStartTimeout;
+ bool res;
+
+ // Lock startup synchronization mutex. It will be used in conjunction with
+ // mThreadInitializedEvent and mThreadStartStopEvent conditional variables.
+ ScopedLock lock(mThreadStartupMutex);
+
+ {
+ ScopedLock lock(mThreadStateMutex);
+ if (mThreadState != THREAD_STATE_IDLE)
+ return ALREADY_STARTED;
+ mThreadState = THREAD_STATE_STARTING;
+ }
+
+ // Save thread data pointer
+ mThreadData = data;
+
+ LOG(DEBUG) << "Starting thread " << mThreadName << " (" << this << ")";
+
+ // construct thread attribute
+ res = pthread_attr_init(&attrib);
+ if (res != POSIX_OK) {
+ LOG(ALERT) << "pthread_attr_init failed, returned " << res
+ << " in " << mThreadName << " (" << this << ")";
+ }
+
+
+ // Set the thread stack size
+ res = pthread_attr_setstacksize(&attrib, mStackSize);
+ if (res != POSIX_OK)
+ {
+ LOG(ALERT) << "pthread_attr_setstacksize failed, returned " << res
+ << " in " << mThreadName << " (" << this << ")";
+ }
+
+ // Create the thread detached
+ res = pthread_attr_setdetachstate(&attrib, PTHREAD_CREATE_DETACHED);
+ if (res != POSIX_OK)
+ {
+ LOG(ALERT) << "pthread_attr_setdetachstate failed, returned " << res
+ << " in " << mThreadName << " (" << this << ")";
+ }
+
+ // =====================================================================
+ // Start the thread and synchronize with it
+
+ // Start the thread!
+ res = pthread_create(&mThreadId, &attrib, threadAdaptor, (void *)this);
+ // Attributes are no longer needed.
+ pthread_attr_destroy(&attrib);
+
+ if (res != POSIX_OK)
+ {
+ LOG(ALERT) << "pthread_create failed, returned " << res
+ << " in " << mThreadName << " (" << this << ")";
+
+ return PTHREAD_ERROR;
+ }
+
+ // Wait for the thread to startup.
+ res = mThreadStartStopEvent.wait(mThreadStartupMutex, THREAD_STARTUP_TIMEOUT*1000);
+
+ // If the thread does not start in THREAD_STARTUP_TIMEOUT seconds,
+ // then something is terribly wrong here.
+ if (res == ETIMEDOUT)
+ {
+ LOG(ALERT) << "thread " << mThreadName << " (" << this << ") hasn't started up in "
+ << THREAD_STARTUP_TIMEOUT << " seconds. Bailing out.";
+
+ return RETURN_TIMEOUT;
+ }
+
+ // We're done with the initialization.
+ ackThreadStart();
+
+ // ToDo: Add other initialization here, e.g. adding this thread to a list of all threads.
+
+ // Startup initialization finished. Signal this to started thread, so
+ // it could go on.
+ mThreadInitializedEvent.signal();
+
+ return RETURN_OK;
+}
+Thread::ReturnStatus Thread::stopThread()
+{
+ int res;
+
+ LOG(DEBUG) << "Stopping thread " << mThreadName << " (" << this << ")";
+
+ while (1) {
+ ScopedLock lock(mThreadStateMutex);
+
+ switch (mThreadState) {
+ case THREAD_STATE_IDLE:
+ // Nothing to do.
+ return RETURN_OK;
+
+ case THREAD_STATE_STARTING:
+ // Something is wrong in thi world.
+ assert(mThreadState != THREAD_STATE_STARTING);
+ LOG(ALERT) << "Trying to stop thread " << mThreadName
+ << " (" << this << ") while it's trying to start.";
+ return WRONG_STATE;
+
+ case THREAD_STATE_RUNNING:
+ // Request shudown
+ mThreadState = THREAD_STATE_STOPPING;
+ // no "break" here to fall through to the next case
+
+ case THREAD_STATE_STOPPING:
+ // Wait for the thread to stop.
+ LOG(DEBUG) << "Waiting for thread " << mThreadName << " (" << this << ") to stop.";
+ res = mThreadStartStopEvent.wait(mThreadStateMutex, THREAD_STOP_TIMEOUT*1000);
+ LOG(DEBUG) << "Thread " << mThreadName << " (" << this << ") signalled stop "
+ << "with res=" << res << " and mThreadState=" << mThreadState;
+
+ // If the thread does not stop in THREAD_STOP_TIMEOUT seconds,
+ // return error. It may be waiting for something.
+ if (res == ETIMEDOUT)
+ {
+ LOG(ALERT) << "thread " << mThreadName << " (" << this << ") hasn't stopped in "
+ << THREAD_STARTUP_TIMEOUT << " seconds. Bailing out.";
+
+ return RETURN_TIMEOUT;
+ }
+
+ // Conditional variable could return in case of a signal, so we should
+ // double check that the thread has indeed stopped.
+ if (mThreadState == THREAD_STATE_IDLE)
+ return RETURN_OK;
+ else
+ // Try again...
+ break;
+ }
+ }
+
+ // We should never reach this line
+ assert(false);
+ return RETURN_OK;
+}
// vim: ts=4 sw=4
diff --git a/CommonLibs/Threads.h b/CommonLibs/Threads.h
index 0d21e41..8006516 100644
--- a/CommonLibs/Threads.h
+++ b/CommonLibs/Threads.h
@@ -1,5 +1,6 @@
/*
* Copyright 2008, 2011 Free Software Foundation, Inc.
+* Copyright 2013 Alexander Chemeris <Alexander.Chemeris@fairwaves.ru>
*
* This software is distributed under the terms of the GNU Affero Public License.
* See the COPYING file in the main directory for details.
@@ -137,47 +138,105 @@ class Signal {
};
-
-#define START_THREAD(thread,function,argument) \
- thread.start((void *(*)(void*))function, (void*)argument);
-
/** A C++ wrapper for pthread threads. */
class Thread {
- private:
-
- pthread_t mThread;
- pthread_attr_t mAttrib;
- // FIXME -- Can this be reduced now?
- size_t mStackSize;
-
-
- public:
-
- /** Create a thread in a non-running state. */
- Thread(size_t wStackSize = (65536*4)):mThread((pthread_t)0) {
- pthread_attr_init(&mAttrib); // (pat) moved this here.
- mStackSize=wStackSize;
- }
-
- /**
- Destroy the Thread.
- It should be stopped and joined.
- */
- // (pat) If the Thread is destroyed without being started, then mAttrib is undefined. Oops.
- ~Thread() { pthread_attr_destroy(&mAttrib); }
-
-
- /** Start the thread on a task. */
- void start(void *(*task)(void*), void *arg);
-
- /** Join a thread that will stop on its own. */
- void join() { int s = pthread_join(mThread,NULL); assert(!s); mThread = 0; }
-
+public:
+
+ typedef void *(*Adaptor)(void*);
+ enum ReturnStatus {
+ RETURN_OK = 0,
+ ALREADY_STARTED,
+ ALREADY_IDLE,
+ PTHREAD_ERROR,
+ WRONG_STATE,
+ RETURN_TIMEOUT
+ };
+ enum ThreadState {
+ THREAD_STATE_IDLE, ///< Thread is not started. On start() => STARTING
+ THREAD_STATE_STARTING, ///< Thread is about to start. When actually started => RUNNING
+ THREAD_STATE_RUNNING, ///< Thread is active. On stop() => STOPPING
+ THREAD_STATE_STOPPING ///< Thread is about to stop. When actually stopped => IDLE
+ };
+ enum {
+ THREAD_STARTUP_TIMEOUT=5, ///< Time to wait for thread startup (in seconds).
+ THREAD_STOP_TIMEOUT=5 ///< Time to wait for thread stop (in seconds).
+ };
+
+ /** Create a thread in a non-running state. */
+ Thread(const std::string &name, size_t stackSize = (65536*4));
+
+ /** Destroy the Thread. */
+ virtual ~Thread();
+
+ /** Start the thread. */
+ ReturnStatus startThread(void *data=NULL);
+
+ /** Stop the thread. */
+ ReturnStatus stopThread();
+
+ ThreadState getThreadState() const
+ {
+ ScopedLock lock(mThreadStateMutex);
+ return mThreadState;
+ }
+
+ bool isThreadRunning() const
+ {
+ ScopedLock lock(mThreadStateMutex);
+ return mThreadState == THREAD_STATE_RUNNING;
+ }
+ void requestThreadStop()
+ {
+ ScopedLock lock(mThreadStateMutex);
+ if (mThreadState == THREAD_STATE_RUNNING)
+ mThreadState = THREAD_STATE_STOPPING;
+ }
+ bool isThreadStopping() const
+ {
+ ScopedLock lock(mThreadStateMutex);
+ return mThreadState == THREAD_STATE_STOPPING;
+ }
+
+ const std::string &getThreadName() const {return mThreadName;}
+
+protected:
+
+ pthread_t mThreadId; ///< OS id of the thread.
+ const std::string mThreadName; ///< Name of the thread.
+ size_t mStackSize; ///< Requested stack size for the thread.
+ ThreadState mThreadState; ///< The current state of the thread.
+ mutable Mutex mThreadStateMutex; ///< Mutex to protect ThreadState variable
+ void *mThreadData; ///< Data to be passed to the thread loop.
+ Mutex mThreadStartupMutex; ///< Mutex, used with the next two conditional
+ ///< variables to synchronize thread startup.
+ Signal mThreadInitializedEvent; ///< Conditional variable, signaling
+ ///< that this thread object initialization is completed
+ ///< and the thread could go on.
+ Signal mThreadStartStopEvent; ///< Conditional variable, signaling
+ ///< that the thread is started and start() method could
+ ///< return to caller.
+
+ /** Function with the actual thread loop.
+ * Override this function in child classes to do real work.
+ */
+ virtual void runThread() =0;
+
+ // Static funciton which actually starts the run() method.
+ static void *threadAdaptor(void *data);
+
+ void ackThreadStart() {
+ ScopedLock lock(mThreadStateMutex);
+ assert(mThreadState == THREAD_STATE_STARTING);
+ mThreadState = THREAD_STATE_RUNNING;
+ }
+ void ackThreadStop() {
+ ScopedLock lock(mThreadStateMutex);
+ assert(mThreadState == THREAD_STATE_STOPPING);
+ mThreadState = THREAD_STATE_IDLE;
+ }
};
-
-
#endif
// vim: ts=4 sw=4