aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarehbein <arehbein@sysmocom.de>2023-04-11 13:51:14 +0200
committerDaniel Willmann <dwillmann@sysmocom.de>2023-05-31 12:13:18 +0200
commit062a8efd2683d1ac760a5fda4fbc17468fe44a88 (patch)
tree1c4587bee301925e308263fcd6406bfb9146f343
parenta15fdd13a3ceffc52789a6ee23eff6d7a174f959 (diff)
stream: Add server-side (segmentation) support for IPAC
With this commit, IPAC segmentation is taken care of by setting the protocol being streamed Depends on change I3a639e6896cc3b3fc8e9b2e1a58254710efa0d3f Related: OS#5753, OS#5751 Change-Id: I6c91ff385cb5f36ab6b6c96d0e44997995d0d24c
-rw-r--r--include/osmocom/netif/stream.h13
-rw-r--r--src/stream.c55
-rw-r--r--tests/stream/stream_test.c242
3 files changed, 292 insertions, 18 deletions
diff --git a/include/osmocom/netif/stream.h b/include/osmocom/netif/stream.h
index 0e983a4..f7abb5b 100644
--- a/include/osmocom/netif/stream.h
+++ b/include/osmocom/netif/stream.h
@@ -21,6 +21,18 @@
/*! \brief Osmocom Stream Server Link: A server socket listening/accepting */
struct osmo_stream_srv_link;
+/*! \brief Type of protocol transported by the data stream */
+enum osmo_stream_proto {
+ OSMO_STREAM_UNSPECIFIED = -1,
+ OSMO_STREAM_IPAC = 0,
+ /* TODO: Add protocols for which libosmo-netif should be able to handle segmentation */
+ _NUM_OSMO_STREAM_PROTOS
+};
+
+/*! \brief Shortcut for unsetting the stream protocol (gets rid of segmentation pertaining to stream protocol) */
+#define osmo_stream_srv_link_unset_stream_proto(struct_osmo_stream_srv_link_ptr)\
+ osmo_stream_srv_link_set_stream_proto(struct_osmo_stream_srv_link_ptr, OSMO_STREAM_UNSPECIFIED)
+
struct osmo_stream_srv_link *osmo_stream_srv_link_create(void *ctx);
void osmo_stream_srv_link_destroy(struct osmo_stream_srv_link *link);
@@ -29,6 +41,7 @@ void osmo_stream_srv_link_set_addr(struct osmo_stream_srv_link *link, const char
int osmo_stream_srv_link_set_addrs(struct osmo_stream_srv_link *link, const char **addr, size_t addrcnt);
void osmo_stream_srv_link_set_port(struct osmo_stream_srv_link *link, uint16_t port);
void osmo_stream_srv_link_set_proto(struct osmo_stream_srv_link *link, uint16_t proto);
+void osmo_stream_srv_link_set_stream_proto(struct osmo_stream_srv_link *link, enum osmo_stream_proto osp);
int osmo_stream_srv_link_set_type(struct osmo_stream_srv_link *link, int type);
int osmo_stream_srv_link_set_domain(struct osmo_stream_srv_link *link, int domain);
void osmo_stream_srv_link_set_accept_cb(struct osmo_stream_srv_link *link, int (*accept_cb)(struct osmo_stream_srv_link *link, int fd));
diff --git a/src/stream.c b/src/stream.c
index c27e1aa..1b0ccba 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -36,6 +36,7 @@
#include <osmocom/core/select.h>
#include <osmocom/core/utils.h>
#include <osmocom/gsm/tlv.h>
+#include <osmocom/gsm/ipa.h>
#include <osmocom/core/msgb.h>
#include <osmocom/core/osmo_io.h>
#include <osmocom/core/panic.h>
@@ -78,6 +79,19 @@
#define MSG_NOSIGNAL 0
#endif
+//enum cb_type {
+// CB_TYPE_SEGM = 0,
+// CB_TYPE_READ,
+// CB_TYPE_WRITE,
+// _NUM_CB_TYPES
+//};
+
+//static int (*segmentation_cbs[_NUM_OSMO_STREAM_PROTOS][_NUM_CB_TYPES])(struct msgb *, int) = {
+// [OSMO_STREAM_IPAC][CB_TYPE_SEGM] = ipa_segmentation_cb,
+static int (*segmentation_cbs[_NUM_OSMO_STREAM_PROTOS])(struct msgb *, int) = {
+ [OSMO_STREAM_IPAC] = ipa_segmentation_cb,
+};
+
/* is any of the bytes from offset .. u8_size in 'u8' non-zero? return offset or -1 if all zero */
static int byte_nonzero(const uint8_t *u8, unsigned int offset, unsigned int u8_size)
{
@@ -1189,6 +1203,7 @@ struct osmo_stream_srv_link {
int sk_domain;
int sk_type;
uint16_t proto;
+ enum osmo_stream_proto stream_proto;
int (*accept_cb)(struct osmo_stream_srv_link *srv, int fd);
void *data;
int flags;
@@ -1279,22 +1294,7 @@ struct osmo_stream_srv_link *osmo_stream_srv_link_create(void *ctx)
link->sk_domain = AF_UNSPEC;
link->sk_type = SOCK_STREAM;
link->proto = IPPROTO_TCP;
- osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_srv_ofd_cb, link, 0);
-
- return link;
-}
-
-struct osmo_stream_srv_link *osmo_stream_srv_link_create_iofd(void *ctx, const char *name)
-{
- struct osmo_stream_srv_link *link;
-
- link = talloc_zero(ctx, struct osmo_stream_srv_link);
- if (!link)
- return NULL;
-
- link->sk_domain = AF_UNSPEC;
- link->sk_type = SOCK_STREAM;
- link->proto = IPPROTO_TCP;
+ link->stream_proto = OSMO_STREAM_UNSPECIFIED;
osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_srv_ofd_cb, link, 0);
return link;
@@ -1430,6 +1430,26 @@ static struct osmo_io_ops srv_ioops = {
.write_cb = stream_srv_iofd_write_cb,
};
+/*! \brief Set the protocol transported by the stream for the stream server link
+ * \param[in] link Stream Server Link to modify
+ * \param[in] proto Protocol (like IPPROTO_TCP (default), IPPROTO_SCTP, ...).
+ */
+void osmo_stream_srv_link_set_stream_proto(struct osmo_stream_srv_link *link,
+ enum osmo_stream_proto osp)
+{
+ if (!(OSMO_STREAM_UNSPECIFIED <= osp && osp < _NUM_OSMO_STREAM_PROTOS)) {
+ LOGP(DLINP, LOGL_ERROR, "Unexpected value (%d) for variable of type "
+ "'enum osmo_stream_proto'\n", osp);
+ return;
+ }
+ link->stream_proto = osp;
+ if (osp != OSMO_STREAM_UNSPECIFIED)
+ srv_ioops.segmentation_cb = segmentation_cbs[osp];
+ else
+ srv_ioops.segmentation_cb = NULL;
+ link->flags |= OSMO_STREAM_SRV_F_RECONF;
+}
+
/*! \brief Set the socket type for the stream server link
* \param[in] link Stream Server Link to modify
* \param[in] type Socket Type (like SOCK_STREAM (default), SOCK_SEQPACKET, ...)
@@ -1764,7 +1784,8 @@ osmo_stream_srv_create_iofd(void *ctx, const char *name,
}
conn->mode = OSMO_STREAM_MODE_OSMO_IO;
conn->srv = link;
- conn->iofd = osmo_iofd_setup(conn, fd, name, OSMO_IO_FD_MODE_READ_WRITE, &srv_ioops, conn);
+ conn->iofd = osmo_iofd_setup(conn, fd, name, OSMO_IO_FD_MODE_READ_WRITE,
+ &srv_ioops, conn);
conn->iofd_read_cb = read_cb;
conn->closed_cb = closed_cb;
conn->data = data;
diff --git a/tests/stream/stream_test.c b/tests/stream/stream_test.c
index 36a222e..25995e5 100644
--- a/tests/stream/stream_test.c
+++ b/tests/stream/stream_test.c
@@ -8,18 +8,24 @@
* (at your option) any later version.
*/
+#include <inttypes.h>
+#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <sys/socket.h>
+#include <sys/un.h>
#include <unistd.h>
#include <errno.h>
+#include <osmocom/core/byteswap.h>
#include <osmocom/core/select.h>
#include <osmocom/core/talloc.h>
#include <osmocom/core/msgb.h>
#include <osmocom/core/logging.h>
#include <osmocom/core/application.h>
#include <osmocom/core/timer.h>
+#include <osmocom/gsm/protocol/ipaccess.h>
#include <osmocom/netif/stream.h>
@@ -363,6 +369,235 @@ static void test_recon(void *ctx, const char *host, unsigned port, unsigned step
printf("{%lu.%06lu} %s test complete.\n\n", tv.tv_sec, tv.tv_usec, ASTR(autoreconnect));
}
+struct ipa_head {
+ uint16_t len;
+ uint8_t proto;
+ uint8_t data[0];
+} __attribute__ ((packed));
+
+#define IPAC_MSG_PING_LEN 0x01
+static const uint8_t ipac_msg_ping[] = {
+ 0x00, IPAC_MSG_PING_LEN,
+ IPAC_PROTO_IPACCESS,
+ IPAC_MSGT_PING
+};
+#define IPAC_MSG_PONG_LEN 0x01
+static const uint8_t ipac_msg_pong[] = {
+ 0x00, IPAC_MSG_PONG_LEN,
+ IPAC_PROTO_IPACCESS,
+ IPAC_MSGT_PONG
+};
+#define IPAC_MSG_ID_REQ_LEN 0x03
+static const uint8_t ipac_msg_idreq[] = {
+ 0x00, IPAC_MSG_PING_LEN,
+ IPAC_PROTO_IPACCESS,
+ IPAC_MSGT_ID_GET,
+ 0x01, IPAC_IDTAG_UNITNAME
+};
+#define ipac_msg_idreq_half (sizeof (ipac_msg_idreq)/2)
+#define ipac_msg_idreq_other_half (sizeof (ipac_msg_idreq) - ipac_msg_idreq_half)
+#define IPAC_MSG_ID_RESP_LEN 0x07
+static const uint8_t ipac_msg_idresp[] = {
+ 0x00, IPAC_MSG_PING_LEN,
+ IPAC_PROTO_IPACCESS,
+ IPAC_MSGT_ID_RESP,
+ 0x01, IPAC_IDTAG_UNITNAME, 0xde, 0xad, 0xbe, 0xef
+};
+
+#define put_ipa_msg(unsigned_char_ptr, struct_msgb_ptr, byte_array) do {\
+ (unsigned_char_ptr) = msgb_put(struct_msgb_ptr, sizeof (byte_array));\
+ memcpy(unsigned_char_ptr, byte_array, sizeof (byte_array));\
+} while (0)
+
+static int test_segmentation_cli_connect_cb(struct osmo_stream_cli *cli)
+{
+ printf("Connect callback triggered (segmentation test)\n");
+
+ unsigned char *data;
+ void *recon = osmo_stream_cli_get_data(cli);
+ struct msgb *m = msgb_alloc_headroom(128, 0, "IPA messages");
+ if (m == NULL) {
+ fprintf(stderr, "Cannot allocate message\n");
+ return -ENOMEM;
+ }
+
+ /* Send 4 and 1/2 messages */
+ put_ipa_msg(data, m, ipac_msg_ping);
+ put_ipa_msg(data, m, ipac_msg_pong);
+ put_ipa_msg(data, m, ipac_msg_idreq);
+ put_ipa_msg(data, m, ipac_msg_idresp);
+ data = msgb_put(m, ipac_msg_idreq_half);
+ memcpy(data, ipac_msg_idreq, ipac_msg_idreq_half);
+ osmo_stream_cli_send(cli, m);
+
+ if (recon) {
+ printf("Closing connection\n");
+ osmo_stream_cli_close(cli);
+ } else
+ printf("Connect callback\n");
+
+ return 0;
+}
+
+static int ipa_process_msg(struct msgb *msg)
+{
+ struct ipa_head *h = (struct ipa_head *)msg->data;
+ int len;
+ size_t ipa_msg_len = osmo_ntohs(h->len);
+ if (msg->len < sizeof (struct ipa_head)) {
+ fprintf(stderr, "IPA message too small\n");
+ return -EIO;
+ }
+ len = sizeof (struct ipa_head) + ipa_msg_len;
+ if (len > msg->len) {
+ fprintf(stderr, "Bad IPA message header "
+ "hdrlen=%u < datalen=%u\n",
+ len, msg->len);
+ return -EIO;
+ }
+ /* msg->l2h = msg->data + sizeof (struct ipa_head); */
+ return 0;
+}
+
+/* Array indices correspond to enum values stringified on the right */
+static const char *IPAC_MSG_TYPES[] = {
+ [0] = "IPAC_MSGT_PING",
+ [1] = "IPAC_MSGT_PONG",
+ [2] = "UNEXPECTED VALUE",
+ [3] = "UNEXPECTED VALUE",
+ [4] = "IPAC_MSGT_ID_GET",
+ [5] = "IPAC_MSGT_ID_RESP",
+};
+
+static bool all_msgs_sent = false;
+
+static int test_segmentation_stream_cli_read_cb(struct osmo_stream_cli *osc, struct msgb *m)
+{
+ unsigned char *data;
+ struct ipa_head *h = (struct ipa_head *) m->data;
+ int rc;
+ uint8_t ipa_msg_type = h->data[0];
+ if ((rc = ipa_process_msg(m)) < 0)
+ return rc;
+ printf("Received message from stream (len=%" PRIu16 ")\n", msgb_length(m));
+ if (ipa_msg_type < 0 || 5 < ipa_msg_type) {
+ fprintf(stderr, "Received message from stream (len=%" PRIu16 ")\n",
+ msgb_length(m));
+ return -ENOMSG;
+ }
+ printf("Type: %s\n", IPAC_MSG_TYPES[ipa_msg_type]);
+ if (ipa_msg_type == IPAC_MSGT_ID_GET) {
+ printf("Got back IPAC_MSGT_ID_GET from server."
+ "Sending second half of IPAC_MSGT_ID_RESP\n");
+ data = msgb_put(m, ipac_msg_idreq_other_half);
+ memcpy(data, ipac_msg_idreq + ipac_msg_idreq_other_half,
+ ipac_msg_idreq_other_half);
+ osmo_stream_cli_send(osc, m);
+ all_msgs_sent = true;
+ } else if (ipa_msg_type == IPAC_MSGT_ID_RESP) {
+ printf("result= %s\n", osmo_hexdump(m->data, m->len));
+ printf("expected=%s\n",
+ osmo_hexdump(ipac_msg_idresp, sizeof(ipac_msg_idresp)));
+ }
+ return 0;
+}
+
+static void *test_segmentation_run_client()
+{
+ struct osmo_stream_cli *osc;
+ struct timespec start, now;
+ int rc;
+ void *ctx = talloc_named_const(NULL, 0, "test_segmentation_run_client");
+
+ (void) msgb_talloc_ctx_init(ctx, 0);
+ osc = osmo_stream_cli_create_iofd(ctx, "IPA test client");
+ if (osc == NULL) {
+ fprintf(stderr, "osmo_stream_cli_create_iofd()\n");
+ return NULL;
+ }
+ osmo_stream_cli_set_addr(osc, "127.0.0.11");
+ osmo_stream_cli_set_port(osc, 1111);
+ osmo_stream_cli_set_connect_cb(osc, test_segmentation_cli_connect_cb);
+ osmo_stream_cli_set_data(osc, ctx);
+ osmo_stream_cli_set_iofd_read_cb(osc, test_segmentation_stream_cli_read_cb);
+ osmo_stream_cli_set_nodelay(osc, true);
+ if (osmo_stream_cli_open(osc) < 0) {
+ fprintf(stderr, "Cannot open stream client\n");
+ return NULL;
+ }
+
+ rc = clock_gettime(CLOCK_MONOTONIC, &start);
+ if (rc < 0) {
+ fprintf(stderr, "clock_gettime(): %s\n", strerror(errno));
+ return NULL;
+ }
+ // int tdiff_secs = 0;
+ // while (!all_msgs_sent && tdiff_secs < 1) {
+ // for (; !all_msgs_sent;);
+
+ return NULL; // Adapt?
+}
+
+static void test_segmentation_ipa(void *ctx, const char *host, unsigned port,
+ struct osmo_stream_srv_link *srv)
+{
+ int rc;
+ struct timespec start, now;
+ osmo_stream_srv_link_set_stream_proto(srv, OSMO_STREAM_IPAC);
+ osmo_stream_srv_link_set_data(srv, ctx);
+ pthread_t pt;
+ test_segmentation_run_client();
+ // rc = pthread_create(&pt, NULL, test_segmentation_run_client, (void *)srv);
+ // if (rc != 0) {
+ // fprintf(stderr, "pthread_create(): %s\n", strerror(errno));
+ // return;
+ // }
+
+ rc = clock_gettime(CLOCK_MONOTONIC, &start);
+ if (rc < 0) {
+ fprintf(stderr, "clock_gettime(): %s\n", strerror(errno));
+ return;
+ }
+ int tdiff_secs = 0;
+ // while (!all_msgs_sent && tdiff_secs < 1) {
+ while (!all_msgs_sent) {
+ osmo_gettimeofday_override_add(0, 1); /* small increment to easily spot iterations */
+ osmo_select_main(0);
+ rc = clock_gettime(CLOCK_MONOTONIC, &now);
+ if (rc < 0) {
+ fprintf(stderr, "clock_gettime(): %s\n", strerror(errno));
+ return;
+ }
+ tdiff_secs = now.tv_sec - start.tv_sec;
+ }
+
+ osmo_stream_srv_link_unset_stream_proto(srv);
+ return;
+}
+
+int test_segmentation_stream_srv_read_cb(struct osmo_stream_srv *conn, struct msgb *msg)
+{
+ LOGP(DSTREAMTEST, LOGL_DEBUG, "received message from stream (len=%d)\n", msgb_length(msg));
+ ipa_process_msg(msg);
+ osmo_stream_srv_send(conn, msg);
+ return 0;
+}
+
+
+static int test_segmentation_stream_srv_accept_cb(struct osmo_stream_srv_link *srv, int fd)
+{
+ void *ctx = talloc_named_const(NULL, 0, "test_segmentation_stream_srv_accept_cb");
+ struct osmo_stream_srv *oss =
+ osmo_stream_srv_create_iofd(ctx, "srv link", srv, fd,
+ test_segmentation_stream_srv_read_cb,
+ close_cb_srv, NULL);
+ if (oss == NULL) {
+ fprintf(stderr, "Error while creating connection\n");
+ return -1;
+ }
+
+ return 0;
+}
int main(void)
{
@@ -401,8 +636,13 @@ int main(void)
test_recon(tall_test, host, port, 12, srv, true);
test_recon(tall_test, host, port, 8, srv, false);
-
osmo_stream_srv_link_destroy(srv);
+
+ osmo_stream_srv_link_set_accept_cb(srv,
+ test_segmentation_stream_srv_accept_cb);
+ test_segmentation_ipa(tall_test, host, port, srv);
+
+
printf("Stream tests completed\n");
return EXIT_SUCCESS;