aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEric <ewild@sysmocom.de>2023-05-08 12:56:56 +0200
committerEric <ewild@sysmocom.de>2023-07-07 19:12:39 +0200
commitc3e515a28b2f3c7f7ef27a1ab38ef198da961f1c (patch)
treedf19fb9a0c31fbd44ee6a682e6a8c2bed9ef7f8c
parentbcaafcaa9d6ac7d2e55fe2dd6d4392fb93ef29b7 (diff)
ms: rearrange code to allow clean exits
This allows gracefully terminating the application by introducing queue timeouts. Change-Id: I0b8deebc63cf4d936666fd68e1666d1917e89a5d
-rw-r--r--Transceiver52M/ms/bladerf_specific.h2
-rw-r--r--Transceiver52M/ms/itrq.h61
-rw-r--r--Transceiver52M/ms/ms.cpp2
-rw-r--r--Transceiver52M/ms/ms.h4
-rw-r--r--Transceiver52M/ms/ms_rx_lower.cpp6
-rw-r--r--Transceiver52M/ms/ms_upper.cpp45
-rw-r--r--Transceiver52M/ms/ms_upper.h2
7 files changed, 84 insertions, 38 deletions
diff --git a/Transceiver52M/ms/bladerf_specific.h b/Transceiver52M/ms/bladerf_specific.h
index 3dc4777..e32d77c 100644
--- a/Transceiver52M/ms/bladerf_specific.h
+++ b/Transceiver52M/ms/bladerf_specific.h
@@ -192,7 +192,7 @@ struct blade_hw {
struct bladerf_stream *rx_stream;
struct bladerf_stream *tx_stream;
// using pkt2buf = blade_otw_buffer<2, blade_speed_buffer_type::SS>;
- using tx_buf_q_type = spsc_cond<BLADE_NUM_BUFFERS, dev_buf_t *, true, false>;
+ using tx_buf_q_type = spsc_cond_timeout<BLADE_NUM_BUFFERS, dev_buf_t *, true, false>;
const unsigned int rxFullScale, txFullScale;
const int rxtxdelay;
diff --git a/Transceiver52M/ms/itrq.h b/Transceiver52M/ms/itrq.h
index 1d9e217..69ff515 100644
--- a/Transceiver52M/ms/itrq.h
+++ b/Transceiver52M/ms/itrq.h
@@ -29,7 +29,58 @@
namespace spsc_detail
{
-template <bool block_read, bool block_write> class spsc_cond_detail {
+template <bool block_read, bool block_write>
+class spsc_cond_timeout_detail {
+ std::condition_variable cond_r, cond_w;
+ std::mutex lr, lw;
+ std::atomic_int r_flag, w_flag;
+ const int timeout_ms = 200;
+
+ public:
+ explicit spsc_cond_timeout_detail() : r_flag(0), w_flag(0)
+ {
+ }
+
+ ~spsc_cond_timeout_detail()
+ {
+ }
+
+ ssize_t spsc_check_r()
+ {
+ std::unique_lock<std::mutex> lk(lr);
+ if (cond_r.wait_for(lk, std::chrono::milliseconds(timeout_ms), [&] { return r_flag != 0; })) {
+ r_flag--;
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ ssize_t spsc_check_w()
+ {
+ std::unique_lock<std::mutex> lk(lw);
+ if (cond_w.wait_for(lk, std::chrono::milliseconds(timeout_ms), [&] { return w_flag != 0; })) {
+ w_flag--;
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ void spsc_notify_r()
+ {
+ std::unique_lock<std::mutex> lk(lr);
+ r_flag++;
+ cond_r.notify_one();
+ }
+ void spsc_notify_w()
+ {
+ std::unique_lock<std::mutex> lk(lw);
+ w_flag++;
+ cond_w.notify_one();
+ }
+};
+
+template <bool block_read, bool block_write>
+class spsc_cond_detail {
std::condition_variable cond_r, cond_w;
std::mutex lr, lw;
std::atomic_int r_flag, w_flag;
@@ -74,7 +125,8 @@ template <bool block_read, bool block_write> class spsc_cond_detail {
};
// originally designed for select loop integration
-template <bool block_read, bool block_write> class spsc_efd_detail {
+template <bool block_read, bool block_write>
+class spsc_efd_detail {
int efd_r, efd_w; /* eventfds used to block/notify readers/writers */
public:
@@ -191,4 +243,7 @@ class spsc : public T<block_read, block_write> {
template <unsigned int SZ, typename ELEM, bool block_read, bool block_write>
class spsc_evfd : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_efd_detail> {};
template <unsigned int SZ, typename ELEM, bool block_read, bool block_write>
-class spsc_cond : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_detail> {}; \ No newline at end of file
+class spsc_cond : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_detail> {};
+template <unsigned int SZ, typename ELEM, bool block_read, bool block_write>
+class spsc_cond_timeout
+ : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_timeout_detail> {}; \ No newline at end of file
diff --git a/Transceiver52M/ms/ms.cpp b/Transceiver52M/ms/ms.cpp
index 2e91cae..e587c05 100644
--- a/Transceiver52M/ms/ms.cpp
+++ b/Transceiver52M/ms/ms.cpp
@@ -78,7 +78,7 @@ bh_fn_t ms_trx::tx_bh()
};
}
-void ms_trx::start()
+void ms_trx::start_lower_ms()
{
if (stop_lower_threads_flag)
return;
diff --git a/Transceiver52M/ms/ms.h b/Transceiver52M/ms/ms.h
index efccffc..8ca9b02 100644
--- a/Transceiver52M/ms/ms.h
+++ b/Transceiver52M/ms/ms.h
@@ -117,7 +117,7 @@ struct one_burst {
};
};
-using rx_queue_t = spsc_cond<8 * NUM_RXQ_FRAMES, one_burst, true, false>;
+using rx_queue_t = spsc_cond_timeout<8 * NUM_RXQ_FRAMES, one_burst, true, false>;
enum class SCH_STATE { SEARCHING, FOUND };
@@ -267,7 +267,7 @@ struct ms_trx : public BASET {
sched_params::target hw_target;
single_thread_pool worker_thread;
- void start();
+ void start_lower_ms();
std::atomic<bool> upper_is_ready;
void set_upper_ready(bool is_ready);
diff --git a/Transceiver52M/ms/ms_rx_lower.cpp b/Transceiver52M/ms/ms_rx_lower.cpp
index 4d6ce18..26ee131 100644
--- a/Transceiver52M/ms/ms_rx_lower.cpp
+++ b/Transceiver52M/ms/ms_rx_lower.cpp
@@ -142,10 +142,8 @@ bool ms_trx::handle_sch_or_nb()
memcpy(brst.sch_bits, sch_demod_bits, sizeof(sch_demod_bits));
}
- if (upper_is_ready) { // this is blocking, so only submit if there is a reader - only if upper exists!
- while (!rxqueue.spsc_push(&brst))
- ;
- }
+ while (upper_is_ready && !rxqueue.spsc_push(&brst))
+ ;
if (do_auto_gain)
maybe_update_gain(brst);
diff --git a/Transceiver52M/ms/ms_upper.cpp b/Transceiver52M/ms/ms_upper.cpp
index a10d542..4b2f919 100644
--- a/Transceiver52M/ms/ms_upper.cpp
+++ b/Transceiver52M/ms/ms_upper.cpp
@@ -80,24 +80,33 @@ void upper_trx::start_threads()
while (!g_exit_flag) {
driveControl();
}
- std::cerr << "exit control!" << std::endl;
+ std::cerr << "exit U control!" << std::endl;
});
- msleep(1);
thr_tx = std::thread([this] {
set_name_aff_sched(sched_params::thread_names::U_TX);
while (!g_exit_flag) {
driveTx();
}
- std::cerr << "exit tx U!" << std::endl;
+ std::cerr << "exit U tx!" << std::endl;
});
- // atomic ensures data is not written to q until loop reads
- start_lower_ms();
+#ifdef LSANDEBUG
+ std::thread([this] {
+ set_name_aff_sched(sched_params::thread_names::LEAKCHECK);
+
+ while (1) {
+ std::this_thread::sleep_for(std::chrono::seconds{ 5 });
+ __lsan_do_recoverable_leak_check();
+ }
+ }).detach();
+#endif
+}
+void upper_trx::main_loop()
+{
set_name_aff_sched(sched_params::thread_names::U_RX);
+ set_upper_ready(true);
while (!g_exit_flag) {
- // set_upper_ready(true) needs to happen during cmd handling:
- // the main loop is driven by rx, so unless rx is on AND transceiver is on we get stuck..
driveReceiveFIFO();
osmo_select_main(1);
@@ -108,24 +117,8 @@ void upper_trx::start_threads()
}
}
set_upper_ready(false);
- std::cerr << "exit rx U!" << std::endl;
+ std::cerr << "exit U rx!" << std::endl;
mOn = false;
-
-#ifdef LSANDEBUG
- std::thread([this] {
- set_name_aff_sched(sched_params::thread_names::LEAKCHECK);
-
- while (1) {
- std::this_thread::sleep_for(std::chrono::seconds{ 5 });
- __lsan_do_recoverable_leak_check();
- }
- }).detach();
-#endif
-}
-
-void upper_trx::start_lower_ms()
-{
- ms_trx::start();
}
// signalvector is owning despite claiming not to, but we can pretend, too..
@@ -346,7 +339,7 @@ bool upper_trx::driveControl()
case TRXCON_PHYIF_CMDT_POWERON:
if (!mOn) {
mOn = true;
- set_upper_ready(true);
+ start_lower_ms();
}
break;
case TRXCON_PHYIF_CMDT_POWEROFF:
@@ -430,7 +423,7 @@ int main(int argc, char *argv[])
// blocking, will return when global exit is requested
trx->start_threads();
-
+ trx->main_loop();
trx->stop_threads();
trx->stop_upper_threads();
diff --git a/Transceiver52M/ms/ms_upper.h b/Transceiver52M/ms/ms_upper.h
index bc9bd14..2362365 100644
--- a/Transceiver52M/ms/ms_upper.h
+++ b/Transceiver52M/ms/ms_upper.h
@@ -41,7 +41,7 @@ class upper_trx : public ms_trx {
public:
void start_threads();
- void start_lower_ms();
+ void main_loop();
void stop_upper_threads();
upper_trx(){};