aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Chemeris <Alexander.Chemeris@gmail.com>2013-07-14 12:42:52 +0400
committerAlexander Chemeris <Alexander.Chemeris@gmail.com>2013-07-14 23:11:54 +0400
commitde202e343564913e0591de10850e617d7de55b6b (patch)
tree1e45714611d2f02462c1d7bc883ed6f534014335
parente279124660a2add686e6c9266d18a9bb0cf43258 (diff)
Transceiver52M: Port all threads to the new Thread interface.
Also note that we introduced shutdown() method for Transceiver threads which implement proper shutdown of threads when they are in blocking read state. This involves using shutdown() on sockets and pushing NULL to queues. With this change we should be able to start/stop transceiver channels at arbitrary moments.
-rw-r--r--Transceiver52M/DriveLoop.cpp39
-rw-r--r--Transceiver52M/DriveLoop.h14
-rw-r--r--Transceiver52M/Transceiver.cpp130
-rw-r--r--Transceiver52M/Transceiver.h57
-rw-r--r--Transceiver52M/UHDDevice.cpp27
5 files changed, 166 insertions, 101 deletions
diff --git a/Transceiver52M/DriveLoop.cpp b/Transceiver52M/DriveLoop.cpp
index f2e81c6..51ce750 100644
--- a/Transceiver52M/DriveLoop.cpp
+++ b/Transceiver52M/DriveLoop.cpp
@@ -1,5 +1,6 @@
/*
* Copyright 2008, 2009, 2010, 2012 Free Software Foundation, Inc.
+* Copyright 2013 Alexander Chemeris <Alexander.Chemeris@fairwaves.ru>
*
* This software is distributed under the terms of the GNU Public License.
* See the COPYING file in the main directory for details.
@@ -31,10 +32,11 @@ DriveLoop::DriveLoop(int wBasePort, const char *TRXAddress,
RadioInterface *wRadioInterface,
int wChanM, int wC0, int wSamplesPerSymbol,
GSM::Time wTransmitLatency)
- :mClockSocket(wBasePort, TRXAddress, wBasePort + 100), mC0(wC0)
+: Thread("DriveLoop")
+, mClockSocket(wBasePort, TRXAddress, wBasePort + 100)
+, mC0(wC0)
{
mChanM = wChanM;
- mRadioDriveLoopThread = NULL;
mSamplesPerSymbol = wSamplesPerSymbol;
mRadioInterface = wRadioInterface;
@@ -74,33 +76,15 @@ DriveLoop::DriveLoop(int wBasePort, const char *TRXAddress,
mChanType[n][i] = NONE;
}
}
-
- mOn = false;
}
DriveLoop::~DriveLoop()
{
- if (mOn) {
- mOn = false;
-
- if (mRadioDriveLoopThread)
- delete mRadioDriveLoopThread;
- }
-
+ stopThread();
delete gsmPulse;
sigProcLibDestroy();
}
-void DriveLoop::start()
-{
- if (mOn)
- return;
-
- mOn = true;
- mRadioDriveLoopThread = new Thread(32768);
- mRadioDriveLoopThread->start((void * (*)(void*))RadioDriveLoopAdapter, (void*) this);
-}
-
void DriveLoop::pushRadioVector(GSM::Time &nowTime)
{
int i;
@@ -273,15 +257,12 @@ void DriveLoop::writeClockInterface()
mLastClockUpdateTime = mTransmitDeadlineClock;
}
-void *RadioDriveLoopAdapter(DriveLoop *drive)
+void DriveLoop::runThread()
{
- drive->setPriority();
+ setPriority();
- while (drive->on()) {
- drive->driveReceiveFIFO();
- drive->driveTransmitFIFO();
- pthread_testcancel();
+ while (isThreadRunning()) {
+ driveReceiveFIFO();
+ driveTransmitFIFO();
}
-
- return NULL;
}
diff --git a/Transceiver52M/DriveLoop.h b/Transceiver52M/DriveLoop.h
index 7a2dfc3..f396abd 100644
--- a/Transceiver52M/DriveLoop.h
+++ b/Transceiver52M/DriveLoop.h
@@ -1,5 +1,6 @@
/*
* Copyright 2008, 2012 Free Software Foundation, Inc.
+* Copyright 2013 Alexander Chemeris <Alexander.Chemeris@fairwaves.ru>
*
* This software is distributed under the terms of the GNU Public License.
* See the COPYING file in the main directory for details.
@@ -44,7 +45,7 @@
//#define TRANSMIT_LOGGING 1
/** The Transceiver class, responsible for physical layer of basestation */
-class DriveLoop {
+class DriveLoop : public Thread {
private:
@@ -56,8 +57,6 @@ private:
VectorQueue mTransmitPriorityQueue[CHAN_MAX]; ///< priority queue of transmit bursts received from GSM core
- Thread *mRadioDriveLoopThread; ///< thread to push/pull bursts into transmit/receive FIFO
-
GSM::Time mTransmitDeadlineClock; ///< deadline for pushing bursts into transmit FIFO
GSM::Time mStartTime; ///< random start time of the radio clock
@@ -83,7 +82,6 @@ private:
int mSamplesPerSymbol; ///< number of samples per GSM symbol
- bool mOn; ///< flag to indicate that transceiver is powered on
int fillerModulus[CHAN_MAX][8]; ///< modulus values of all timeslots, in frames
signalVector *fillerTable[CHAN_MAX][102][8]; ///< table of modulated filler waveforms for all timeslots
@@ -112,9 +110,6 @@ public:
/** Destructor */
~DriveLoop();
- /** start the Transceiver */
- void start();
-
VectorQueue *priorityQueue(int m) { return &mTransmitPriorityQueue[m]; }
/** Codes for burst types of received bursts*/
@@ -178,13 +173,10 @@ protected:
*/
bool driveTransmitPriorityQueue();
- friend void *RadioDriveLoopAdapter(DriveLoop *);
+ virtual void runThread();
void reset();
- /** return drive loop status */
- bool on() { return mOn; }
-
/** set priority on current thread */
void setPriority() { mRadioInterface->setPriority(); }
diff --git a/Transceiver52M/Transceiver.cpp b/Transceiver52M/Transceiver.cpp
index 67f6822..39cad3a 100644
--- a/Transceiver52M/Transceiver.cpp
+++ b/Transceiver52M/Transceiver.cpp
@@ -1,5 +1,6 @@
/*
* Copyright 2008, 2009, 2010, 2012 Free Software Foundation, Inc.
+* Copyright 2013 Alexander Chemeris <Alexander.Chemeris@fairwaves.ru>
*
* This software is distributed under the terms of the GNU Public License.
* See the COPYING file in the main directory for details.
@@ -51,15 +52,13 @@ using namespace GSM;
Transceiver::Transceiver(int wBasePort, const char *TRXAddress,
DriveLoop *wDriveLoop, RadioInterface *wRadioInterface,
int wSamplesPerSymbol, int wChannel, bool wPrimary)
- :mDataSocket(wBasePort+2,TRXAddress,wBasePort+102),
+ :mBasePort(wBasePort), mTRXAddress(TRXAddress),
+ mDataSocket(wBasePort+2,TRXAddress,wBasePort+102),
mControlSocket(wBasePort+1,TRXAddress,wBasePort+101),
mDriveLoop(wDriveLoop), mRadioInterface(wRadioInterface),
mSamplesPerSymbol(wSamplesPerSymbol), mTransmitPriorityQueue(NULL),
mChannel(wChannel), mPrimary(wPrimary)
{
- mFIFOServiceLoopThread = NULL;
- mControlServiceLoopThread = NULL;
- mTransmitPriorityQueueServiceLoopThread = NULL;
mMaxExpectedDelay = 0;
// generate pulse and setup up signal processing library
@@ -93,12 +92,14 @@ Transceiver::Transceiver(int wBasePort, const char *TRXAddress,
Transceiver::~Transceiver()
{
+ // Stop all threads before freeing up
+ mFIFOServiceLoop.shutdown();
+ mControlServiceLoop.shutdown();
+ mTransmitPriorityQueueServiceLoop.shutdown();
+
+ // Now free all allocated data
delete gsmPulse;
mTransmitPriorityQueue->clear();
-
- delete mFIFOServiceLoopThread;
- delete mControlServiceLoopThread;
- delete mTransmitPriorityQueueServiceLoopThread;
}
@@ -304,16 +305,12 @@ void Transceiver::pullFIFO()
void Transceiver::start()
{
mRunning = true;
- mControlServiceLoopThread = new Thread(32768);
- mControlServiceLoopThread->start((void * (*)(void*))ControlServiceLoopAdapter,(void*) this);
+ mControlServiceLoop.startThread((void*) this);
if (!mPrimary) {
mOn = true;
- mFIFOServiceLoopThread = new Thread(32768);
- mFIFOServiceLoopThread->start((void * (*)(void*))FIFOServiceLoopAdapter,(void*) this);
-
- mTransmitPriorityQueueServiceLoopThread = new Thread(32768);
- mTransmitPriorityQueueServiceLoopThread->start((void * (*)(void*))TransmitPriorityQueueServiceLoopAdapter,(void*) this);
+ mFIFOServiceLoop.startThread((void*) this);
+ mTransmitPriorityQueueServiceLoop.startThread((void*) this);
}
}
@@ -321,6 +318,9 @@ void Transceiver::shutdown()
{
mOn = false;
mRunning = false;
+ mControlServiceLoop.shutdown();
+ mFIFOServiceLoop.shutdown();
+ mTransmitPriorityQueueServiceLoop.shutdown();
}
void Transceiver::reset()
@@ -382,18 +382,15 @@ void Transceiver::driveControl()
// Prepare for thread start
mPower = -20;
mRadioInterface->start();
- mDriveLoop->start();
+ mDriveLoop->startThread();
mDriveLoop->writeClockInterface();
generateRACHSequence(*gsmPulse,mSamplesPerSymbol);
// Start radio interface threads.
mOn = true;
- mFIFOServiceLoopThread = new Thread(32768);
- mFIFOServiceLoopThread->start((void * (*)(void*))FIFOServiceLoopAdapter,(void*) this);
-
- mTransmitPriorityQueueServiceLoopThread = new Thread(32768);
- mTransmitPriorityQueueServiceLoopThread->start((void * (*)(void*))TransmitPriorityQueueServiceLoopAdapter,(void*) this);
+ mFIFOServiceLoop.startThread((void*) this);
+ mTransmitPriorityQueueServiceLoop.startThread((void*) this);
}
}
}
@@ -560,27 +557,94 @@ bool Transceiver::driveTransmitPriorityQueue()
}
-void *FIFOServiceLoopAdapter(Transceiver *transceiver)
+Thread::ReturnStatus FIFOServiceLoopThread::shutdown()
{
- while (transceiver->on()) {
+ Transceiver *transceiver = (Transceiver *)mThreadData;
+
+ if (transceiver == NULL)
+ // Nothing to do
+ return ALREADY_IDLE;
+
+ transceiver->mFIFOServiceLoop.requestThreadStop();
+ if (transceiver->mReceiveFIFO != NULL) {
+ // Write twice, because read() function may read twice in case of NULL.
+ transceiver->mReceiveFIFO->write(NULL);
+ transceiver->mReceiveFIFO->write(NULL);
+ }
+ return transceiver->mFIFOServiceLoop.stopThread();
+}
+
+void FIFOServiceLoopThread::runThread()
+{
+ Transceiver *transceiver = (Transceiver *)mThreadData;
+
+ while (isThreadRunning()) {
transceiver->pullFIFO();
- pthread_testcancel();
}
- return NULL;
+
+ LOG(DEBUG) << "FIFOServiceLoopThread has finished operations";
+}
+
+Thread::ReturnStatus ControlServiceLoopThread::shutdown()
+{
+ Transceiver *transceiver = (Transceiver *)mThreadData;
+
+ if (transceiver == NULL)
+ // Nothing to do
+ return ALREADY_IDLE;
+
+ transceiver->mControlServiceLoop.requestThreadStop();
+ // FIXME: We should use shutdown() here, but the socket should be
+ // re-openned on the next start() then. Righ now the socket
+ // is created in the constructor and if we shutdown() it here,
+ // we'll get errors when we try to use it on the next start.
+// transceiver->mControlSocket.shutdown();
+ {
+ // mBasePort+1 is mControlSocket port
+ UDPSocket tmpSock(0, "127.0.0.1", transceiver->mBasePort+1);
+ tmpSock.write(NULL, 0);
+ }
+ return transceiver->mControlServiceLoop.stopThread();
}
-void *ControlServiceLoopAdapter(Transceiver *transceiver)
+void ControlServiceLoopThread::runThread()
{
- while (transceiver->running()) {
+ Transceiver *transceiver = (Transceiver *)mThreadData;
+
+ while (isThreadRunning()) {
transceiver->driveControl();
- pthread_testcancel();
}
- return NULL;
+
+ LOG(DEBUG) << "ControlServiceLoopThread has finished operations";
}
-void *TransmitPriorityQueueServiceLoopAdapter(Transceiver *transceiver)
+Thread::ReturnStatus TransmitPriorityQueueServiceLoopThread::shutdown()
{
- while (transceiver->on()) {
+ Transceiver *transceiver = (Transceiver *)mThreadData;
+
+ if (transceiver == NULL)
+ // Nothing to do
+ return ALREADY_IDLE;
+
+ transceiver->mTransmitPriorityQueueServiceLoop.requestThreadStop();
+ // FIXME: We should use shutdown() here, but the socket should be
+ // re-openned on the next start() then. Righ now the socket
+ // is created in the constructor and if we shutdown() it here,
+ // we'll get errors when we try to use it on the next start.
+// transceiver->mDataSocket.shutdown();
+ {
+ // mBasePort+2 is mDataSocket port
+ UDPSocket tmpSock(0, "127.0.0.1", transceiver->mBasePort+2);
+ tmpSock.write(NULL, 0);
+ }
+ return transceiver->mTransmitPriorityQueueServiceLoop.stopThread();
+}
+
+void TransmitPriorityQueueServiceLoopThread::runThread()
+{
+ Transceiver *transceiver = (Transceiver *)mThreadData;
+
+ while (isThreadRunning()) {
bool stale = false;
// Flush the UDP packets until a successful transfer.
@@ -591,7 +655,7 @@ void *TransmitPriorityQueueServiceLoopAdapter(Transceiver *transceiver)
// If a packet was stale, remind the GSM stack of the clock.
transceiver->getDriveLoop()->writeClockInterface();
}
- pthread_testcancel();
}
- return NULL;
+
+ LOG(DEBUG) << "TransmitPriorityQueueServiceLoopThread has finished operations";
}
diff --git a/Transceiver52M/Transceiver.h b/Transceiver52M/Transceiver.h
index c58ca8c..91c34ee 100644
--- a/Transceiver52M/Transceiver.h
+++ b/Transceiver52M/Transceiver.h
@@ -1,5 +1,6 @@
/*
* Copyright 2008, 2012 Free Software Foundation, Inc.
+* Copyright 2013 Alexander Chemeris <Alexander.Chemeris@fairwaves.ru>
*
* This software is distributed under the terms of the GNU Public License.
* See the COPYING file in the main directory for details.
@@ -41,21 +42,56 @@
/** Define this to be the slot number to be logged. */
//#define TRANSMIT_LOGGING 1
+/** FIFO thread loop */
+class FIFOServiceLoopThread : public Thread {
+public:
+ FIFOServiceLoopThread() : Thread("FIFOServiceLoopThread") {}
+ Thread::ReturnStatus shutdown();
+
+protected:
+ virtual void runThread();
+};
+
+/** control message handler thread loop */
+class ControlServiceLoopThread : public Thread {
+public:
+ ControlServiceLoopThread() : Thread("ControlServiceLoopThread") {}
+ Thread::ReturnStatus shutdown();
+
+protected:
+ virtual void runThread();
+};
+
+/** transmit queueing thread loop */
+class TransmitPriorityQueueServiceLoopThread : public Thread {
+public:
+ TransmitPriorityQueueServiceLoopThread() : Thread("TransmitPriorityQueueServiceLoopThread") {}
+ Thread::ReturnStatus shutdown();
+
+protected:
+ virtual void runThread();
+};
+
/** The Transceiver class, responsible for physical layer of basestation */
class Transceiver {
private:
DriveLoop *mDriveLoop;
+ int mBasePort; ///< Base port address for all our ports
+ std::string mTRXAddress; ///< Address of the BTS TRX control interface
UDPSocket mDataSocket; ///< socket for writing to/reading from GSM core
UDPSocket mControlSocket; ///< socket for writing/reading control commands from GSM core
VectorQueue *mTransmitPriorityQueue; ///< priority queue of transmit bursts received from GSM core
VectorFIFO* mReceiveFIFO; ///< radioInterface FIFO of receive bursts
- Thread *mFIFOServiceLoopThread; ///< thread to push/pull bursts into transmit/receive FIFO
- Thread *mControlServiceLoopThread; ///< thread to process control messages from GSM core
- Thread *mTransmitPriorityQueueServiceLoopThread;///< thread to process transmit bursts from GSM core
+ friend class FIFOServiceLoopThread;
+ FIFOServiceLoopThread mFIFOServiceLoop; ///< thread to push/pull bursts into transmit/receive FIFO
+ friend class ControlServiceLoopThread;
+ ControlServiceLoopThread mControlServiceLoop; ///< thread to process control messages from GSM core
+ friend class TransmitPriorityQueueServiceLoopThread;
+ TransmitPriorityQueueServiceLoopThread mTransmitPriorityQueueServiceLoop;///< thread to process transmit bursts from GSM core
int mChannel; ///< channelizer attach number between 0 and 'M-1'
@@ -151,12 +187,6 @@ protected:
*/
bool driveTransmitPriorityQueue();
- friend void *FIFOServiceLoopAdapter(Transceiver *);
-
- friend void *ControlServiceLoopAdapter(Transceiver *);
-
- friend void *TransmitPriorityQueueServiceLoopAdapter(Transceiver *);
-
void reset();
/** return transceiver on/off status */
@@ -171,12 +201,3 @@ protected:
/** set priority on current thread */
void setPriority() { mRadioInterface->setPriority(); }
};
-
-/** FIFO thread loop */
-void *FIFOServiceLoopAdapter(Transceiver *);
-
-/** control message handler thread loop */
-void *ControlServiceLoopAdapter(Transceiver *);
-
-/** transmit queueing thread loop */
-void *TransmitPriorityQueueServiceLoopAdapter(Transceiver *);
diff --git a/Transceiver52M/UHDDevice.cpp b/Transceiver52M/UHDDevice.cpp
index e2d8537..6ff09b9 100644
--- a/Transceiver52M/UHDDevice.cpp
+++ b/Transceiver52M/UHDDevice.cpp
@@ -3,6 +3,7 @@
* Written by Thomas Tsou <ttsou@vt.edu>
*
* Copyright 2010,2011 Free Software Foundation, Inc.
+ * Copyright 2013 Alexander Chemeris <Alexander.Chemeris@fairwaves.ru>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
@@ -206,6 +207,15 @@ private:
size_t data_end;
};
+/** transmit queueing thread loop */
+class UHDAsyncEventThread : public Thread {
+public:
+ UHDAsyncEventThread() : Thread("UHDAsyncEventThread") {}
+
+protected:
+ virtual void runThread();
+};
+
/*
uhd_device - UHD implementation of the Device interface. Timestamped samples
are sent to and received from the device. An intermediate buffer
@@ -312,14 +322,14 @@ private:
std::string str_code(uhd::rx_metadata_t metadata);
std::string str_code(uhd::async_metadata_t metadata);
- Thread *async_event_thrd;
+ UHDAsyncEventThread async_event_thrd;
};
-void *async_event_loop(uhd_device *dev)
+void UHDAsyncEventThread::runThread()
{
- while (dev->running()) {
+ uhd_device *dev = (uhd_device *)mThreadData;
+ while (isThreadRunning()) {
dev->recv_async_msg();
- pthread_testcancel();
}
}
@@ -349,8 +359,7 @@ uhd_device::uhd_device(int sps, bool skip_rx)
: tx_gain_min(0.0), tx_gain_max(0.0),
rx_gain_min(0.0), rx_gain_max(0.0),
tx_spp(0), rx_spp(0), started(false), aligned(false),
- rx_pkt_cnt(0), drop_cnt(0), prev_ts(0,0), ts_offset(0),
- async_event_thrd(NULL)
+ rx_pkt_cnt(0), drop_cnt(0), prev_ts(0,0), ts_offset(0)
{
this->sps = sps;
this->skip_rx = skip_rx;
@@ -698,8 +707,7 @@ bool uhd_device::start()
setPriority();
// Start asynchronous event (underrun check) loop
- async_event_thrd = new Thread(32768);
- async_event_thrd->start((void * (*)(void*))async_event_loop, (void*)this);
+ async_event_thrd.startThread((void*)this);
// Start streaming
restart(uhd::time_spec_t(0.0));
@@ -716,6 +724,7 @@ bool uhd_device::stop()
if (!started)
return false;
+ async_event_thrd.stopThread();
started = false;
uhd::stream_cmd_t stream_cmd =
@@ -723,8 +732,6 @@ bool uhd_device::stop()
usrp_dev->issue_stream_cmd(stream_cmd);
- delete async_event_thrd;
-
return true;
}