diff options
author | Alexander Chemeris <Alexander.Chemeris@gmail.com> | 2013-07-14 12:38:25 +0400 |
---|---|---|
committer | Alexander Chemeris <Alexander.Chemeris@gmail.com> | 2013-07-14 23:11:54 +0400 |
commit | e279124660a2add686e6c9266d18a9bb0cf43258 (patch) | |
tree | af047fe4e3cec8a8f7bb3e38556b9722217eb150 | |
parent | 302a9198df85e4f771368f24d8445dec7c8b2203 (diff) |
CommonLibs: Rewrite Threads class to be predictable during start/stop.
Current implementation strictly synchronize thread startup and shutdown
to make sure a thread is actually started on startThread() exit and
that it's stopped on stopThread() exit (or an error is returned).
I also changed the Thread class to be used as a parent class for
an actual thread. This way we could provide much better logical
separation between variables used by different threads. Which will
(hopefully) reduce number of issues with inter-thread communications.
-rw-r--r-- | CommonLibs/Threads.cpp | 244 | ||||
-rw-r--r-- | CommonLibs/Threads.h | 131 |
2 files changed, 329 insertions, 46 deletions
diff --git a/CommonLibs/Threads.cpp b/CommonLibs/Threads.cpp index dce7eda..f94bb7d 100644 --- a/CommonLibs/Threads.cpp +++ b/CommonLibs/Threads.cpp @@ -1,5 +1,6 @@ /* * Copyright 2008 Free Software Foundation, Inc. +* Copyright 2013 Alexander Chemeris <Alexander.Chemeris@fairwaves.ru> * * * This software is distributed under the terms of the GNU Affero Public License. @@ -29,11 +30,26 @@ #include "Threads.h" #include "Timeval.h" +#include "Logger.h" +#include <pthread.h> +#include <sys/types.h> +#include <errno.h> // for ETIMEDOUT +#include <sys/syscall.h> // for SYS_gettid +#include <sys/prctl.h> // Linux specific, for prctl(PR_SET_NAME) + +// Make sure we get MCL_CURRENT and MCL_FUTURE (for mlockall) on OS X 10.3 +#define _P1003_1B_VISIBLE +#include <sys/mman.h> +#undef _P1003_1B_VISIBLE using namespace std; +#define POSIX_OK 0 +#define POSIX_NO_WAIT 0 +#define POSIX_WAIT_FOREVER (-1) +static inline int gettid() {return syscall(SYS_gettid);} Mutex gStreamLock; ///< Global lock to control access to cout and cerr. @@ -102,20 +118,228 @@ int Signal::wait(Mutex& wMutex, unsigned timeout) const return pthread_cond_timedwait(&mSignal,&wMutex.mMutex,&waitTime); } +Thread::Thread(const string &name, size_t stackSize) +: mThreadId((pthread_t)0) +, mThreadName(name) +, mStackSize(stackSize) +, mThreadState(THREAD_STATE_IDLE) +, mThreadData(NULL) +{ +} -void Thread::start(void *(*task)(void*), void *arg) +Thread::~Thread() { - assert(mThread==((pthread_t)0)); - bool res; - // (pat) Moved initialization to constructor to avoid crash in destructor. - //res = pthread_attr_init(&mAttrib); - //assert(!res); - res = pthread_attr_setstacksize(&mAttrib, mStackSize); - assert(!res); - res = pthread_create(&mThread, &mAttrib, task, arg); - assert(!res); + stopThread(); } +void *Thread::threadAdaptor(void *data) +{ + Thread *pThread = (Thread*)data; + + // If we ever receive a thread cancel request, it means that the Thread + // object is in the process of being destroyed. To avoid the situation + // where a thread attempts to run after its containing Thread object has + // been freed, we set the thread up so that the cancel takes effect + // immediately (as opposed to waiting until the next thread cancellation + // point). + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); + + // ===================================================================== + // Synchronize with the start() in the parent thread. + { + // 1. Lock synchronization mutex. + ScopedLock lock(pThread->mThreadStartupMutex); + + // 2. Btw, set the thread name, while we're inside the mutex. + // FIXME: This works on Linux with glibc >= 2.12. Under *BSD and MacOS X + // this function has different arguments. +// pthread_setname_np(pThread->mThreadId, pThread->mThreadName.c_str()); + // FIXME: For some reason the previous call doesn't work on my Ubuntu 12.04, + // so we use this one which works. + prctl(PR_SET_NAME, pThread->mThreadName.c_str()); + + // 3. Signal that we've started. + pThread->mThreadStartStopEvent.signal(); + + // 4. Wait until start() finishes its initialization. + // + // The actual thread is created and started with pthread_create(), then + // start() does its housekeeping and sets mThreadState=THREAD_STATE_RUNNING. + // If we allow Thread::run() to start before this initialization completes, + // callers might think (among other things) that the thread is not started + // while it's actually started. + pThread->mThreadInitializedEvent.wait(pThread->mThreadStartupMutex); + } + // Synchronization with the parent thread is finished. + // ===================================================================== + + // Log Thread ID for debugging purposes + LOG(INFO) << "Thread started: " << pThread->mThreadName + << " with lwp=" << gettid() << ", pid=" << getpid(); + + // Keep all memory locked into physical mem, to guarantee realtime-behaviour + int res = mlockall(MCL_CURRENT|MCL_FUTURE); + if (res != POSIX_OK) { + LOG(WARNING) << "Failed to lock memory for thread: " << pThread->mThreadName; + } + + // Run the actual code + pThread->runThread(); + + // Huh, we're done. Signal to a (potentially) waiting stop()'s. + { + ScopedLock lock(pThread->mThreadStateMutex); + pThread->mThreadState = THREAD_STATE_IDLE; + pThread->mThreadStartStopEvent.broadcast(); + } + + return NULL; +} + +Thread::ReturnStatus Thread::startThread(void *data) +{ + pthread_attr_t attrib; +// timeval threadStartTime; +// timespec threadStartTimeout; + bool res; + + // Lock startup synchronization mutex. It will be used in conjunction with + // mThreadInitializedEvent and mThreadStartStopEvent conditional variables. + ScopedLock lock(mThreadStartupMutex); + + { + ScopedLock lock(mThreadStateMutex); + if (mThreadState != THREAD_STATE_IDLE) + return ALREADY_STARTED; + mThreadState = THREAD_STATE_STARTING; + } + + // Save thread data pointer + mThreadData = data; + + LOG(DEBUG) << "Starting thread " << mThreadName << " (" << this << ")"; + + // construct thread attribute + res = pthread_attr_init(&attrib); + if (res != POSIX_OK) { + LOG(ALERT) << "pthread_attr_init failed, returned " << res + << " in " << mThreadName << " (" << this << ")"; + } + + + // Set the thread stack size + res = pthread_attr_setstacksize(&attrib, mStackSize); + if (res != POSIX_OK) + { + LOG(ALERT) << "pthread_attr_setstacksize failed, returned " << res + << " in " << mThreadName << " (" << this << ")"; + } + + // Create the thread detached + res = pthread_attr_setdetachstate(&attrib, PTHREAD_CREATE_DETACHED); + if (res != POSIX_OK) + { + LOG(ALERT) << "pthread_attr_setdetachstate failed, returned " << res + << " in " << mThreadName << " (" << this << ")"; + } + + // ===================================================================== + // Start the thread and synchronize with it + + // Start the thread! + res = pthread_create(&mThreadId, &attrib, threadAdaptor, (void *)this); + // Attributes are no longer needed. + pthread_attr_destroy(&attrib); + + if (res != POSIX_OK) + { + LOG(ALERT) << "pthread_create failed, returned " << res + << " in " << mThreadName << " (" << this << ")"; + + return PTHREAD_ERROR; + } + + // Wait for the thread to startup. + res = mThreadStartStopEvent.wait(mThreadStartupMutex, THREAD_STARTUP_TIMEOUT*1000); + + // If the thread does not start in THREAD_STARTUP_TIMEOUT seconds, + // then something is terribly wrong here. + if (res == ETIMEDOUT) + { + LOG(ALERT) << "thread " << mThreadName << " (" << this << ") hasn't started up in " + << THREAD_STARTUP_TIMEOUT << " seconds. Bailing out."; + + return RETURN_TIMEOUT; + } + + // We're done with the initialization. + ackThreadStart(); + + // ToDo: Add other initialization here, e.g. adding this thread to a list of all threads. + + // Startup initialization finished. Signal this to started thread, so + // it could go on. + mThreadInitializedEvent.signal(); + + return RETURN_OK; +} +Thread::ReturnStatus Thread::stopThread() +{ + int res; + + LOG(DEBUG) << "Stopping thread " << mThreadName << " (" << this << ")"; + + while (1) { + ScopedLock lock(mThreadStateMutex); + + switch (mThreadState) { + case THREAD_STATE_IDLE: + // Nothing to do. + return RETURN_OK; + + case THREAD_STATE_STARTING: + // Something is wrong in thi world. + assert(mThreadState != THREAD_STATE_STARTING); + LOG(ALERT) << "Trying to stop thread " << mThreadName + << " (" << this << ") while it's trying to start."; + return WRONG_STATE; + + case THREAD_STATE_RUNNING: + // Request shudown + mThreadState = THREAD_STATE_STOPPING; + // no "break" here to fall through to the next case + + case THREAD_STATE_STOPPING: + // Wait for the thread to stop. + LOG(DEBUG) << "Waiting for thread " << mThreadName << " (" << this << ") to stop."; + res = mThreadStartStopEvent.wait(mThreadStateMutex, THREAD_STOP_TIMEOUT*1000); + LOG(DEBUG) << "Thread " << mThreadName << " (" << this << ") signalled stop " + << "with res=" << res << " and mThreadState=" << mThreadState; + + // If the thread does not stop in THREAD_STOP_TIMEOUT seconds, + // return error. It may be waiting for something. + if (res == ETIMEDOUT) + { + LOG(ALERT) << "thread " << mThreadName << " (" << this << ") hasn't stopped in " + << THREAD_STARTUP_TIMEOUT << " seconds. Bailing out."; + + return RETURN_TIMEOUT; + } + + // Conditional variable could return in case of a signal, so we should + // double check that the thread has indeed stopped. + if (mThreadState == THREAD_STATE_IDLE) + return RETURN_OK; + else + // Try again... + break; + } + } + + // We should never reach this line + assert(false); + return RETURN_OK; +} // vim: ts=4 sw=4 diff --git a/CommonLibs/Threads.h b/CommonLibs/Threads.h index 0d21e41..8006516 100644 --- a/CommonLibs/Threads.h +++ b/CommonLibs/Threads.h @@ -1,5 +1,6 @@ /* * Copyright 2008, 2011 Free Software Foundation, Inc. +* Copyright 2013 Alexander Chemeris <Alexander.Chemeris@fairwaves.ru> * * This software is distributed under the terms of the GNU Affero Public License. * See the COPYING file in the main directory for details. @@ -137,47 +138,105 @@ class Signal { }; - -#define START_THREAD(thread,function,argument) \ - thread.start((void *(*)(void*))function, (void*)argument); - /** A C++ wrapper for pthread threads. */ class Thread { - private: - - pthread_t mThread; - pthread_attr_t mAttrib; - // FIXME -- Can this be reduced now? - size_t mStackSize; - - - public: - - /** Create a thread in a non-running state. */ - Thread(size_t wStackSize = (65536*4)):mThread((pthread_t)0) { - pthread_attr_init(&mAttrib); // (pat) moved this here. - mStackSize=wStackSize; - } - - /** - Destroy the Thread. - It should be stopped and joined. - */ - // (pat) If the Thread is destroyed without being started, then mAttrib is undefined. Oops. - ~Thread() { pthread_attr_destroy(&mAttrib); } - - - /** Start the thread on a task. */ - void start(void *(*task)(void*), void *arg); - - /** Join a thread that will stop on its own. */ - void join() { int s = pthread_join(mThread,NULL); assert(!s); mThread = 0; } - +public: + + typedef void *(*Adaptor)(void*); + enum ReturnStatus { + RETURN_OK = 0, + ALREADY_STARTED, + ALREADY_IDLE, + PTHREAD_ERROR, + WRONG_STATE, + RETURN_TIMEOUT + }; + enum ThreadState { + THREAD_STATE_IDLE, ///< Thread is not started. On start() => STARTING + THREAD_STATE_STARTING, ///< Thread is about to start. When actually started => RUNNING + THREAD_STATE_RUNNING, ///< Thread is active. On stop() => STOPPING + THREAD_STATE_STOPPING ///< Thread is about to stop. When actually stopped => IDLE + }; + enum { + THREAD_STARTUP_TIMEOUT=5, ///< Time to wait for thread startup (in seconds). + THREAD_STOP_TIMEOUT=5 ///< Time to wait for thread stop (in seconds). + }; + + /** Create a thread in a non-running state. */ + Thread(const std::string &name, size_t stackSize = (65536*4)); + + /** Destroy the Thread. */ + virtual ~Thread(); + + /** Start the thread. */ + ReturnStatus startThread(void *data=NULL); + + /** Stop the thread. */ + ReturnStatus stopThread(); + + ThreadState getThreadState() const + { + ScopedLock lock(mThreadStateMutex); + return mThreadState; + } + + bool isThreadRunning() const + { + ScopedLock lock(mThreadStateMutex); + return mThreadState == THREAD_STATE_RUNNING; + } + void requestThreadStop() + { + ScopedLock lock(mThreadStateMutex); + if (mThreadState == THREAD_STATE_RUNNING) + mThreadState = THREAD_STATE_STOPPING; + } + bool isThreadStopping() const + { + ScopedLock lock(mThreadStateMutex); + return mThreadState == THREAD_STATE_STOPPING; + } + + const std::string &getThreadName() const {return mThreadName;} + +protected: + + pthread_t mThreadId; ///< OS id of the thread. + const std::string mThreadName; ///< Name of the thread. + size_t mStackSize; ///< Requested stack size for the thread. + ThreadState mThreadState; ///< The current state of the thread. + mutable Mutex mThreadStateMutex; ///< Mutex to protect ThreadState variable + void *mThreadData; ///< Data to be passed to the thread loop. + Mutex mThreadStartupMutex; ///< Mutex, used with the next two conditional + ///< variables to synchronize thread startup. + Signal mThreadInitializedEvent; ///< Conditional variable, signaling + ///< that this thread object initialization is completed + ///< and the thread could go on. + Signal mThreadStartStopEvent; ///< Conditional variable, signaling + ///< that the thread is started and start() method could + ///< return to caller. + + /** Function with the actual thread loop. + * Override this function in child classes to do real work. + */ + virtual void runThread() =0; + + // Static funciton which actually starts the run() method. + static void *threadAdaptor(void *data); + + void ackThreadStart() { + ScopedLock lock(mThreadStateMutex); + assert(mThreadState == THREAD_STATE_STARTING); + mThreadState = THREAD_STATE_RUNNING; + } + void ackThreadStop() { + ScopedLock lock(mThreadStateMutex); + assert(mThreadState == THREAD_STATE_STOPPING); + mThreadState = THREAD_STATE_IDLE; + } }; - - #endif // vim: ts=4 sw=4 |