diff options
author | Eric <ewild@sysmocom.de> | 2023-05-23 12:50:25 +0200 |
---|---|---|
committer | Eric <ewild@sysmocom.de> | 2023-07-07 19:12:39 +0200 |
commit | 3e7f4b0da91b1e522fc52daea77c9a1ea3e8fe6f (patch) | |
tree | 7854572789e44bb5d77329608a488b20e8df1f45 | |
parent | 4080cd05ba911952c559e09131c0836b769c2698 (diff) |
ms: use single thread pool
...so we don't spawn threads all the time.
Used for gain avg/setting.
Change-Id: Id675550f55e8ccbbbe6b0d91fbffd01b6ede15f7
-rw-r--r-- | Transceiver52M/Makefile.am | 1 | ||||
-rw-r--r-- | Transceiver52M/ms/ms.h | 3 | ||||
-rw-r--r-- | Transceiver52M/ms/ms_rx_lower.cpp | 44 | ||||
-rw-r--r-- | Transceiver52M/ms/threadpool.h | 92 |
4 files changed, 122 insertions, 18 deletions
diff --git a/Transceiver52M/Makefile.am b/Transceiver52M/Makefile.am index 875e9a1..296ff05 100644 --- a/Transceiver52M/Makefile.am +++ b/Transceiver52M/Makefile.am @@ -97,6 +97,7 @@ noinst_HEADERS += \ ms/ms_upper.h \ ms/itrq.h \ ms/sch.h \ + ms/threadpool.h \ grgsm_vitac/viterbi_detector.h \ grgsm_vitac/constants.h \ grgsm_vitac/grgsm_vitac.h diff --git a/Transceiver52M/ms/ms.h b/Transceiver52M/ms/ms.h index 7381397..d92f4b7 100644 --- a/Transceiver52M/ms/ms.h +++ b/Transceiver52M/ms/ms.h @@ -41,6 +41,7 @@ #include "Complex.h" #include "GSMCommon.h" #include "itrq.h" +#include "threadpool.h" const unsigned int ONE_TS_BURST_LEN = (3 + 58 + 26 + 58 + 3 + 8.25) * 4 /*sps*/; const unsigned int NUM_RXQ_FRAMES = 1; // rx thread <-> upper rx queue @@ -266,6 +267,7 @@ struct ms_trx : public BASET { time_keeper timekeeper; int hw_cpus; sched_params::target hw_target; + single_thread_pool worker_thread; void start(); std::atomic<bool> upper_is_ready; @@ -291,6 +293,7 @@ struct ms_trx : public BASET { hw_target(hw_cpus > 4 ? sched_params::target::ODROID : sched_params::target::PI4) { std::cerr << "scheduling for: " << (hw_cpus > 4 ? "odroid" : "pi4") << std::endl; + set_name_aff_sched(worker_thread.get_handle(), sched_params::thread_names::SCH_SEARCH); } virtual ~ms_trx() diff --git a/Transceiver52M/ms/ms_rx_lower.cpp b/Transceiver52M/ms/ms_rx_lower.cpp index e8d8e0e..dc0d56d 100644 --- a/Transceiver52M/ms/ms_rx_lower.cpp +++ b/Transceiver52M/ms/ms_rx_lower.cpp @@ -30,6 +30,8 @@ #include "ms.h" #include "grgsm_vitac/grgsm_vitac.h" +#include "threadpool.h" + extern "C" { #include "sch.h" } @@ -126,8 +128,11 @@ void ms_trx::maybe_update_gain(one_burst &brst) gainoffset = runmean < (rxFullScale / 2 ? 2 : 1); float newgain = runmean < rx_max_cutoff ? rxgain + gainoffset : rxgain - gainoffset; // FIXME: gian cutoff - if (newgain != rxgain && newgain <= 60) - std::thread([this, newgain] { setRxGain(newgain); }).detach(); + if (newgain != rxgain && newgain <= 60) { + auto gain_fun = [this, newgain] { setRxGain(newgain); }; + worker_thread.add_task(gain_fun); + } + runmean = 0; } gain_check = (gain_check + 1) % avgburst_num; @@ -217,26 +222,36 @@ bool ms_trx::handle_sch(bool is_first_sch_acq) return false; } +/* +accumulates a full big buffer consisting of 8*12 timeslots, then: +either +1) adjusts gain if necessary and starts over +2) searches and finds SCH and is done +*/ SCH_STATE ms_trx::search_for_sch(dev_buf_t *rcd) { static unsigned int sch_pos = 0; + auto to_copy = SCH_LEN_SPS - sch_pos; + if (sch_thread_done) return SCH_STATE::FOUND; if (rcv_done) return SCH_STATE::SEARCHING; - auto to_copy = SCH_LEN_SPS - sch_pos; - - if (SCH_LEN_SPS == to_copy) // first time + if (sch_pos == 0) // keep first ts for time delta calc first_sch_buf_rcv_ts = rcd->get_first_ts(); - if (!to_copy) { + if (to_copy) { + auto spsmax = rcd->actual_samples_per_buffer(); + if (to_copy > (unsigned int)spsmax) + sch_pos += rcd->readall(first_sch_buf + sch_pos); + else + sch_pos += rcd->read_n(first_sch_buf + sch_pos, 0, to_copy); + } else { // (!to_copy) sch_pos = 0; rcv_done = true; - std::thread([this] { - set_name_aff_sched(sched_params::thread_names::SCH_SEARCH); - + auto sch_search_fun = [this] { auto ptr = reinterpret_cast<const int16_t *>(first_sch_buf); const auto target_val = rxFullScale / 8; float sum = 0; @@ -255,16 +270,9 @@ SCH_STATE ms_trx::search_for_sch(dev_buf_t *rcd) if (!sch_thread_done) rcv_done = false; // retry! - return (bool)sch_thread_done; - }).detach(); + }; + worker_thread.add_task(sch_search_fun); } - - auto spsmax = rcd->actual_samples_per_buffer(); - if (to_copy > (unsigned int)spsmax) - sch_pos += rcd->readall(first_sch_buf + sch_pos); - else - sch_pos += rcd->read_n(first_sch_buf + sch_pos, 0, to_copy); - return SCH_STATE::SEARCHING; } diff --git a/Transceiver52M/ms/threadpool.h b/Transceiver52M/ms/threadpool.h new file mode 100644 index 0000000..a5dec97 --- /dev/null +++ b/Transceiver52M/ms/threadpool.h @@ -0,0 +1,92 @@ +#pragma once +/* + * (C) 2023 by sysmocom s.f.m.c. GmbH <info@sysmocom.de> + * All Rights Reserved + * + * Author: Eric Wild <ewild@sysmocom.de> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +#include <functional> +#include <thread> +#include <atomic> +#include <vector> +#include <future> +#include <mutex> +#include <queue> + +struct single_thread_pool { + std::mutex m; + std::condition_variable cv; + std::atomic<bool> stop_flag; + std::atomic<bool> is_ready; + std::deque<std::function<void()>> wq; + std::thread worker_thread; + + template <class F> + void add_task(F &&f) + { + std::unique_lock<std::mutex> l(m); + wq.emplace_back(std::forward<F>(f)); + cv.notify_one(); + return; + } + + single_thread_pool() : worker_thread(std::thread([this] { thread_loop(); })) + { + } + ~single_thread_pool() + { + stop(); + } + + std::thread::native_handle_type get_handle() + { + return worker_thread.native_handle(); + } + + private: + void stop() + { + { + std::unique_lock<std::mutex> l(m); + wq.clear(); + stop_flag = true; + cv.notify_one(); + } + worker_thread.join(); + } + + void thread_loop() + { + while (true) { + is_ready = true; + std::function<void()> f; + { + std::unique_lock<std::mutex> l(m); + if (wq.empty()) { + cv.wait(l, [&] { return !wq.empty() || stop_flag; }); + } + if (stop_flag) + return; + is_ready = false; + f = std::move(wq.front()); + wq.pop_front(); + } + f(); + } + } +};
\ No newline at end of file |