diff options
author | Eric <ewild@sysmocom.de> | 2023-05-08 12:56:56 +0200 |
---|---|---|
committer | Eric <ewild@sysmocom.de> | 2023-07-07 19:12:39 +0200 |
commit | c3e515a28b2f3c7f7ef27a1ab38ef198da961f1c (patch) | |
tree | df19fb9a0c31fbd44ee6a682e6a8c2bed9ef7f8c | |
parent | bcaafcaa9d6ac7d2e55fe2dd6d4392fb93ef29b7 (diff) |
ms: rearrange code to allow clean exits
This allows gracefully terminating the application by introducing queue
timeouts.
Change-Id: I0b8deebc63cf4d936666fd68e1666d1917e89a5d
-rw-r--r-- | Transceiver52M/ms/bladerf_specific.h | 2 | ||||
-rw-r--r-- | Transceiver52M/ms/itrq.h | 61 | ||||
-rw-r--r-- | Transceiver52M/ms/ms.cpp | 2 | ||||
-rw-r--r-- | Transceiver52M/ms/ms.h | 4 | ||||
-rw-r--r-- | Transceiver52M/ms/ms_rx_lower.cpp | 6 | ||||
-rw-r--r-- | Transceiver52M/ms/ms_upper.cpp | 45 | ||||
-rw-r--r-- | Transceiver52M/ms/ms_upper.h | 2 |
7 files changed, 84 insertions, 38 deletions
diff --git a/Transceiver52M/ms/bladerf_specific.h b/Transceiver52M/ms/bladerf_specific.h index 3dc4777..e32d77c 100644 --- a/Transceiver52M/ms/bladerf_specific.h +++ b/Transceiver52M/ms/bladerf_specific.h @@ -192,7 +192,7 @@ struct blade_hw { struct bladerf_stream *rx_stream; struct bladerf_stream *tx_stream; // using pkt2buf = blade_otw_buffer<2, blade_speed_buffer_type::SS>; - using tx_buf_q_type = spsc_cond<BLADE_NUM_BUFFERS, dev_buf_t *, true, false>; + using tx_buf_q_type = spsc_cond_timeout<BLADE_NUM_BUFFERS, dev_buf_t *, true, false>; const unsigned int rxFullScale, txFullScale; const int rxtxdelay; diff --git a/Transceiver52M/ms/itrq.h b/Transceiver52M/ms/itrq.h index 1d9e217..69ff515 100644 --- a/Transceiver52M/ms/itrq.h +++ b/Transceiver52M/ms/itrq.h @@ -29,7 +29,58 @@ namespace spsc_detail { -template <bool block_read, bool block_write> class spsc_cond_detail { +template <bool block_read, bool block_write> +class spsc_cond_timeout_detail { + std::condition_variable cond_r, cond_w; + std::mutex lr, lw; + std::atomic_int r_flag, w_flag; + const int timeout_ms = 200; + + public: + explicit spsc_cond_timeout_detail() : r_flag(0), w_flag(0) + { + } + + ~spsc_cond_timeout_detail() + { + } + + ssize_t spsc_check_r() + { + std::unique_lock<std::mutex> lk(lr); + if (cond_r.wait_for(lk, std::chrono::milliseconds(timeout_ms), [&] { return r_flag != 0; })) { + r_flag--; + return 1; + } else { + return 0; + } + } + ssize_t spsc_check_w() + { + std::unique_lock<std::mutex> lk(lw); + if (cond_w.wait_for(lk, std::chrono::milliseconds(timeout_ms), [&] { return w_flag != 0; })) { + w_flag--; + return 1; + } else { + return 0; + } + } + void spsc_notify_r() + { + std::unique_lock<std::mutex> lk(lr); + r_flag++; + cond_r.notify_one(); + } + void spsc_notify_w() + { + std::unique_lock<std::mutex> lk(lw); + w_flag++; + cond_w.notify_one(); + } +}; + +template <bool block_read, bool block_write> +class spsc_cond_detail { std::condition_variable cond_r, cond_w; std::mutex lr, lw; std::atomic_int r_flag, w_flag; @@ -74,7 +125,8 @@ template <bool block_read, bool block_write> class spsc_cond_detail { }; // originally designed for select loop integration -template <bool block_read, bool block_write> class spsc_efd_detail { +template <bool block_read, bool block_write> +class spsc_efd_detail { int efd_r, efd_w; /* eventfds used to block/notify readers/writers */ public: @@ -191,4 +243,7 @@ class spsc : public T<block_read, block_write> { template <unsigned int SZ, typename ELEM, bool block_read, bool block_write> class spsc_evfd : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_efd_detail> {}; template <unsigned int SZ, typename ELEM, bool block_read, bool block_write> -class spsc_cond : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_detail> {};
\ No newline at end of file +class spsc_cond : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_detail> {}; +template <unsigned int SZ, typename ELEM, bool block_read, bool block_write> +class spsc_cond_timeout + : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_timeout_detail> {};
\ No newline at end of file diff --git a/Transceiver52M/ms/ms.cpp b/Transceiver52M/ms/ms.cpp index 2e91cae..e587c05 100644 --- a/Transceiver52M/ms/ms.cpp +++ b/Transceiver52M/ms/ms.cpp @@ -78,7 +78,7 @@ bh_fn_t ms_trx::tx_bh() }; } -void ms_trx::start() +void ms_trx::start_lower_ms() { if (stop_lower_threads_flag) return; diff --git a/Transceiver52M/ms/ms.h b/Transceiver52M/ms/ms.h index efccffc..8ca9b02 100644 --- a/Transceiver52M/ms/ms.h +++ b/Transceiver52M/ms/ms.h @@ -117,7 +117,7 @@ struct one_burst { }; }; -using rx_queue_t = spsc_cond<8 * NUM_RXQ_FRAMES, one_burst, true, false>; +using rx_queue_t = spsc_cond_timeout<8 * NUM_RXQ_FRAMES, one_burst, true, false>; enum class SCH_STATE { SEARCHING, FOUND }; @@ -267,7 +267,7 @@ struct ms_trx : public BASET { sched_params::target hw_target; single_thread_pool worker_thread; - void start(); + void start_lower_ms(); std::atomic<bool> upper_is_ready; void set_upper_ready(bool is_ready); diff --git a/Transceiver52M/ms/ms_rx_lower.cpp b/Transceiver52M/ms/ms_rx_lower.cpp index 4d6ce18..26ee131 100644 --- a/Transceiver52M/ms/ms_rx_lower.cpp +++ b/Transceiver52M/ms/ms_rx_lower.cpp @@ -142,10 +142,8 @@ bool ms_trx::handle_sch_or_nb() memcpy(brst.sch_bits, sch_demod_bits, sizeof(sch_demod_bits)); } - if (upper_is_ready) { // this is blocking, so only submit if there is a reader - only if upper exists! - while (!rxqueue.spsc_push(&brst)) - ; - } + while (upper_is_ready && !rxqueue.spsc_push(&brst)) + ; if (do_auto_gain) maybe_update_gain(brst); diff --git a/Transceiver52M/ms/ms_upper.cpp b/Transceiver52M/ms/ms_upper.cpp index a10d542..4b2f919 100644 --- a/Transceiver52M/ms/ms_upper.cpp +++ b/Transceiver52M/ms/ms_upper.cpp @@ -80,24 +80,33 @@ void upper_trx::start_threads() while (!g_exit_flag) { driveControl(); } - std::cerr << "exit control!" << std::endl; + std::cerr << "exit U control!" << std::endl; }); - msleep(1); thr_tx = std::thread([this] { set_name_aff_sched(sched_params::thread_names::U_TX); while (!g_exit_flag) { driveTx(); } - std::cerr << "exit tx U!" << std::endl; + std::cerr << "exit U tx!" << std::endl; }); - // atomic ensures data is not written to q until loop reads - start_lower_ms(); +#ifdef LSANDEBUG + std::thread([this] { + set_name_aff_sched(sched_params::thread_names::LEAKCHECK); + + while (1) { + std::this_thread::sleep_for(std::chrono::seconds{ 5 }); + __lsan_do_recoverable_leak_check(); + } + }).detach(); +#endif +} +void upper_trx::main_loop() +{ set_name_aff_sched(sched_params::thread_names::U_RX); + set_upper_ready(true); while (!g_exit_flag) { - // set_upper_ready(true) needs to happen during cmd handling: - // the main loop is driven by rx, so unless rx is on AND transceiver is on we get stuck.. driveReceiveFIFO(); osmo_select_main(1); @@ -108,24 +117,8 @@ void upper_trx::start_threads() } } set_upper_ready(false); - std::cerr << "exit rx U!" << std::endl; + std::cerr << "exit U rx!" << std::endl; mOn = false; - -#ifdef LSANDEBUG - std::thread([this] { - set_name_aff_sched(sched_params::thread_names::LEAKCHECK); - - while (1) { - std::this_thread::sleep_for(std::chrono::seconds{ 5 }); - __lsan_do_recoverable_leak_check(); - } - }).detach(); -#endif -} - -void upper_trx::start_lower_ms() -{ - ms_trx::start(); } // signalvector is owning despite claiming not to, but we can pretend, too.. @@ -346,7 +339,7 @@ bool upper_trx::driveControl() case TRXCON_PHYIF_CMDT_POWERON: if (!mOn) { mOn = true; - set_upper_ready(true); + start_lower_ms(); } break; case TRXCON_PHYIF_CMDT_POWEROFF: @@ -430,7 +423,7 @@ int main(int argc, char *argv[]) // blocking, will return when global exit is requested trx->start_threads(); - + trx->main_loop(); trx->stop_threads(); trx->stop_upper_threads(); diff --git a/Transceiver52M/ms/ms_upper.h b/Transceiver52M/ms/ms_upper.h index bc9bd14..2362365 100644 --- a/Transceiver52M/ms/ms_upper.h +++ b/Transceiver52M/ms/ms_upper.h @@ -41,7 +41,7 @@ class upper_trx : public ms_trx { public: void start_threads(); - void start_lower_ms(); + void main_loop(); void stop_upper_threads(); upper_trx(){}; |