From eb54bddf47e087cf340c8a65b36a03cebd4f174b Mon Sep 17 00:00:00 2001 From: Tom Tsou Date: Tue, 25 Nov 2014 16:06:32 -0800 Subject: Transceiver52M: Implement POWEROFF command Add stop and restart capability through the POWEROFF and POWERON commands. Calling stop causes receive streaming to cease, and I/O threads to shutdown leaving only the control handling thread running. Upon receiving a POWERON command, I/O threads and device streaming are restarted. Proper shutdown of the transceiver is now initiated by the destructor, which calls the stop command internally to wind down and deallocate threads. Signed-off-by: Tom Tsou --- Transceiver52M/Transceiver.cpp | 210 +++++++++++++++++++++++++------------- Transceiver52M/Transceiver.h | 16 ++- Transceiver52M/UHDDevice.cpp | 57 +++++++++-- Transceiver52M/osmo-trx.cpp | 2 - Transceiver52M/radioClock.cpp | 13 +-- Transceiver52M/radioInterface.cpp | 30 +++++- Transceiver52M/radioInterface.h | 3 +- 7 files changed, 229 insertions(+), 102 deletions(-) diff --git a/Transceiver52M/Transceiver.cpp b/Transceiver52M/Transceiver.cpp index f8b34b3..4885654 100644 --- a/Transceiver52M/Transceiver.cpp +++ b/Transceiver52M/Transceiver.cpp @@ -84,42 +84,46 @@ void TransceiverState::init(size_t slot, signalVector *burst, bool fill) } Transceiver::Transceiver(int wBasePort, - const char *TRXAddress, + const char *wTRXAddress, size_t wSPS, size_t wChans, GSM::Time wTransmitLatency, RadioInterface *wRadioInterface) - : mBasePort(wBasePort), mAddr(TRXAddress), - mTransmitLatency(wTransmitLatency), mClockSocket(NULL), - mRadioInterface(wRadioInterface), mSPSTx(wSPS), mSPSRx(1), mChans(wChans), - mOn(false), mTxFreq(0.0), mRxFreq(0.0), mMaxExpectedDelay(0) + : mBasePort(wBasePort), mAddr(wTRXAddress), + mClockSocket(wBasePort, wTRXAddress, mBasePort + 100), + mTransmitLatency(wTransmitLatency), mRadioInterface(wRadioInterface), + mSPSTx(wSPS), mSPSRx(1), mChans(wChans), mOn(false), + mTxFreq(0.0), mRxFreq(0.0), mMaxExpectedDelay(0) { - GSM::Time startTime(random() % gHyperframe,0); - - mRxLowerLoopThread = new Thread(32768); - mTxLowerLoopThread = new Thread(32768); - - mTransmitDeadlineClock = startTime; - mLastClockUpdateTime = startTime; - mLatencyUpdateTime = startTime; - mRadioInterface->getClock()->set(startTime); - txFullScale = mRadioInterface->fullScaleInputValue(); rxFullScale = mRadioInterface->fullScaleOutputValue(); } Transceiver::~Transceiver() { - sigProcLibDestroy(); + stop(); - delete mClockSocket; + sigProcLibDestroy(); for (size_t i = 0; i < mChans; i++) { + mControlServiceLoopThreads[i]->cancel(); + mControlServiceLoopThreads[i]->join(); + delete mControlServiceLoopThreads[i]; + mTxPriorityQueues[i].clear(); delete mCtrlSockets[i]; delete mDataSockets[i]; } } +/* + * Initialize transceiver + * + * Start or restart the control loop. Any further control is handled through the + * socket API. Randomize the central radio clock set the downlink burst + * counters. Note that the clock will not update until the radio starts, but we + * are still expected to report clock indications through control channel + * activity. + */ bool Transceiver::init(bool filler) { int d_srcport, d_dstport, c_srcport, c_dstport; @@ -150,8 +154,7 @@ bool Transceiver::init(bool filler) if (filler) mStates[0].mRetrans = true; - mClockSocket = new UDPSocket(mBasePort, mAddr.c_str(), mBasePort + 100); - + /* Setup sockets */ for (size_t i = 0; i < mChans; i++) { c_srcport = mBasePort + 2 * i + 1; c_dstport = mBasePort + 2 * i + 101; @@ -162,10 +165,19 @@ bool Transceiver::init(bool filler) mDataSockets[i] = new UDPSocket(d_srcport, mAddr.c_str(), d_dstport); } + /* Randomize the central clock */ + GSM::Time startTime(random() % gHyperframe, 0); + mRadioInterface->getClock()->set(startTime); + mTransmitDeadlineClock = startTime; + mLastClockUpdateTime = startTime; + mLatencyUpdateTime = startTime; + + /* Start control threads */ for (size_t i = 0; i < mChans; i++) { + TransceiverChannel *chan = new TransceiverChannel(this, i); mControlServiceLoopThreads[i] = new Thread(32768); - mTxPriorityQueueServiceLoopThreads[i] = new Thread(32768); - mRxServiceLoopThreads[i] = new Thread(32768); + mControlServiceLoopThreads[i]->start((void * (*)(void*)) + ControlServiceLoopAdapter, (void*) chan); for (size_t n = 0; n < 8; n++) { burst = modulateBurst(gDummyBurst, 8 + (n % 4 == 0), mSPSTx); @@ -178,6 +190,106 @@ bool Transceiver::init(bool filler) return true; } +/* + * Start the transceiver + * + * Submit command(s) to the radio device to commence streaming samples and + * launch threads to handle sample I/O. Re-synchronize the transmit burst + * counters to the central radio clock here as well. + */ +bool Transceiver::start() +{ + ScopedLock lock(mLock); + + if (mOn) { + LOG(ERR) << "Transceiver already running"; + return false; + } + + LOG(NOTICE) << "Starting the transceiver"; + + GSM::Time time = mRadioInterface->getClock()->get(); + mTransmitDeadlineClock = time; + mLastClockUpdateTime = time; + mLatencyUpdateTime = time; + + if (!mRadioInterface->start()) { + LOG(ALERT) << "Device failed to start"; + return false; + } + + /* Device is running - launch I/O threads */ + mRxLowerLoopThread = new Thread(32768); + mTxLowerLoopThread = new Thread(32768); + mTxLowerLoopThread->start((void * (*)(void*)) + TxLowerLoopAdapter,(void*) this); + mRxLowerLoopThread->start((void * (*)(void*)) + RxLowerLoopAdapter,(void*) this); + + /* Launch uplink and downlink burst processing threads */ + for (size_t i = 0; i < mChans; i++) { + TransceiverChannel *chan = new TransceiverChannel(this, i); + mRxServiceLoopThreads[i] = new Thread(32768); + mRxServiceLoopThreads[i]->start((void * (*)(void*)) + RxUpperLoopAdapter, (void*) chan); + + chan = new TransceiverChannel(this, i); + mTxPriorityQueueServiceLoopThreads[i] = new Thread(32768); + mTxPriorityQueueServiceLoopThreads[i]->start((void * (*)(void*)) + TxUpperLoopAdapter, (void*) chan); + } + + writeClockInterface(); + mOn = true; + return true; +} + +/* + * Stop the transceiver + * + * Perform stopping by disabling receive streaming and issuing cancellation + * requests to running threads. Most threads will timeout and terminate once + * device is disabled, but the transmit loop may block waiting on the central + * UMTS clock. Explicitly signal the clock to make sure that the transmit loop + * makes it to the thread cancellation point. + */ +void Transceiver::stop() +{ + ScopedLock lock(mLock); + + if (!mOn) + return; + + LOG(NOTICE) << "Stopping the transceiver"; + mTxLowerLoopThread->cancel(); + mRxLowerLoopThread->cancel(); + + for (size_t i = 0; i < mChans; i++) { + mRxServiceLoopThreads[i]->cancel(); + mTxPriorityQueueServiceLoopThreads[i]->cancel(); + } + + LOG(INFO) << "Stopping the device"; + mRadioInterface->stop(); + + for (size_t i = 0; i < mChans; i++) { + mRxServiceLoopThreads[i]->join(); + mTxPriorityQueueServiceLoopThreads[i]->join(); + delete mRxServiceLoopThreads[i]; + delete mTxPriorityQueueServiceLoopThreads[i]; + + mTxPriorityQueues[i].clear(); + } + + mTxLowerLoopThread->join(); + mRxLowerLoopThread->join(); + delete mTxLowerLoopThread; + delete mRxLowerLoopThread; + + mOn = false; + LOG(NOTICE) << "Transceiver stopped"; +} + void Transceiver::addRadioVector(size_t chan, BitVector &bits, int RSSI, GSM::Time &wTime) { @@ -525,17 +637,6 @@ SoftVector *Transceiver::pullRadioVector(GSM::Time &wTime, int &RSSI, return bits; } -void Transceiver::start() -{ - TransceiverChannel *chan; - - for (size_t i = 0; i < mControlServiceLoopThreads.size(); i++) { - chan = new TransceiverChannel(this, i); - mControlServiceLoopThreads[i]->start((void * (*)(void*)) - ControlServiceLoopAdapter, (void*) chan); - } -} - void Transceiver::reset() { for (size_t i = 0; i < mTxPriorityQueues.size(); i++) @@ -574,39 +675,14 @@ void Transceiver::driveControl(size_t chan) LOG(INFO) << "command is " << buffer; if (strcmp(command,"POWEROFF")==0) { - // turn off transmitter/demod - sprintf(response,"RSP POWEROFF 0"); + stop(); + sprintf(response,"RSP POWEROFF 0"); } else if (strcmp(command,"POWERON")==0) { - // turn on transmitter/demod - if (!mTxFreq || !mRxFreq) + if (!start()) sprintf(response,"RSP POWERON 1"); - else { + else sprintf(response,"RSP POWERON 0"); - if (!chan && !mOn) { - // Prepare for thread start - mRadioInterface->start(); - - // Start radio interface threads. - mTxLowerLoopThread->start((void * (*)(void*)) - TxLowerLoopAdapter,(void*) this); - mRxLowerLoopThread->start((void * (*)(void*)) - RxLowerLoopAdapter,(void*) this); - - for (size_t i = 0; i < mChans; i++) { - TransceiverChannel *chan = new TransceiverChannel(this, i); - mRxServiceLoopThreads[i]->start((void * (*)(void*)) - RxUpperLoopAdapter, (void*) chan); - - chan = new TransceiverChannel(this, i); - mTxPriorityQueueServiceLoopThreads[i]->start((void * (*)(void*)) - TxUpperLoopAdapter, (void*) chan); - } - - writeClockInterface(); - mOn = true; - } - } } else if (strcmp(command,"SETMAXDLY")==0) { //set expected maximum time-of-arrival @@ -855,7 +931,7 @@ void Transceiver::writeClockInterface() LOG(INFO) << "ClockInterface: sending " << command; - mClockSocket->write(command, strlen(command) + 1); + mClockSocket.write(command, strlen(command) + 1); mLastClockUpdateTime = mTransmitDeadlineClock; @@ -923,15 +999,7 @@ void *TxUpperLoopAdapter(TransceiverChannel *chan) trx->setPriority(0.40); while (1) { - bool stale = false; - // Flush the UDP packets until a successful transfer. - while (!trx->driveTxPriorityQueue(num)) { - stale = true; - } - if (!num && stale) { - // If a packet was stale, remind the GSM stack of the clock. - trx->writeClockInterface(); - } + trx->driveTxPriorityQueue(num); pthread_testcancel(); } return NULL; diff --git a/Transceiver52M/Transceiver.h b/Transceiver52M/Transceiver.h index 56f9115..0b81511 100644 --- a/Transceiver52M/Transceiver.h +++ b/Transceiver52M/Transceiver.h @@ -91,12 +91,10 @@ class Transceiver { private: int mBasePort; std::string mAddr; - GSM::Time mTransmitLatency; ///< latency between basestation clock and transmit deadline clock - GSM::Time mLatencyUpdateTime; ///< last time latency was updated std::vector mDataSockets; ///< socket for writing to/reading from GSM core std::vector mCtrlSockets; ///< socket for writing/reading control commands from GSM core - UDPSocket *mClockSocket; ///< socket for writing clock updates to GSM core + UDPSocket mClockSocket; ///< socket for writing clock updates to GSM core std::vector mTxPriorityQueues; ///< priority queue of transmit bursts received from GSM core std::vector mReceiveFIFO; ///< radioInterface FIFO of receive bursts @@ -107,6 +105,8 @@ private: std::vector mControlServiceLoopThreads; ///< thread to process control messages from GSM core std::vector mTxPriorityQueueServiceLoopThreads; ///< thread to process transmit bursts from GSM core + GSM::Time mTransmitLatency; ///< latency between basestation clock and transmit deadline clock + GSM::Time mLatencyUpdateTime; ///< last time latency was updated GSM::Time mTransmitDeadlineClock; ///< deadline for pushing bursts into transmit FIFO GSM::Time mLastClockUpdateTime; ///< last time clock update was sent up to core @@ -173,6 +173,13 @@ private: std::vector mStates; + /** Start and stop I/O threads through the control socket API */ + bool start(); + void stop(); + + /** Protect destructor accessable stop call */ + Mutex mLock; + public: /** Transceiver constructor @@ -191,8 +198,7 @@ public: /** Destructor */ ~Transceiver(); - /** start the Transceiver */ - void start(); + /** Start the control loop */ bool init(bool filler); /** attach the radioInterface receive FIFO */ diff --git a/Transceiver52M/UHDDevice.cpp b/Transceiver52M/UHDDevice.cpp index c914868..cbfc2e4 100644 --- a/Transceiver52M/UHDDevice.cpp +++ b/Transceiver52M/UHDDevice.cpp @@ -40,6 +40,14 @@ #define TX_AMPL 0.3 #define SAMPLE_BUF_SZ (1 << 20) +/* + * UHD timeout value on streaming (re)start + * + * Allow some time for streaming to commence after the start command is issued, + * but consider a wait beyond one second to be a definite error condition. + */ +#define UHD_RESTART_TIMEOUT 1.0 + enum uhd_dev_type { USRP1, USRP2, @@ -268,7 +276,7 @@ public: int open(const std::string &args, bool extref); bool start(); bool stop(); - void restart(); + bool restart(); void setPriority(float prio); enum TxWindowType getWindowType() { return tx_window; } @@ -313,8 +321,9 @@ public: enum err_code { ERROR_TIMING = -1, - ERROR_UNRECOVERABLE = -2, - ERROR_UNHANDLED = -3, + ERROR_TIMEOUT = -2, + ERROR_UNRECOVERABLE = -3, + ERROR_UNHANDLED = -4, }; private: @@ -358,7 +367,7 @@ private: uhd::tune_request_t select_freq(double wFreq, size_t chan, bool tx); bool set_freq(double freq, size_t chan, bool tx); - Thread async_event_thrd; + Thread *async_event_thrd; bool diversity; }; @@ -396,6 +405,12 @@ void uhd_msg_handler(uhd::msg::type_t type, const std::string &msg) } } +static void thread_enable_cancel(bool cancel) +{ + cancel ? pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) : + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); +} + uhd_device::uhd_device(size_t sps, size_t chans, bool diversity, double offset) : tx_gain_min(0.0), tx_gain_max(0.0), rx_gain_min(0.0), rx_gain_max(0.0), @@ -727,7 +742,7 @@ bool uhd_device::flush_recv(size_t num_pkts) { uhd::rx_metadata_t md; size_t num_smpls; - float timeout = 0.1f; + float timeout = UHD_RESTART_TIMEOUT; std::vector > pkt_bufs(chans, std::vector(2 * rx_spp)); @@ -743,6 +758,8 @@ bool uhd_device::flush_recv(size_t num_pkts) if (!num_smpls) { switch (md.error_code) { case uhd::rx_metadata_t::ERROR_CODE_TIMEOUT: + LOG(ALERT) << "Device timed out"; + return false; default: continue; } @@ -756,7 +773,7 @@ bool uhd_device::flush_recv(size_t num_pkts) return true; } -void uhd_device::restart() +bool uhd_device::restart() { /* Allow 100 ms delay to align multi-channel streams */ double delay = 0.1; @@ -771,7 +788,7 @@ void uhd_device::restart() usrp_dev->issue_stream_cmd(cmd); - flush_recv(1); + return flush_recv(10); } bool uhd_device::start() @@ -787,10 +804,12 @@ bool uhd_device::start() uhd::msg::register_handler(&uhd_msg_handler); // Start asynchronous event (underrun check) loop - async_event_thrd.start((void * (*)(void*))async_event_loop, (void*)this); + async_event_thrd = new Thread(); + async_event_thrd->start((void * (*)(void*))async_event_loop, (void*)this); // Start streaming - restart(); + if (!restart()) + return false; // Display usrp time double time_now = usrp_dev->get_time_now().get_real_secs(); @@ -810,6 +829,10 @@ bool uhd_device::stop() usrp_dev->issue_stream_cmd(stream_cmd); + async_event_thrd->cancel(); + async_event_thrd->join(); + delete async_event_thrd; + started = false; return true; } @@ -830,6 +853,7 @@ int uhd_device::check_rx_md_err(uhd::rx_metadata_t &md, ssize_t num_smpls) switch (md.error_code) { case uhd::rx_metadata_t::ERROR_CODE_TIMEOUT: LOG(ALERT) << "UHD: Receive timed out"; + return ERROR_TIMEOUT; case uhd::rx_metadata_t::ERROR_CODE_OVERFLOW: case uhd::rx_metadata_t::ERROR_CODE_LATE_COMMAND: case uhd::rx_metadata_t::ERROR_CODE_BROKEN_CHAIN: @@ -899,8 +923,11 @@ int uhd_device::readSamples(std::vector &bufs, int len, bool *overrun, // Receive samples from the usrp until we have enough while (rx_buffers[0]->avail_smpls(timestamp) < len) { + thread_enable_cancel(false); size_t num_smpls = rx_stream->recv(pkt_ptrs, rx_spp, metadata, 0.1, true); + thread_enable_cancel(true); + rx_pkt_cnt++; // Check for errors @@ -910,6 +937,9 @@ int uhd_device::readSamples(std::vector &bufs, int len, bool *overrun, LOG(ALERT) << "UHD: Version " << uhd::get_version_string(); LOG(ALERT) << "UHD: Unrecoverable error, exiting..."; exit(-1); + case ERROR_TIMEOUT: + // Assume stopping condition + return 0; case ERROR_TIMING: restart(); case ERROR_UNHANDLED: @@ -988,7 +1018,10 @@ int uhd_device::writeSamples(std::vector &bufs, int len, bool *underrun } } + thread_enable_cancel(false); size_t num_smpls = tx_stream->send(bufs, len, metadata); + thread_enable_cancel(true); + if (num_smpls != (unsigned) len) { LOG(ALERT) << "UHD: Device send timed out"; } @@ -1124,7 +1157,11 @@ double uhd_device::getRxFreq(size_t chan) bool uhd_device::recv_async_msg() { uhd::async_metadata_t md; - if (!usrp_dev->get_device()->recv_async_msg(md)) + + thread_enable_cancel(false); + bool rc = usrp_dev->get_device()->recv_async_msg(md); + thread_enable_cancel(true); + if (!rc) return false; // Assume that any error requires resynchronization diff --git a/Transceiver52M/osmo-trx.cpp b/Transceiver52M/osmo-trx.cpp index 9215fa5..db0b2b1 100644 --- a/Transceiver52M/osmo-trx.cpp +++ b/Transceiver52M/osmo-trx.cpp @@ -391,8 +391,6 @@ int main(int argc, char *argv[]) if (!trx) goto shutdown; - trx->start(); - chans = trx->numChans(); std::cout << "-- Transceiver active with " << chans << " channel(s)" << std::endl; diff --git a/Transceiver52M/radioClock.cpp b/Transceiver52M/radioClock.cpp index 710018a..505bb01 100644 --- a/Transceiver52M/radioClock.cpp +++ b/Transceiver52M/radioClock.cpp @@ -23,32 +23,27 @@ void RadioClock::set(const GSM::Time& wTime) { - mLock.lock(); + ScopedLock lock(mLock); mClock = wTime; updateSignal.signal(); - mLock.unlock(); } void RadioClock::incTN() { - mLock.lock(); + ScopedLock lock(mLock); mClock.incTN(); updateSignal.signal(); - mLock.unlock(); } GSM::Time RadioClock::get() { - mLock.lock(); + ScopedLock lock(mLock); GSM::Time retVal = mClock; - mLock.unlock(); - return retVal; } void RadioClock::wait() { - mLock.lock(); + ScopedLock lock(mLock); updateSignal.wait(mLock,1); - mLock.unlock(); } diff --git a/Transceiver52M/radioInterface.cpp b/Transceiver52M/radioInterface.cpp index d67b486..369f2ac 100644 --- a/Transceiver52M/radioInterface.cpp +++ b/Transceiver52M/radioInterface.cpp @@ -171,15 +171,20 @@ bool RadioInterface::tuneRx(double freq, size_t chan) return mRadio->setRxFreq(freq, chan); } - -void RadioInterface::start() +bool RadioInterface::start() { - LOG(INFO) << "Starting radio"; + if (mOn) + return true; + + LOG(INFO) << "Starting radio device"; #ifdef USRP1 mAlignRadioServiceLoopThread.start((void * (*)(void*))AlignRadioServiceLoopAdapter, (void*)this); #endif - mRadio->start(); + + if (!mRadio->start()) + return false; + writeTimestamp = mRadio->initialWriteTimestamp(); readTimestamp = mRadio->initialReadTimestamp(); @@ -188,6 +193,23 @@ void RadioInterface::start() mOn = true; LOG(INFO) << "Radio started"; + return true; +} + +/* + * Stop the radio device + * + * This is a pass-through call to the device interface. Because the underlying + * stop command issuance generally doesn't return confirmation on device status, + * this call will only return false if the device is already stopped. + */ +bool RadioInterface::stop() +{ + if (!mOn || !mRadio->stop()) + return false; + + mOn = false; + return true; } #ifdef USRP1 diff --git a/Transceiver52M/radioInterface.h b/Transceiver52M/radioInterface.h index 877102f..b359cbd 100644 --- a/Transceiver52M/radioInterface.h +++ b/Transceiver52M/radioInterface.h @@ -78,7 +78,8 @@ private: public: /** start the interface */ - void start(); + bool start(); + bool stop(); /** intialization */ virtual bool init(int type); -- cgit v1.2.3