aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEric <ewild@sysmocom.de>2023-05-23 12:50:25 +0200
committerEric <ewild@sysmocom.de>2023-07-07 19:12:39 +0200
commit3e7f4b0da91b1e522fc52daea77c9a1ea3e8fe6f (patch)
tree7854572789e44bb5d77329608a488b20e8df1f45
parent4080cd05ba911952c559e09131c0836b769c2698 (diff)
ms: use single thread pool
...so we don't spawn threads all the time. Used for gain avg/setting. Change-Id: Id675550f55e8ccbbbe6b0d91fbffd01b6ede15f7
-rw-r--r--Transceiver52M/Makefile.am1
-rw-r--r--Transceiver52M/ms/ms.h3
-rw-r--r--Transceiver52M/ms/ms_rx_lower.cpp44
-rw-r--r--Transceiver52M/ms/threadpool.h92
4 files changed, 122 insertions, 18 deletions
diff --git a/Transceiver52M/Makefile.am b/Transceiver52M/Makefile.am
index 875e9a1..296ff05 100644
--- a/Transceiver52M/Makefile.am
+++ b/Transceiver52M/Makefile.am
@@ -97,6 +97,7 @@ noinst_HEADERS += \
ms/ms_upper.h \
ms/itrq.h \
ms/sch.h \
+ ms/threadpool.h \
grgsm_vitac/viterbi_detector.h \
grgsm_vitac/constants.h \
grgsm_vitac/grgsm_vitac.h
diff --git a/Transceiver52M/ms/ms.h b/Transceiver52M/ms/ms.h
index 7381397..d92f4b7 100644
--- a/Transceiver52M/ms/ms.h
+++ b/Transceiver52M/ms/ms.h
@@ -41,6 +41,7 @@
#include "Complex.h"
#include "GSMCommon.h"
#include "itrq.h"
+#include "threadpool.h"
const unsigned int ONE_TS_BURST_LEN = (3 + 58 + 26 + 58 + 3 + 8.25) * 4 /*sps*/;
const unsigned int NUM_RXQ_FRAMES = 1; // rx thread <-> upper rx queue
@@ -266,6 +267,7 @@ struct ms_trx : public BASET {
time_keeper timekeeper;
int hw_cpus;
sched_params::target hw_target;
+ single_thread_pool worker_thread;
void start();
std::atomic<bool> upper_is_ready;
@@ -291,6 +293,7 @@ struct ms_trx : public BASET {
hw_target(hw_cpus > 4 ? sched_params::target::ODROID : sched_params::target::PI4)
{
std::cerr << "scheduling for: " << (hw_cpus > 4 ? "odroid" : "pi4") << std::endl;
+ set_name_aff_sched(worker_thread.get_handle(), sched_params::thread_names::SCH_SEARCH);
}
virtual ~ms_trx()
diff --git a/Transceiver52M/ms/ms_rx_lower.cpp b/Transceiver52M/ms/ms_rx_lower.cpp
index e8d8e0e..dc0d56d 100644
--- a/Transceiver52M/ms/ms_rx_lower.cpp
+++ b/Transceiver52M/ms/ms_rx_lower.cpp
@@ -30,6 +30,8 @@
#include "ms.h"
#include "grgsm_vitac/grgsm_vitac.h"
+#include "threadpool.h"
+
extern "C" {
#include "sch.h"
}
@@ -126,8 +128,11 @@ void ms_trx::maybe_update_gain(one_burst &brst)
gainoffset = runmean < (rxFullScale / 2 ? 2 : 1);
float newgain = runmean < rx_max_cutoff ? rxgain + gainoffset : rxgain - gainoffset;
// FIXME: gian cutoff
- if (newgain != rxgain && newgain <= 60)
- std::thread([this, newgain] { setRxGain(newgain); }).detach();
+ if (newgain != rxgain && newgain <= 60) {
+ auto gain_fun = [this, newgain] { setRxGain(newgain); };
+ worker_thread.add_task(gain_fun);
+ }
+
runmean = 0;
}
gain_check = (gain_check + 1) % avgburst_num;
@@ -217,26 +222,36 @@ bool ms_trx::handle_sch(bool is_first_sch_acq)
return false;
}
+/*
+accumulates a full big buffer consisting of 8*12 timeslots, then:
+either
+1) adjusts gain if necessary and starts over
+2) searches and finds SCH and is done
+*/
SCH_STATE ms_trx::search_for_sch(dev_buf_t *rcd)
{
static unsigned int sch_pos = 0;
+ auto to_copy = SCH_LEN_SPS - sch_pos;
+
if (sch_thread_done)
return SCH_STATE::FOUND;
if (rcv_done)
return SCH_STATE::SEARCHING;
- auto to_copy = SCH_LEN_SPS - sch_pos;
-
- if (SCH_LEN_SPS == to_copy) // first time
+ if (sch_pos == 0) // keep first ts for time delta calc
first_sch_buf_rcv_ts = rcd->get_first_ts();
- if (!to_copy) {
+ if (to_copy) {
+ auto spsmax = rcd->actual_samples_per_buffer();
+ if (to_copy > (unsigned int)spsmax)
+ sch_pos += rcd->readall(first_sch_buf + sch_pos);
+ else
+ sch_pos += rcd->read_n(first_sch_buf + sch_pos, 0, to_copy);
+ } else { // (!to_copy)
sch_pos = 0;
rcv_done = true;
- std::thread([this] {
- set_name_aff_sched(sched_params::thread_names::SCH_SEARCH);
-
+ auto sch_search_fun = [this] {
auto ptr = reinterpret_cast<const int16_t *>(first_sch_buf);
const auto target_val = rxFullScale / 8;
float sum = 0;
@@ -255,16 +270,9 @@ SCH_STATE ms_trx::search_for_sch(dev_buf_t *rcd)
if (!sch_thread_done)
rcv_done = false; // retry!
- return (bool)sch_thread_done;
- }).detach();
+ };
+ worker_thread.add_task(sch_search_fun);
}
-
- auto spsmax = rcd->actual_samples_per_buffer();
- if (to_copy > (unsigned int)spsmax)
- sch_pos += rcd->readall(first_sch_buf + sch_pos);
- else
- sch_pos += rcd->read_n(first_sch_buf + sch_pos, 0, to_copy);
-
return SCH_STATE::SEARCHING;
}
diff --git a/Transceiver52M/ms/threadpool.h b/Transceiver52M/ms/threadpool.h
new file mode 100644
index 0000000..a5dec97
--- /dev/null
+++ b/Transceiver52M/ms/threadpool.h
@@ -0,0 +1,92 @@
+#pragma once
+/*
+ * (C) 2023 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
+ * All Rights Reserved
+ *
+ * Author: Eric Wild <ewild@sysmocom.de>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include <functional>
+#include <thread>
+#include <atomic>
+#include <vector>
+#include <future>
+#include <mutex>
+#include <queue>
+
+struct single_thread_pool {
+ std::mutex m;
+ std::condition_variable cv;
+ std::atomic<bool> stop_flag;
+ std::atomic<bool> is_ready;
+ std::deque<std::function<void()>> wq;
+ std::thread worker_thread;
+
+ template <class F>
+ void add_task(F &&f)
+ {
+ std::unique_lock<std::mutex> l(m);
+ wq.emplace_back(std::forward<F>(f));
+ cv.notify_one();
+ return;
+ }
+
+ single_thread_pool() : worker_thread(std::thread([this] { thread_loop(); }))
+ {
+ }
+ ~single_thread_pool()
+ {
+ stop();
+ }
+
+ std::thread::native_handle_type get_handle()
+ {
+ return worker_thread.native_handle();
+ }
+
+ private:
+ void stop()
+ {
+ {
+ std::unique_lock<std::mutex> l(m);
+ wq.clear();
+ stop_flag = true;
+ cv.notify_one();
+ }
+ worker_thread.join();
+ }
+
+ void thread_loop()
+ {
+ while (true) {
+ is_ready = true;
+ std::function<void()> f;
+ {
+ std::unique_lock<std::mutex> l(m);
+ if (wq.empty()) {
+ cv.wait(l, [&] { return !wq.empty() || stop_flag; });
+ }
+ if (stop_flag)
+ return;
+ is_ready = false;
+ f = std::move(wq.front());
+ wq.pop_front();
+ }
+ f();
+ }
+ }
+}; \ No newline at end of file