/* * Copyright 2020 sysmocom - s.f.m.c. GmbH * Author: Eric Wild * * SPDX-License-Identifier: 0BSD * * Permission to use, copy, modify, and/or distribute this software for any purpose * with or without fee is hereby granted.THE SOFTWARE IS PROVIDED "AS IS" AND THE * AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR * BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF * CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE * USE OR PERFORMANCE OF THIS SOFTWARE. */ #ifdef __cplusplus extern "C" { #endif #include #include "ipc_shm.h" #include #include #include #include #include #include #include #include #include #ifdef __cplusplus } #endif #define SAMPLE_SIZE_BYTE (sizeof(uint16_t) * 2) struct ipc_shm_io *ipc_shm_init_consumer(struct ipc_shm_stream *s) { unsigned int i; struct ipc_shm_io *r = (struct ipc_shm_io *)malloc(sizeof(struct ipc_shm_io)); r->this_stream = s->raw; r->buf_ptrs = (volatile struct ipc_shm_raw_smpl_buf **)malloc(sizeof(struct ipc_shm_raw_smpl_buf *) * s->num_buffers); /* save actual ptrs */ for (i = 0; i < s->num_buffers; i++) r->buf_ptrs[i] = s->buffers[i]; r->partial_read_begin_ptr = 0; return r; } struct ipc_shm_io *ipc_shm_init_producer(struct ipc_shm_stream *s) { int rv; pthread_mutexattr_t att; pthread_condattr_t t1, t2; struct ipc_shm_io *r = ipc_shm_init_consumer(s); rv = pthread_mutexattr_init(&att); if (rv != 0) { osmo_panic("%s:%d rv:%d", __FILE__, __LINE__, rv); } rv = pthread_mutexattr_setrobust(&att, PTHREAD_MUTEX_ROBUST); if (rv != 0) { osmo_panic("%s:%d rv:%d", __FILE__, __LINE__, rv); } rv = pthread_mutexattr_setpshared(&att, PTHREAD_PROCESS_SHARED); if (rv != 0) { osmo_panic("%s:%d rv:%d", __FILE__, __LINE__, rv); } rv = pthread_mutex_init((pthread_mutex_t *)&r->this_stream->lock, &att); if (rv != 0) { osmo_panic("%s:%d rv:%d", __FILE__, __LINE__, rv); } pthread_mutexattr_destroy(&att); rv = pthread_condattr_setpshared(&t1, PTHREAD_PROCESS_SHARED); if (rv != 0) { osmo_panic("%s:%d rv:%d", __FILE__, __LINE__, rv); } rv = pthread_condattr_setpshared(&t2, PTHREAD_PROCESS_SHARED); if (rv != 0) { osmo_panic("%s:%d rv:%d", __FILE__, __LINE__, rv); } rv = pthread_cond_init((pthread_cond_t *)&r->this_stream->cf, &t1); if (rv != 0) { osmo_panic("%s:%d rv:%d", __FILE__, __LINE__, rv); } rv = pthread_cond_init((pthread_cond_t *)&r->this_stream->ce, &t2); if (rv != 0) { osmo_panic("%s:%d rv:%d", __FILE__, __LINE__, rv); } pthread_condattr_destroy(&t1); pthread_condattr_destroy(&t2); r->this_stream->read_next = 0; r->this_stream->write_next = 0; return r; } void ipc_shm_close(struct ipc_shm_io *r) { if (r) { free(r->buf_ptrs); free(r); } } int32_t ipc_shm_enqueue(struct ipc_shm_io *r, uint64_t timestamp, uint32_t len_in_sps, uint16_t *data) { volatile struct ipc_shm_raw_smpl_buf *buf; int32_t rv; struct timespec tv; clock_gettime(CLOCK_REALTIME, &tv); tv.tv_sec += 1; rv = pthread_mutex_timedlock((pthread_mutex_t *)&r->this_stream->lock, &tv); if (rv != 0) return -rv; while (((r->this_stream->write_next + 1) & (r->this_stream->num_buffers - 1)) == r->this_stream->read_next && rv == 0) rv = pthread_cond_timedwait((pthread_cond_t *)&r->this_stream->ce, (pthread_mutex_t *)&r->this_stream->lock, &tv); if (rv != 0) return -rv; buf = r->buf_ptrs[r->this_stream->write_next]; buf->timestamp = timestamp; rv = len_in_sps <= r->this_stream->buffer_size ? len_in_sps : r->this_stream->buffer_size; memcpy((void *)buf->samples, data, SAMPLE_SIZE_BYTE * rv); buf->data_len = rv; r->this_stream->write_next = (r->this_stream->write_next + 1) & (r->this_stream->num_buffers - 1); pthread_cond_signal((pthread_cond_t *)&r->this_stream->cf); pthread_mutex_unlock((pthread_mutex_t *)&r->this_stream->lock); return rv; } int32_t ipc_shm_read(struct ipc_shm_io *r, uint16_t *out_buf, uint32_t num_samples, uint64_t *timestamp, uint32_t timeout_seconds) { volatile struct ipc_shm_raw_smpl_buf *buf; int32_t rv; uint8_t freeflag = 0; struct timespec tv; clock_gettime(CLOCK_REALTIME, &tv); tv.tv_sec += timeout_seconds; rv = pthread_mutex_timedlock((pthread_mutex_t *)&r->this_stream->lock, &tv); if (rv != 0) return -rv; while (r->this_stream->write_next == r->this_stream->read_next && rv == 0) rv = pthread_cond_timedwait((pthread_cond_t *)&r->this_stream->cf, (pthread_mutex_t *)&r->this_stream->lock, &tv); if (rv != 0) return -rv; buf = r->buf_ptrs[r->this_stream->read_next]; if (buf->data_len <= num_samples) { memcpy(out_buf, (void *)&buf->samples[r->partial_read_begin_ptr * 2], SAMPLE_SIZE_BYTE * buf->data_len); r->partial_read_begin_ptr = 0; rv = buf->data_len; buf->data_len = 0; r->this_stream->read_next = (r->this_stream->read_next + 1) & (r->this_stream->num_buffers - 1); freeflag = 1; } else /*if (buf->data_len > num_samples)*/ { memcpy(out_buf, (void *)&buf->samples[r->partial_read_begin_ptr * 2], SAMPLE_SIZE_BYTE * num_samples); r->partial_read_begin_ptr += num_samples; buf->data_len -= num_samples; rv = num_samples; } *timestamp = buf->timestamp; buf->timestamp += rv; if (freeflag) pthread_cond_signal((pthread_cond_t *)&r->this_stream->ce); pthread_mutex_unlock((pthread_mutex_t *)&r->this_stream->lock); return rv; }