diff options
Diffstat (limited to 'CommonLibs/Threads.cpp')
-rw-r--r-- | CommonLibs/Threads.cpp | 244 |
1 files changed, 234 insertions, 10 deletions
diff --git a/CommonLibs/Threads.cpp b/CommonLibs/Threads.cpp index dce7eda..f94bb7d 100644 --- a/CommonLibs/Threads.cpp +++ b/CommonLibs/Threads.cpp @@ -1,5 +1,6 @@ /* * Copyright 2008 Free Software Foundation, Inc. +* Copyright 2013 Alexander Chemeris <Alexander.Chemeris@fairwaves.ru> * * * This software is distributed under the terms of the GNU Affero Public License. @@ -29,11 +30,26 @@ #include "Threads.h" #include "Timeval.h" +#include "Logger.h" +#include <pthread.h> +#include <sys/types.h> +#include <errno.h> // for ETIMEDOUT +#include <sys/syscall.h> // for SYS_gettid +#include <sys/prctl.h> // Linux specific, for prctl(PR_SET_NAME) + +// Make sure we get MCL_CURRENT and MCL_FUTURE (for mlockall) on OS X 10.3 +#define _P1003_1B_VISIBLE +#include <sys/mman.h> +#undef _P1003_1B_VISIBLE using namespace std; +#define POSIX_OK 0 +#define POSIX_NO_WAIT 0 +#define POSIX_WAIT_FOREVER (-1) +static inline int gettid() {return syscall(SYS_gettid);} Mutex gStreamLock; ///< Global lock to control access to cout and cerr. @@ -102,20 +118,228 @@ int Signal::wait(Mutex& wMutex, unsigned timeout) const return pthread_cond_timedwait(&mSignal,&wMutex.mMutex,&waitTime); } +Thread::Thread(const string &name, size_t stackSize) +: mThreadId((pthread_t)0) +, mThreadName(name) +, mStackSize(stackSize) +, mThreadState(THREAD_STATE_IDLE) +, mThreadData(NULL) +{ +} -void Thread::start(void *(*task)(void*), void *arg) +Thread::~Thread() { - assert(mThread==((pthread_t)0)); - bool res; - // (pat) Moved initialization to constructor to avoid crash in destructor. - //res = pthread_attr_init(&mAttrib); - //assert(!res); - res = pthread_attr_setstacksize(&mAttrib, mStackSize); - assert(!res); - res = pthread_create(&mThread, &mAttrib, task, arg); - assert(!res); + stopThread(); } +void *Thread::threadAdaptor(void *data) +{ + Thread *pThread = (Thread*)data; + + // If we ever receive a thread cancel request, it means that the Thread + // object is in the process of being destroyed. To avoid the situation + // where a thread attempts to run after its containing Thread object has + // been freed, we set the thread up so that the cancel takes effect + // immediately (as opposed to waiting until the next thread cancellation + // point). + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); + + // ===================================================================== + // Synchronize with the start() in the parent thread. + { + // 1. Lock synchronization mutex. + ScopedLock lock(pThread->mThreadStartupMutex); + + // 2. Btw, set the thread name, while we're inside the mutex. + // FIXME: This works on Linux with glibc >= 2.12. Under *BSD and MacOS X + // this function has different arguments. +// pthread_setname_np(pThread->mThreadId, pThread->mThreadName.c_str()); + // FIXME: For some reason the previous call doesn't work on my Ubuntu 12.04, + // so we use this one which works. + prctl(PR_SET_NAME, pThread->mThreadName.c_str()); + + // 3. Signal that we've started. + pThread->mThreadStartStopEvent.signal(); + + // 4. Wait until start() finishes its initialization. + // + // The actual thread is created and started with pthread_create(), then + // start() does its housekeeping and sets mThreadState=THREAD_STATE_RUNNING. + // If we allow Thread::run() to start before this initialization completes, + // callers might think (among other things) that the thread is not started + // while it's actually started. + pThread->mThreadInitializedEvent.wait(pThread->mThreadStartupMutex); + } + // Synchronization with the parent thread is finished. + // ===================================================================== + + // Log Thread ID for debugging purposes + LOG(INFO) << "Thread started: " << pThread->mThreadName + << " with lwp=" << gettid() << ", pid=" << getpid(); + + // Keep all memory locked into physical mem, to guarantee realtime-behaviour + int res = mlockall(MCL_CURRENT|MCL_FUTURE); + if (res != POSIX_OK) { + LOG(WARNING) << "Failed to lock memory for thread: " << pThread->mThreadName; + } + + // Run the actual code + pThread->runThread(); + + // Huh, we're done. Signal to a (potentially) waiting stop()'s. + { + ScopedLock lock(pThread->mThreadStateMutex); + pThread->mThreadState = THREAD_STATE_IDLE; + pThread->mThreadStartStopEvent.broadcast(); + } + + return NULL; +} + +Thread::ReturnStatus Thread::startThread(void *data) +{ + pthread_attr_t attrib; +// timeval threadStartTime; +// timespec threadStartTimeout; + bool res; + + // Lock startup synchronization mutex. It will be used in conjunction with + // mThreadInitializedEvent and mThreadStartStopEvent conditional variables. + ScopedLock lock(mThreadStartupMutex); + + { + ScopedLock lock(mThreadStateMutex); + if (mThreadState != THREAD_STATE_IDLE) + return ALREADY_STARTED; + mThreadState = THREAD_STATE_STARTING; + } + + // Save thread data pointer + mThreadData = data; + + LOG(DEBUG) << "Starting thread " << mThreadName << " (" << this << ")"; + + // construct thread attribute + res = pthread_attr_init(&attrib); + if (res != POSIX_OK) { + LOG(ALERT) << "pthread_attr_init failed, returned " << res + << " in " << mThreadName << " (" << this << ")"; + } + + + // Set the thread stack size + res = pthread_attr_setstacksize(&attrib, mStackSize); + if (res != POSIX_OK) + { + LOG(ALERT) << "pthread_attr_setstacksize failed, returned " << res + << " in " << mThreadName << " (" << this << ")"; + } + + // Create the thread detached + res = pthread_attr_setdetachstate(&attrib, PTHREAD_CREATE_DETACHED); + if (res != POSIX_OK) + { + LOG(ALERT) << "pthread_attr_setdetachstate failed, returned " << res + << " in " << mThreadName << " (" << this << ")"; + } + + // ===================================================================== + // Start the thread and synchronize with it + + // Start the thread! + res = pthread_create(&mThreadId, &attrib, threadAdaptor, (void *)this); + // Attributes are no longer needed. + pthread_attr_destroy(&attrib); + + if (res != POSIX_OK) + { + LOG(ALERT) << "pthread_create failed, returned " << res + << " in " << mThreadName << " (" << this << ")"; + + return PTHREAD_ERROR; + } + + // Wait for the thread to startup. + res = mThreadStartStopEvent.wait(mThreadStartupMutex, THREAD_STARTUP_TIMEOUT*1000); + + // If the thread does not start in THREAD_STARTUP_TIMEOUT seconds, + // then something is terribly wrong here. + if (res == ETIMEDOUT) + { + LOG(ALERT) << "thread " << mThreadName << " (" << this << ") hasn't started up in " + << THREAD_STARTUP_TIMEOUT << " seconds. Bailing out."; + + return RETURN_TIMEOUT; + } + + // We're done with the initialization. + ackThreadStart(); + + // ToDo: Add other initialization here, e.g. adding this thread to a list of all threads. + + // Startup initialization finished. Signal this to started thread, so + // it could go on. + mThreadInitializedEvent.signal(); + + return RETURN_OK; +} +Thread::ReturnStatus Thread::stopThread() +{ + int res; + + LOG(DEBUG) << "Stopping thread " << mThreadName << " (" << this << ")"; + + while (1) { + ScopedLock lock(mThreadStateMutex); + + switch (mThreadState) { + case THREAD_STATE_IDLE: + // Nothing to do. + return RETURN_OK; + + case THREAD_STATE_STARTING: + // Something is wrong in thi world. + assert(mThreadState != THREAD_STATE_STARTING); + LOG(ALERT) << "Trying to stop thread " << mThreadName + << " (" << this << ") while it's trying to start."; + return WRONG_STATE; + + case THREAD_STATE_RUNNING: + // Request shudown + mThreadState = THREAD_STATE_STOPPING; + // no "break" here to fall through to the next case + + case THREAD_STATE_STOPPING: + // Wait for the thread to stop. + LOG(DEBUG) << "Waiting for thread " << mThreadName << " (" << this << ") to stop."; + res = mThreadStartStopEvent.wait(mThreadStateMutex, THREAD_STOP_TIMEOUT*1000); + LOG(DEBUG) << "Thread " << mThreadName << " (" << this << ") signalled stop " + << "with res=" << res << " and mThreadState=" << mThreadState; + + // If the thread does not stop in THREAD_STOP_TIMEOUT seconds, + // return error. It may be waiting for something. + if (res == ETIMEDOUT) + { + LOG(ALERT) << "thread " << mThreadName << " (" << this << ") hasn't stopped in " + << THREAD_STARTUP_TIMEOUT << " seconds. Bailing out."; + + return RETURN_TIMEOUT; + } + + // Conditional variable could return in case of a signal, so we should + // double check that the thread has indeed stopped. + if (mThreadState == THREAD_STATE_IDLE) + return RETURN_OK; + else + // Try again... + break; + } + } + + // We should never reach this line + assert(false); + return RETURN_OK; +} // vim: ts=4 sw=4 |