aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEric <ewild@sysmocom.de>2022-10-09 15:13:18 +0200
committerEric <ewild@sysmocom.de>2022-10-21 22:51:25 +0200
commit2f7b82f28244896aabb8aec595434cc254db324b (patch)
tree89aa2964e7de1eec6114dd759dd75a7e475eb49b
parent53615c3ccd1da264e4614b9fa05c3a09a7ec7c34 (diff)
wip ul burstssynctoy2
-rw-r--r--Transceiver52M/device/ipc2/ipcif.h347
-rw-r--r--Transceiver52M/device/ipc2/shmif.h4
-rw-r--r--Transceiver52M/ms/ipc_specific.h2
3 files changed, 277 insertions, 76 deletions
diff --git a/Transceiver52M/device/ipc2/ipcif.h b/Transceiver52M/device/ipc2/ipcif.h
index 25c1c15..72f4c28 100644
--- a/Transceiver52M/device/ipc2/ipcif.h
+++ b/Transceiver52M/device/ipc2/ipcif.h
@@ -24,9 +24,11 @@
#include <atomic>
#include <complex>
#include <cassert>
-#include "shmif.h"
-
+#include <deque>
#include <mutex>
+#include <vector>
+
+#include "shmif.h"
const int max_ul_rdlen = 1024 * 10;
const int max_dl_rdlen = 1024 * 10;
@@ -37,40 +39,236 @@ struct shm_if {
shm::sema r;
shm::sema w;
std::atomic<uint64_t> ts;
- std::atomic<size_t> len_req; // <-
- std::atomic<size_t> len_written; // ->
+ std::atomic<uint64_t> ts_req;
+ std::atomic<size_t> len_written_sps; // ->
sample_t buffer[max_ul_rdlen];
} ul;
struct {
shm::sema r;
shm::sema w;
std::atomic<uint64_t> ts;
- std::atomic<size_t> len_req;
- std::atomic<size_t> len_written;
+ std::atomic<uint64_t> ts_req;
+ std::atomic<size_t> len_written_sps;
sample_t buffer[max_dl_rdlen];
} dl;
};
// unique up to signed_type/2 diff
+// ex: uint8/int8 (250, 0) = -6
template <typename A> auto unsigned_diff(A a, A b) -> typename std::make_signed<A>::type
{
using stype = typename std::make_signed<A>::type;
return (a > b) ? static_cast<stype>(a - b) : -static_cast<stype>(b - a);
};
+constexpr inline int samp2byte(int v)
+{
+ return v * sizeof(sample_t);
+}
+constexpr inline int byte2samp(int v)
+{
+ return v / sizeof(sample_t);
+}
+
+struct ulentry {
+ bool done;
+ uint64_t ts;
+ unsigned int len_in_sps;
+ unsigned int read_pos_in_sps;
+ sample_t buf[1000];
+};
+/*
+ write: find read index +.. until marked free = "end" of current list
+
+ check:
+ within begin, end AND not free?
+ y:
+ copy (chunk)
+ if chunk advance burst buf ptr
+ n: next, advance, remove old.
+ */
+template <unsigned int num_bursts> class ulburstprovider {
+ std::mutex ul_q_m;
+ // std::deque<ulentry> ul_q;
+
+ // classic circular buffer
+ ulentry foo[num_bursts];
+ int current_index; // % num_bursts
+
+ void cur_buf_done()
+ {
+ foo[current_index].done = true;
+ current_index = current_index + 1 % num_bursts;
+ }
+ bool is_empty()
+ {
+ return foo[current_index].done = true;
+ }
+ void reset()
+ {
+ for (auto &i : foo)
+ i = {};
+ current_index = 0;
+ }
+ ulentry &find_free_at_end()
+ {
+ for (int i = current_index, max_to_search = 0; max_to_search < num_bursts;
+ i = (i + 1 % num_bursts), max_to_search++) {
+ if (foo[i].done)
+ return foo[i];
+ }
+ return foo[0]; // FIXME actually broken, q full, wat do?
+ }
+
+ void push_back(ulentry &e)
+ {
+ auto free_buf = find_free_at_end();
+ free_buf = e;
+ e.done = false;
+ }
+
+ public:
+ void add(ulentry &e)
+ {
+ std::lock_guard<std::mutex> foo(ul_q_m);
+ push_back(e);
+ }
+ void get(uint64_t requested_ts, unsigned int req_len_in_sps, sample_t *buf, unsigned int max_buf_write_len)
+ {
+ std::lock_guard<std::mutex> g(ul_q_m);
+
+ /*
+ 1) if empty return
+ 2) if not empty prune stale bursts
+ 3) if only future bursts also return and zero buf
+ */
+ for (int i = current_index, max_to_search = 0; max_to_search < num_bursts;
+ i = (i + 1 % num_bursts), max_to_search++) {
+ auto cur_entry = foo[i];
+ if (is_empty()) { // might be empty due to advance below!
+ memset(buf, 0, samp2byte(req_len_in_sps));
+ return;
+ }
+
+ if (cur_entry.ts + cur_entry.len_in_sps < requested_ts) { // remove late bursts
+ if (i == current_index) // only advance if we are at the front
+ cur_buf_done();
+ else
+ assert(true);
+ } else if (cur_entry.ts >= requested_ts + byte2samp(max_buf_write_len)) { // not in range
+ memset(buf, 0, samp2byte(req_len_in_sps));
+ return;
+
+ // FIXME: what about requested_ts <= entry.ts <= ts + reqlen?
+ } else {
+ // requested_ts <= cur_entry.ts <= requested_ts + byte2samp(max_write_len)
+
+ auto before_sps = unsigned_diff(cur_entry.ts, requested_ts);
+
+ // at least one whole buffer before our most recent "head" burst?
+ // set 0, return.
+ if (-before_sps >= byte2samp(max_buf_write_len)) {
+ memset(buf, 0, samp2byte(req_len_in_sps));
+ return;
+ }
+ // less than one full buffer before: pad 0
+ auto to_pad_sps = -before_sps;
+ memset(buf, 0, samp2byte(to_pad_sps));
+ requested_ts += to_pad_sps;
+ req_len_in_sps -= to_pad_sps;
+
+ if (!req_len_in_sps)
+ return;
+
+ // actual burst data after possible 0 pad
+ auto max_sps_to_write = std::min(cur_entry.len_in_sps, req_len_in_sps);
+ memcpy(&buf[samp2byte(to_pad_sps)], cur_entry.buf, samp2byte(max_sps_to_write));
+ requested_ts += max_sps_to_write;
+ req_len_in_sps -= max_sps_to_write;
+ cur_entry.read_pos_in_sps += max_sps_to_write;
+
+ //this buf is done...
+ if (cur_entry.read_pos_in_sps == cur_entry.len_in_sps) {
+ cur_buf_done();
+ }
+
+ if (!req_len_in_sps)
+ return;
+ }
+ }
+ }
+};
+
class trxmsif {
shm::shm<shm_if> m;
shm_if *ptr;
- volatile int dl_readoffset;
- bool first;
- int samp2byte(int v)
+ ulburstprovider<10> p;
+
+ template <typename T> void read(T &direction, size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf)
{
- return v * sizeof(sample_t);
+ static int readoffset_sps;
+ // auto &direction = ptr->dl;
+ auto buf = &direction.buffer[0];
+ size_t len_avail_sps = direction.len_written_sps.load();
+
+ auto left_to_read = len_avail_sps - readoffset_sps;
+
+ shm::mtx_log::print_guard() << "\tr @" << direction.ts.load() << " " << readoffset_sps << std::endl;
+
+ // no data, wait for new buffer, maybe some data left afterwards
+ if (!left_to_read) {
+ assert(readoffset_sps == len_avail_sps);
+ readoffset_sps = 0;
+ direction.r.reset_unsafe();
+ direction.ts_req = (*read_ts);
+ direction.w.set(1);
+ direction.r.wait_and_reset(1);
+ assert(*read_ts != direction.ts.load());
+ // shm::sema_guard g(dl.r, dl.w);
+ *read_ts = direction.ts.load();
+ len_avail_sps = direction.len_written_sps.load();
+ readoffset_sps += howmany_sps;
+ assert(len_avail_sps >= howmany_sps);
+ memcpy(outbuf, buf, samp2byte(howmany_sps));
+
+ shm::mtx_log::print_guard() << "\tr+ " << *read_ts << " " << howmany_sps << std::endl;
+ return;
+ }
+
+ *read_ts = direction.ts.load() + readoffset_sps;
+ left_to_read = len_avail_sps - readoffset_sps;
+
+ // data left from prev read
+ if (left_to_read >= howmany_sps) {
+ memcpy(outbuf, &buf[readoffset_sps], samp2byte(howmany_sps));
+ readoffset_sps += howmany_sps;
+
+ shm::mtx_log::print_guard() << "\tr++ " << *read_ts << " " << howmany_sps << std::endl;
+ return;
+ } else {
+ memcpy(outbuf, &buf[readoffset_sps], samp2byte(left_to_read));
+ readoffset_sps = 0;
+ auto still_left_to_read = howmany_sps - left_to_read;
+ {
+ direction.r.reset_unsafe();
+ direction.ts_req = (*read_ts);
+ direction.w.set(1);
+ direction.r.wait_and_reset(1);
+ assert(*read_ts != direction.ts.load());
+ len_avail_sps = direction.len_written_sps.load();
+ assert(len_avail_sps >= still_left_to_read);
+ memcpy(&outbuf[left_to_read], buf, samp2byte(still_left_to_read));
+ readoffset_sps += still_left_to_read;
+ shm::mtx_log::print_guard()
+ << "\tr+++2 " << *read_ts << " " << howmany_sps << " " << still_left_to_read
+ << " new @" << direction.ts.load() << std::endl;
+ }
+ }
}
public:
- trxmsif() : m("trx-ms-if"), dl_readoffset(0), first(true)
+ trxmsif() : m("trx-ms-if")
{
}
@@ -97,96 +295,93 @@ class trxmsif {
return ptr->ms_connected == true;
}
- void write_dl(size_t howmany, uint64_t write_ts, sample_t *inbuf)
+ /* is being read from ms side */
+ void read_dl(size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf)
+ {
+ return read(ptr->dl, howmany_sps, read_ts, outbuf);
+ }
+
+ /* is being read from trx/network side */
+ void read_ul(size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf)
+ {
+ // if (ptr->ms_connected != true) {
+ memset(outbuf, 0, samp2byte(howmany_sps));
+ // return;
+ // }
+ // return read(ptr->ul, howmany_sps, read_ts, outbuf);
+ }
+
+ void write_dl(size_t howmany_sps, uint64_t write_ts, sample_t *inbuf)
{
auto &dl = ptr->dl;
auto buf = &dl.buffer[0];
- // if (ptr->ms_connected != true)
- // return;
+ if (ptr->ms_connected != true)
+ return;
- assert(sizeof(dl.buffer) >= samp2byte(howmany));
+ assert(sizeof(dl.buffer) >= samp2byte(howmany_sps));
// print_guard() << "####w " << std::endl;
{
shm::sema_wait_guard g(dl.w, dl.r);
- memcpy(buf, inbuf, samp2byte(howmany));
+ memcpy(buf, inbuf, samp2byte(howmany_sps));
dl.ts.store(write_ts);
- dl.len_written.store(howmany);
+ dl.len_written_sps.store(howmany_sps);
}
shm::mtx_log::print_guard() << std::endl
- << "####w+ " << write_ts << " " << howmany << std::endl
+ << "####w+ " << write_ts << " " << howmany_sps << std::endl
<< std::endl;
}
- void signal_read_start()
- { /* nop */
- }
-
- void read_dl(size_t howmany, uint64_t *read_ts, sample_t *outbuf)
+ void write_ul(size_t howmany_sps_sps, uint64_t write_ts, sample_t *inbuf)
{
- auto &dl = ptr->dl;
- auto buf = &dl.buffer[0];
- size_t len_avail = dl.len_written.load();
+ auto &ul = ptr->ul;
+ assert(sizeof(ul.buffer) >= samp2byte(howmany_sps_sps));
+ // print_guard() << "####w " << std::endl;
- auto left_to_read = len_avail - dl_readoffset;
+ ulentry e;
+ e.ts = write_ts;
+ e.len_in_sps = howmany_sps_sps;
+ e.done = false;
+ e.read_pos_in_sps = 0;
+ assert(sizeof(e.buf) >= samp2byte(howmany_sps_sps));
+ memcpy(e.buf, inbuf, samp2byte(howmany_sps_sps));
+ p.add(e);
- shm::mtx_log::print_guard() << "\tr @" << dl.ts.load() << " " << dl_readoffset << std::endl;
+ shm::mtx_log::print_guard() << std::endl
+ << "####q+ " << write_ts << " " << howmany_sps_sps << std::endl
+ << std::endl;
+ }
- // no data, wait for new buffer, maybe some data left afterwards
- if (!left_to_read) {
- assert(dl_readoffset == len_avail);
- dl_readoffset = 0;
- dl.r.reset_unsafe();
- dl.w.set(1);
- dl.r.wait_and_reset(1);
- assert(*read_ts != dl.ts.load());
- // shm::sema_guard g(dl.r, dl.w);
- *read_ts = dl.ts.load();
- len_avail = dl.len_written.load();
- dl_readoffset += howmany;
- assert(len_avail >= howmany);
- memcpy(outbuf, buf, samp2byte(howmany));
+ void drive_tx()
+ {
+ auto &ul = ptr->ul;
+ auto buf = &ul.buffer[0];
+ const auto max_write_len = sizeof(ul.buffer);
- shm::mtx_log::print_guard() << "\tr+ " << *read_ts << " " << howmany << std::endl;
+ // ul_q_m.lock();
+ // ul_q.push_front(e);
+ // ul_q_m.unlock();
+ // ul.w.wait_and_reset();
+
+ // no read waiting for a write
+ if (!ul.w.check_unsafe(1))
return;
- }
- *read_ts = dl.ts.load() + dl_readoffset;
- left_to_read = len_avail - dl_readoffset;
+ // FIXME: store written, notify after get!
- // data left from prev read
- if (left_to_read >= howmany) {
- memcpy(outbuf, &buf[dl_readoffset], samp2byte(howmany));
- dl_readoffset += howmany;
+ auto requested_ts = ul.ts_req.load();
- shm::mtx_log::print_guard() << "\tr++ " << *read_ts << " " << howmany << std::endl;
- return;
- } else {
- memcpy(outbuf, &buf[dl_readoffset], samp2byte(left_to_read));
- dl_readoffset = 0;
- auto still_left_to_read = howmany - left_to_read;
- {
- dl.r.reset_unsafe();
- dl.w.set(1);
- dl.r.wait_and_reset(1);
- assert(*read_ts != dl.ts.load());
- len_avail = dl.len_written.load();
- assert(len_avail >= still_left_to_read);
- memcpy(&outbuf[left_to_read], buf, samp2byte(still_left_to_read));
- dl_readoffset += still_left_to_read;
- shm::mtx_log::print_guard()
- << "\tr+++2 " << *read_ts << " " << howmany << " " << still_left_to_read
- << " new @" << dl.ts.load() << std::endl;
- }
- }
+ p.get(requested_ts, byte2samp(max_write_len), buf, max_write_len);
+
+ // memset(buf, 0, max_write_len);
+ ul.ts.store(requested_ts);
+ ul.len_written_sps.store(byte2samp(max_write_len));
+ ul.w.reset_unsafe();
+ ul.r.set(1);
}
- void read_ul(size_t howmany, uint64_t *read_ts, sample_t *outbuf)
- {
- // if (ptr->ms_connected != true) {
- memset(outbuf, 0, samp2byte(howmany));
- return;
- // }
+ void signal_read_start()
+ { /* nop */
}
};
diff --git a/Transceiver52M/device/ipc2/shmif.h b/Transceiver52M/device/ipc2/shmif.h
index 09dcf8c..89413ab 100644
--- a/Transceiver52M/device/ipc2/shmif.h
+++ b/Transceiver52M/device/ipc2/shmif.h
@@ -235,6 +235,10 @@ class sema {
{
value = 0;
}
+ bool check_unsafe(int v)
+ {
+ return value == v;
+ }
sema(const sema &) = delete;
sema &operator=(const sema &) = delete;
};
diff --git a/Transceiver52M/ms/ipc_specific.h b/Transceiver52M/ms/ipc_specific.h
index 05181d3..8d67237 100644
--- a/Transceiver52M/ms/ipc_specific.h
+++ b/Transceiver52M/ms/ipc_specific.h
@@ -162,6 +162,8 @@ template <typename T> struct ipc_hw {
last_ts = rcd.get_first_ts();
}
+ m.drive_tx();
+
return ret;
}