diff options
author | Eric <ewild@sysmocom.de> | 2022-10-09 15:13:18 +0200 |
---|---|---|
committer | Eric <ewild@sysmocom.de> | 2022-10-21 22:51:25 +0200 |
commit | 2f7b82f28244896aabb8aec595434cc254db324b (patch) | |
tree | 89aa2964e7de1eec6114dd759dd75a7e475eb49b | |
parent | 53615c3ccd1da264e4614b9fa05c3a09a7ec7c34 (diff) |
wip ul burstssynctoy2
Change-Id: I1c92751a91b34da2539e12939ee5609045725272
-rw-r--r-- | Transceiver52M/device/ipc2/ipcif.h | 347 | ||||
-rw-r--r-- | Transceiver52M/device/ipc2/shmif.h | 4 | ||||
-rw-r--r-- | Transceiver52M/ms/ipc_specific.h | 2 |
3 files changed, 277 insertions, 76 deletions
diff --git a/Transceiver52M/device/ipc2/ipcif.h b/Transceiver52M/device/ipc2/ipcif.h index 25c1c15..72f4c28 100644 --- a/Transceiver52M/device/ipc2/ipcif.h +++ b/Transceiver52M/device/ipc2/ipcif.h @@ -24,9 +24,11 @@ #include <atomic> #include <complex> #include <cassert> -#include "shmif.h" - +#include <deque> #include <mutex> +#include <vector> + +#include "shmif.h" const int max_ul_rdlen = 1024 * 10; const int max_dl_rdlen = 1024 * 10; @@ -37,40 +39,236 @@ struct shm_if { shm::sema r; shm::sema w; std::atomic<uint64_t> ts; - std::atomic<size_t> len_req; // <- - std::atomic<size_t> len_written; // -> + std::atomic<uint64_t> ts_req; + std::atomic<size_t> len_written_sps; // -> sample_t buffer[max_ul_rdlen]; } ul; struct { shm::sema r; shm::sema w; std::atomic<uint64_t> ts; - std::atomic<size_t> len_req; - std::atomic<size_t> len_written; + std::atomic<uint64_t> ts_req; + std::atomic<size_t> len_written_sps; sample_t buffer[max_dl_rdlen]; } dl; }; // unique up to signed_type/2 diff +// ex: uint8/int8 (250, 0) = -6 template <typename A> auto unsigned_diff(A a, A b) -> typename std::make_signed<A>::type { using stype = typename std::make_signed<A>::type; return (a > b) ? static_cast<stype>(a - b) : -static_cast<stype>(b - a); }; +constexpr inline int samp2byte(int v) +{ + return v * sizeof(sample_t); +} +constexpr inline int byte2samp(int v) +{ + return v / sizeof(sample_t); +} + +struct ulentry { + bool done; + uint64_t ts; + unsigned int len_in_sps; + unsigned int read_pos_in_sps; + sample_t buf[1000]; +}; +/* + write: find read index +.. until marked free = "end" of current list + + check: + within begin, end AND not free? + y: + copy (chunk) + if chunk advance burst buf ptr + n: next, advance, remove old. + */ +template <unsigned int num_bursts> class ulburstprovider { + std::mutex ul_q_m; + // std::deque<ulentry> ul_q; + + // classic circular buffer + ulentry foo[num_bursts]; + int current_index; // % num_bursts + + void cur_buf_done() + { + foo[current_index].done = true; + current_index = current_index + 1 % num_bursts; + } + bool is_empty() + { + return foo[current_index].done = true; + } + void reset() + { + for (auto &i : foo) + i = {}; + current_index = 0; + } + ulentry &find_free_at_end() + { + for (int i = current_index, max_to_search = 0; max_to_search < num_bursts; + i = (i + 1 % num_bursts), max_to_search++) { + if (foo[i].done) + return foo[i]; + } + return foo[0]; // FIXME actually broken, q full, wat do? + } + + void push_back(ulentry &e) + { + auto free_buf = find_free_at_end(); + free_buf = e; + e.done = false; + } + + public: + void add(ulentry &e) + { + std::lock_guard<std::mutex> foo(ul_q_m); + push_back(e); + } + void get(uint64_t requested_ts, unsigned int req_len_in_sps, sample_t *buf, unsigned int max_buf_write_len) + { + std::lock_guard<std::mutex> g(ul_q_m); + + /* + 1) if empty return + 2) if not empty prune stale bursts + 3) if only future bursts also return and zero buf + */ + for (int i = current_index, max_to_search = 0; max_to_search < num_bursts; + i = (i + 1 % num_bursts), max_to_search++) { + auto cur_entry = foo[i]; + if (is_empty()) { // might be empty due to advance below! + memset(buf, 0, samp2byte(req_len_in_sps)); + return; + } + + if (cur_entry.ts + cur_entry.len_in_sps < requested_ts) { // remove late bursts + if (i == current_index) // only advance if we are at the front + cur_buf_done(); + else + assert(true); + } else if (cur_entry.ts >= requested_ts + byte2samp(max_buf_write_len)) { // not in range + memset(buf, 0, samp2byte(req_len_in_sps)); + return; + + // FIXME: what about requested_ts <= entry.ts <= ts + reqlen? + } else { + // requested_ts <= cur_entry.ts <= requested_ts + byte2samp(max_write_len) + + auto before_sps = unsigned_diff(cur_entry.ts, requested_ts); + + // at least one whole buffer before our most recent "head" burst? + // set 0, return. + if (-before_sps >= byte2samp(max_buf_write_len)) { + memset(buf, 0, samp2byte(req_len_in_sps)); + return; + } + // less than one full buffer before: pad 0 + auto to_pad_sps = -before_sps; + memset(buf, 0, samp2byte(to_pad_sps)); + requested_ts += to_pad_sps; + req_len_in_sps -= to_pad_sps; + + if (!req_len_in_sps) + return; + + // actual burst data after possible 0 pad + auto max_sps_to_write = std::min(cur_entry.len_in_sps, req_len_in_sps); + memcpy(&buf[samp2byte(to_pad_sps)], cur_entry.buf, samp2byte(max_sps_to_write)); + requested_ts += max_sps_to_write; + req_len_in_sps -= max_sps_to_write; + cur_entry.read_pos_in_sps += max_sps_to_write; + + //this buf is done... + if (cur_entry.read_pos_in_sps == cur_entry.len_in_sps) { + cur_buf_done(); + } + + if (!req_len_in_sps) + return; + } + } + } +}; + class trxmsif { shm::shm<shm_if> m; shm_if *ptr; - volatile int dl_readoffset; - bool first; - int samp2byte(int v) + ulburstprovider<10> p; + + template <typename T> void read(T &direction, size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf) { - return v * sizeof(sample_t); + static int readoffset_sps; + // auto &direction = ptr->dl; + auto buf = &direction.buffer[0]; + size_t len_avail_sps = direction.len_written_sps.load(); + + auto left_to_read = len_avail_sps - readoffset_sps; + + shm::mtx_log::print_guard() << "\tr @" << direction.ts.load() << " " << readoffset_sps << std::endl; + + // no data, wait for new buffer, maybe some data left afterwards + if (!left_to_read) { + assert(readoffset_sps == len_avail_sps); + readoffset_sps = 0; + direction.r.reset_unsafe(); + direction.ts_req = (*read_ts); + direction.w.set(1); + direction.r.wait_and_reset(1); + assert(*read_ts != direction.ts.load()); + // shm::sema_guard g(dl.r, dl.w); + *read_ts = direction.ts.load(); + len_avail_sps = direction.len_written_sps.load(); + readoffset_sps += howmany_sps; + assert(len_avail_sps >= howmany_sps); + memcpy(outbuf, buf, samp2byte(howmany_sps)); + + shm::mtx_log::print_guard() << "\tr+ " << *read_ts << " " << howmany_sps << std::endl; + return; + } + + *read_ts = direction.ts.load() + readoffset_sps; + left_to_read = len_avail_sps - readoffset_sps; + + // data left from prev read + if (left_to_read >= howmany_sps) { + memcpy(outbuf, &buf[readoffset_sps], samp2byte(howmany_sps)); + readoffset_sps += howmany_sps; + + shm::mtx_log::print_guard() << "\tr++ " << *read_ts << " " << howmany_sps << std::endl; + return; + } else { + memcpy(outbuf, &buf[readoffset_sps], samp2byte(left_to_read)); + readoffset_sps = 0; + auto still_left_to_read = howmany_sps - left_to_read; + { + direction.r.reset_unsafe(); + direction.ts_req = (*read_ts); + direction.w.set(1); + direction.r.wait_and_reset(1); + assert(*read_ts != direction.ts.load()); + len_avail_sps = direction.len_written_sps.load(); + assert(len_avail_sps >= still_left_to_read); + memcpy(&outbuf[left_to_read], buf, samp2byte(still_left_to_read)); + readoffset_sps += still_left_to_read; + shm::mtx_log::print_guard() + << "\tr+++2 " << *read_ts << " " << howmany_sps << " " << still_left_to_read + << " new @" << direction.ts.load() << std::endl; + } + } } public: - trxmsif() : m("trx-ms-if"), dl_readoffset(0), first(true) + trxmsif() : m("trx-ms-if") { } @@ -97,96 +295,93 @@ class trxmsif { return ptr->ms_connected == true; } - void write_dl(size_t howmany, uint64_t write_ts, sample_t *inbuf) + /* is being read from ms side */ + void read_dl(size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf) + { + return read(ptr->dl, howmany_sps, read_ts, outbuf); + } + + /* is being read from trx/network side */ + void read_ul(size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf) + { + // if (ptr->ms_connected != true) { + memset(outbuf, 0, samp2byte(howmany_sps)); + // return; + // } + // return read(ptr->ul, howmany_sps, read_ts, outbuf); + } + + void write_dl(size_t howmany_sps, uint64_t write_ts, sample_t *inbuf) { auto &dl = ptr->dl; auto buf = &dl.buffer[0]; - // if (ptr->ms_connected != true) - // return; + if (ptr->ms_connected != true) + return; - assert(sizeof(dl.buffer) >= samp2byte(howmany)); + assert(sizeof(dl.buffer) >= samp2byte(howmany_sps)); // print_guard() << "####w " << std::endl; { shm::sema_wait_guard g(dl.w, dl.r); - memcpy(buf, inbuf, samp2byte(howmany)); + memcpy(buf, inbuf, samp2byte(howmany_sps)); dl.ts.store(write_ts); - dl.len_written.store(howmany); + dl.len_written_sps.store(howmany_sps); } shm::mtx_log::print_guard() << std::endl - << "####w+ " << write_ts << " " << howmany << std::endl + << "####w+ " << write_ts << " " << howmany_sps << std::endl << std::endl; } - void signal_read_start() - { /* nop */ - } - - void read_dl(size_t howmany, uint64_t *read_ts, sample_t *outbuf) + void write_ul(size_t howmany_sps_sps, uint64_t write_ts, sample_t *inbuf) { - auto &dl = ptr->dl; - auto buf = &dl.buffer[0]; - size_t len_avail = dl.len_written.load(); + auto &ul = ptr->ul; + assert(sizeof(ul.buffer) >= samp2byte(howmany_sps_sps)); + // print_guard() << "####w " << std::endl; - auto left_to_read = len_avail - dl_readoffset; + ulentry e; + e.ts = write_ts; + e.len_in_sps = howmany_sps_sps; + e.done = false; + e.read_pos_in_sps = 0; + assert(sizeof(e.buf) >= samp2byte(howmany_sps_sps)); + memcpy(e.buf, inbuf, samp2byte(howmany_sps_sps)); + p.add(e); - shm::mtx_log::print_guard() << "\tr @" << dl.ts.load() << " " << dl_readoffset << std::endl; + shm::mtx_log::print_guard() << std::endl + << "####q+ " << write_ts << " " << howmany_sps_sps << std::endl + << std::endl; + } - // no data, wait for new buffer, maybe some data left afterwards - if (!left_to_read) { - assert(dl_readoffset == len_avail); - dl_readoffset = 0; - dl.r.reset_unsafe(); - dl.w.set(1); - dl.r.wait_and_reset(1); - assert(*read_ts != dl.ts.load()); - // shm::sema_guard g(dl.r, dl.w); - *read_ts = dl.ts.load(); - len_avail = dl.len_written.load(); - dl_readoffset += howmany; - assert(len_avail >= howmany); - memcpy(outbuf, buf, samp2byte(howmany)); + void drive_tx() + { + auto &ul = ptr->ul; + auto buf = &ul.buffer[0]; + const auto max_write_len = sizeof(ul.buffer); - shm::mtx_log::print_guard() << "\tr+ " << *read_ts << " " << howmany << std::endl; + // ul_q_m.lock(); + // ul_q.push_front(e); + // ul_q_m.unlock(); + // ul.w.wait_and_reset(); + + // no read waiting for a write + if (!ul.w.check_unsafe(1)) return; - } - *read_ts = dl.ts.load() + dl_readoffset; - left_to_read = len_avail - dl_readoffset; + // FIXME: store written, notify after get! - // data left from prev read - if (left_to_read >= howmany) { - memcpy(outbuf, &buf[dl_readoffset], samp2byte(howmany)); - dl_readoffset += howmany; + auto requested_ts = ul.ts_req.load(); - shm::mtx_log::print_guard() << "\tr++ " << *read_ts << " " << howmany << std::endl; - return; - } else { - memcpy(outbuf, &buf[dl_readoffset], samp2byte(left_to_read)); - dl_readoffset = 0; - auto still_left_to_read = howmany - left_to_read; - { - dl.r.reset_unsafe(); - dl.w.set(1); - dl.r.wait_and_reset(1); - assert(*read_ts != dl.ts.load()); - len_avail = dl.len_written.load(); - assert(len_avail >= still_left_to_read); - memcpy(&outbuf[left_to_read], buf, samp2byte(still_left_to_read)); - dl_readoffset += still_left_to_read; - shm::mtx_log::print_guard() - << "\tr+++2 " << *read_ts << " " << howmany << " " << still_left_to_read - << " new @" << dl.ts.load() << std::endl; - } - } + p.get(requested_ts, byte2samp(max_write_len), buf, max_write_len); + + // memset(buf, 0, max_write_len); + ul.ts.store(requested_ts); + ul.len_written_sps.store(byte2samp(max_write_len)); + ul.w.reset_unsafe(); + ul.r.set(1); } - void read_ul(size_t howmany, uint64_t *read_ts, sample_t *outbuf) - { - // if (ptr->ms_connected != true) { - memset(outbuf, 0, samp2byte(howmany)); - return; - // } + void signal_read_start() + { /* nop */ } }; diff --git a/Transceiver52M/device/ipc2/shmif.h b/Transceiver52M/device/ipc2/shmif.h index 09dcf8c..89413ab 100644 --- a/Transceiver52M/device/ipc2/shmif.h +++ b/Transceiver52M/device/ipc2/shmif.h @@ -235,6 +235,10 @@ class sema { { value = 0; } + bool check_unsafe(int v) + { + return value == v; + } sema(const sema &) = delete; sema &operator=(const sema &) = delete; }; diff --git a/Transceiver52M/ms/ipc_specific.h b/Transceiver52M/ms/ipc_specific.h index 05181d3..8d67237 100644 --- a/Transceiver52M/ms/ipc_specific.h +++ b/Transceiver52M/ms/ipc_specific.h @@ -162,6 +162,8 @@ template <typename T> struct ipc_hw { last_ts = rcd.get_first_ts(); } + m.drive_tx(); + return ret; } |