diff options
author | arehbein <arehbein@sysmocom.de> | 2023-04-11 13:51:14 +0200 |
---|---|---|
committer | Daniel Willmann <dwillmann@sysmocom.de> | 2023-05-31 12:13:18 +0200 |
commit | 062a8efd2683d1ac760a5fda4fbc17468fe44a88 (patch) | |
tree | 1c4587bee301925e308263fcd6406bfb9146f343 | |
parent | a15fdd13a3ceffc52789a6ee23eff6d7a174f959 (diff) |
stream: Add server-side (segmentation) support for IPAC
With this commit, IPAC segmentation is taken care of by setting the protocol being streamed
Depends on change I3a639e6896cc3b3fc8e9b2e1a58254710efa0d3f
Related: OS#5753, OS#5751
Change-Id: I6c91ff385cb5f36ab6b6c96d0e44997995d0d24c
-rw-r--r-- | include/osmocom/netif/stream.h | 13 | ||||
-rw-r--r-- | src/stream.c | 55 | ||||
-rw-r--r-- | tests/stream/stream_test.c | 242 |
3 files changed, 292 insertions, 18 deletions
diff --git a/include/osmocom/netif/stream.h b/include/osmocom/netif/stream.h index 0e983a4..f7abb5b 100644 --- a/include/osmocom/netif/stream.h +++ b/include/osmocom/netif/stream.h @@ -21,6 +21,18 @@ /*! \brief Osmocom Stream Server Link: A server socket listening/accepting */ struct osmo_stream_srv_link; +/*! \brief Type of protocol transported by the data stream */ +enum osmo_stream_proto { + OSMO_STREAM_UNSPECIFIED = -1, + OSMO_STREAM_IPAC = 0, + /* TODO: Add protocols for which libosmo-netif should be able to handle segmentation */ + _NUM_OSMO_STREAM_PROTOS +}; + +/*! \brief Shortcut for unsetting the stream protocol (gets rid of segmentation pertaining to stream protocol) */ +#define osmo_stream_srv_link_unset_stream_proto(struct_osmo_stream_srv_link_ptr)\ + osmo_stream_srv_link_set_stream_proto(struct_osmo_stream_srv_link_ptr, OSMO_STREAM_UNSPECIFIED) + struct osmo_stream_srv_link *osmo_stream_srv_link_create(void *ctx); void osmo_stream_srv_link_destroy(struct osmo_stream_srv_link *link); @@ -29,6 +41,7 @@ void osmo_stream_srv_link_set_addr(struct osmo_stream_srv_link *link, const char int osmo_stream_srv_link_set_addrs(struct osmo_stream_srv_link *link, const char **addr, size_t addrcnt); void osmo_stream_srv_link_set_port(struct osmo_stream_srv_link *link, uint16_t port); void osmo_stream_srv_link_set_proto(struct osmo_stream_srv_link *link, uint16_t proto); +void osmo_stream_srv_link_set_stream_proto(struct osmo_stream_srv_link *link, enum osmo_stream_proto osp); int osmo_stream_srv_link_set_type(struct osmo_stream_srv_link *link, int type); int osmo_stream_srv_link_set_domain(struct osmo_stream_srv_link *link, int domain); void osmo_stream_srv_link_set_accept_cb(struct osmo_stream_srv_link *link, int (*accept_cb)(struct osmo_stream_srv_link *link, int fd)); diff --git a/src/stream.c b/src/stream.c index c27e1aa..1b0ccba 100644 --- a/src/stream.c +++ b/src/stream.c @@ -36,6 +36,7 @@ #include <osmocom/core/select.h> #include <osmocom/core/utils.h> #include <osmocom/gsm/tlv.h> +#include <osmocom/gsm/ipa.h> #include <osmocom/core/msgb.h> #include <osmocom/core/osmo_io.h> #include <osmocom/core/panic.h> @@ -78,6 +79,19 @@ #define MSG_NOSIGNAL 0 #endif +//enum cb_type { +// CB_TYPE_SEGM = 0, +// CB_TYPE_READ, +// CB_TYPE_WRITE, +// _NUM_CB_TYPES +//}; + +//static int (*segmentation_cbs[_NUM_OSMO_STREAM_PROTOS][_NUM_CB_TYPES])(struct msgb *, int) = { +// [OSMO_STREAM_IPAC][CB_TYPE_SEGM] = ipa_segmentation_cb, +static int (*segmentation_cbs[_NUM_OSMO_STREAM_PROTOS])(struct msgb *, int) = { + [OSMO_STREAM_IPAC] = ipa_segmentation_cb, +}; + /* is any of the bytes from offset .. u8_size in 'u8' non-zero? return offset or -1 if all zero */ static int byte_nonzero(const uint8_t *u8, unsigned int offset, unsigned int u8_size) { @@ -1189,6 +1203,7 @@ struct osmo_stream_srv_link { int sk_domain; int sk_type; uint16_t proto; + enum osmo_stream_proto stream_proto; int (*accept_cb)(struct osmo_stream_srv_link *srv, int fd); void *data; int flags; @@ -1279,22 +1294,7 @@ struct osmo_stream_srv_link *osmo_stream_srv_link_create(void *ctx) link->sk_domain = AF_UNSPEC; link->sk_type = SOCK_STREAM; link->proto = IPPROTO_TCP; - osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_srv_ofd_cb, link, 0); - - return link; -} - -struct osmo_stream_srv_link *osmo_stream_srv_link_create_iofd(void *ctx, const char *name) -{ - struct osmo_stream_srv_link *link; - - link = talloc_zero(ctx, struct osmo_stream_srv_link); - if (!link) - return NULL; - - link->sk_domain = AF_UNSPEC; - link->sk_type = SOCK_STREAM; - link->proto = IPPROTO_TCP; + link->stream_proto = OSMO_STREAM_UNSPECIFIED; osmo_fd_setup(&link->ofd, -1, OSMO_FD_READ | OSMO_FD_WRITE, osmo_stream_srv_ofd_cb, link, 0); return link; @@ -1430,6 +1430,26 @@ static struct osmo_io_ops srv_ioops = { .write_cb = stream_srv_iofd_write_cb, }; +/*! \brief Set the protocol transported by the stream for the stream server link + * \param[in] link Stream Server Link to modify + * \param[in] proto Protocol (like IPPROTO_TCP (default), IPPROTO_SCTP, ...). + */ +void osmo_stream_srv_link_set_stream_proto(struct osmo_stream_srv_link *link, + enum osmo_stream_proto osp) +{ + if (!(OSMO_STREAM_UNSPECIFIED <= osp && osp < _NUM_OSMO_STREAM_PROTOS)) { + LOGP(DLINP, LOGL_ERROR, "Unexpected value (%d) for variable of type " + "'enum osmo_stream_proto'\n", osp); + return; + } + link->stream_proto = osp; + if (osp != OSMO_STREAM_UNSPECIFIED) + srv_ioops.segmentation_cb = segmentation_cbs[osp]; + else + srv_ioops.segmentation_cb = NULL; + link->flags |= OSMO_STREAM_SRV_F_RECONF; +} + /*! \brief Set the socket type for the stream server link * \param[in] link Stream Server Link to modify * \param[in] type Socket Type (like SOCK_STREAM (default), SOCK_SEQPACKET, ...) @@ -1764,7 +1784,8 @@ osmo_stream_srv_create_iofd(void *ctx, const char *name, } conn->mode = OSMO_STREAM_MODE_OSMO_IO; conn->srv = link; - conn->iofd = osmo_iofd_setup(conn, fd, name, OSMO_IO_FD_MODE_READ_WRITE, &srv_ioops, conn); + conn->iofd = osmo_iofd_setup(conn, fd, name, OSMO_IO_FD_MODE_READ_WRITE, + &srv_ioops, conn); conn->iofd_read_cb = read_cb; conn->closed_cb = closed_cb; conn->data = data; diff --git a/tests/stream/stream_test.c b/tests/stream/stream_test.c index 36a222e..25995e5 100644 --- a/tests/stream/stream_test.c +++ b/tests/stream/stream_test.c @@ -8,18 +8,24 @@ * (at your option) any later version. */ +#include <inttypes.h> +#include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <string.h> +#include <sys/socket.h> +#include <sys/un.h> #include <unistd.h> #include <errno.h> +#include <osmocom/core/byteswap.h> #include <osmocom/core/select.h> #include <osmocom/core/talloc.h> #include <osmocom/core/msgb.h> #include <osmocom/core/logging.h> #include <osmocom/core/application.h> #include <osmocom/core/timer.h> +#include <osmocom/gsm/protocol/ipaccess.h> #include <osmocom/netif/stream.h> @@ -363,6 +369,235 @@ static void test_recon(void *ctx, const char *host, unsigned port, unsigned step printf("{%lu.%06lu} %s test complete.\n\n", tv.tv_sec, tv.tv_usec, ASTR(autoreconnect)); } +struct ipa_head { + uint16_t len; + uint8_t proto; + uint8_t data[0]; +} __attribute__ ((packed)); + +#define IPAC_MSG_PING_LEN 0x01 +static const uint8_t ipac_msg_ping[] = { + 0x00, IPAC_MSG_PING_LEN, + IPAC_PROTO_IPACCESS, + IPAC_MSGT_PING +}; +#define IPAC_MSG_PONG_LEN 0x01 +static const uint8_t ipac_msg_pong[] = { + 0x00, IPAC_MSG_PONG_LEN, + IPAC_PROTO_IPACCESS, + IPAC_MSGT_PONG +}; +#define IPAC_MSG_ID_REQ_LEN 0x03 +static const uint8_t ipac_msg_idreq[] = { + 0x00, IPAC_MSG_PING_LEN, + IPAC_PROTO_IPACCESS, + IPAC_MSGT_ID_GET, + 0x01, IPAC_IDTAG_UNITNAME +}; +#define ipac_msg_idreq_half (sizeof (ipac_msg_idreq)/2) +#define ipac_msg_idreq_other_half (sizeof (ipac_msg_idreq) - ipac_msg_idreq_half) +#define IPAC_MSG_ID_RESP_LEN 0x07 +static const uint8_t ipac_msg_idresp[] = { + 0x00, IPAC_MSG_PING_LEN, + IPAC_PROTO_IPACCESS, + IPAC_MSGT_ID_RESP, + 0x01, IPAC_IDTAG_UNITNAME, 0xde, 0xad, 0xbe, 0xef +}; + +#define put_ipa_msg(unsigned_char_ptr, struct_msgb_ptr, byte_array) do {\ + (unsigned_char_ptr) = msgb_put(struct_msgb_ptr, sizeof (byte_array));\ + memcpy(unsigned_char_ptr, byte_array, sizeof (byte_array));\ +} while (0) + +static int test_segmentation_cli_connect_cb(struct osmo_stream_cli *cli) +{ + printf("Connect callback triggered (segmentation test)\n"); + + unsigned char *data; + void *recon = osmo_stream_cli_get_data(cli); + struct msgb *m = msgb_alloc_headroom(128, 0, "IPA messages"); + if (m == NULL) { + fprintf(stderr, "Cannot allocate message\n"); + return -ENOMEM; + } + + /* Send 4 and 1/2 messages */ + put_ipa_msg(data, m, ipac_msg_ping); + put_ipa_msg(data, m, ipac_msg_pong); + put_ipa_msg(data, m, ipac_msg_idreq); + put_ipa_msg(data, m, ipac_msg_idresp); + data = msgb_put(m, ipac_msg_idreq_half); + memcpy(data, ipac_msg_idreq, ipac_msg_idreq_half); + osmo_stream_cli_send(cli, m); + + if (recon) { + printf("Closing connection\n"); + osmo_stream_cli_close(cli); + } else + printf("Connect callback\n"); + + return 0; +} + +static int ipa_process_msg(struct msgb *msg) +{ + struct ipa_head *h = (struct ipa_head *)msg->data; + int len; + size_t ipa_msg_len = osmo_ntohs(h->len); + if (msg->len < sizeof (struct ipa_head)) { + fprintf(stderr, "IPA message too small\n"); + return -EIO; + } + len = sizeof (struct ipa_head) + ipa_msg_len; + if (len > msg->len) { + fprintf(stderr, "Bad IPA message header " + "hdrlen=%u < datalen=%u\n", + len, msg->len); + return -EIO; + } + /* msg->l2h = msg->data + sizeof (struct ipa_head); */ + return 0; +} + +/* Array indices correspond to enum values stringified on the right */ +static const char *IPAC_MSG_TYPES[] = { + [0] = "IPAC_MSGT_PING", + [1] = "IPAC_MSGT_PONG", + [2] = "UNEXPECTED VALUE", + [3] = "UNEXPECTED VALUE", + [4] = "IPAC_MSGT_ID_GET", + [5] = "IPAC_MSGT_ID_RESP", +}; + +static bool all_msgs_sent = false; + +static int test_segmentation_stream_cli_read_cb(struct osmo_stream_cli *osc, struct msgb *m) +{ + unsigned char *data; + struct ipa_head *h = (struct ipa_head *) m->data; + int rc; + uint8_t ipa_msg_type = h->data[0]; + if ((rc = ipa_process_msg(m)) < 0) + return rc; + printf("Received message from stream (len=%" PRIu16 ")\n", msgb_length(m)); + if (ipa_msg_type < 0 || 5 < ipa_msg_type) { + fprintf(stderr, "Received message from stream (len=%" PRIu16 ")\n", + msgb_length(m)); + return -ENOMSG; + } + printf("Type: %s\n", IPAC_MSG_TYPES[ipa_msg_type]); + if (ipa_msg_type == IPAC_MSGT_ID_GET) { + printf("Got back IPAC_MSGT_ID_GET from server." + "Sending second half of IPAC_MSGT_ID_RESP\n"); + data = msgb_put(m, ipac_msg_idreq_other_half); + memcpy(data, ipac_msg_idreq + ipac_msg_idreq_other_half, + ipac_msg_idreq_other_half); + osmo_stream_cli_send(osc, m); + all_msgs_sent = true; + } else if (ipa_msg_type == IPAC_MSGT_ID_RESP) { + printf("result= %s\n", osmo_hexdump(m->data, m->len)); + printf("expected=%s\n", + osmo_hexdump(ipac_msg_idresp, sizeof(ipac_msg_idresp))); + } + return 0; +} + +static void *test_segmentation_run_client() +{ + struct osmo_stream_cli *osc; + struct timespec start, now; + int rc; + void *ctx = talloc_named_const(NULL, 0, "test_segmentation_run_client"); + + (void) msgb_talloc_ctx_init(ctx, 0); + osc = osmo_stream_cli_create_iofd(ctx, "IPA test client"); + if (osc == NULL) { + fprintf(stderr, "osmo_stream_cli_create_iofd()\n"); + return NULL; + } + osmo_stream_cli_set_addr(osc, "127.0.0.11"); + osmo_stream_cli_set_port(osc, 1111); + osmo_stream_cli_set_connect_cb(osc, test_segmentation_cli_connect_cb); + osmo_stream_cli_set_data(osc, ctx); + osmo_stream_cli_set_iofd_read_cb(osc, test_segmentation_stream_cli_read_cb); + osmo_stream_cli_set_nodelay(osc, true); + if (osmo_stream_cli_open(osc) < 0) { + fprintf(stderr, "Cannot open stream client\n"); + return NULL; + } + + rc = clock_gettime(CLOCK_MONOTONIC, &start); + if (rc < 0) { + fprintf(stderr, "clock_gettime(): %s\n", strerror(errno)); + return NULL; + } + // int tdiff_secs = 0; + // while (!all_msgs_sent && tdiff_secs < 1) { + // for (; !all_msgs_sent;); + + return NULL; // Adapt? +} + +static void test_segmentation_ipa(void *ctx, const char *host, unsigned port, + struct osmo_stream_srv_link *srv) +{ + int rc; + struct timespec start, now; + osmo_stream_srv_link_set_stream_proto(srv, OSMO_STREAM_IPAC); + osmo_stream_srv_link_set_data(srv, ctx); + pthread_t pt; + test_segmentation_run_client(); + // rc = pthread_create(&pt, NULL, test_segmentation_run_client, (void *)srv); + // if (rc != 0) { + // fprintf(stderr, "pthread_create(): %s\n", strerror(errno)); + // return; + // } + + rc = clock_gettime(CLOCK_MONOTONIC, &start); + if (rc < 0) { + fprintf(stderr, "clock_gettime(): %s\n", strerror(errno)); + return; + } + int tdiff_secs = 0; + // while (!all_msgs_sent && tdiff_secs < 1) { + while (!all_msgs_sent) { + osmo_gettimeofday_override_add(0, 1); /* small increment to easily spot iterations */ + osmo_select_main(0); + rc = clock_gettime(CLOCK_MONOTONIC, &now); + if (rc < 0) { + fprintf(stderr, "clock_gettime(): %s\n", strerror(errno)); + return; + } + tdiff_secs = now.tv_sec - start.tv_sec; + } + + osmo_stream_srv_link_unset_stream_proto(srv); + return; +} + +int test_segmentation_stream_srv_read_cb(struct osmo_stream_srv *conn, struct msgb *msg) +{ + LOGP(DSTREAMTEST, LOGL_DEBUG, "received message from stream (len=%d)\n", msgb_length(msg)); + ipa_process_msg(msg); + osmo_stream_srv_send(conn, msg); + return 0; +} + + +static int test_segmentation_stream_srv_accept_cb(struct osmo_stream_srv_link *srv, int fd) +{ + void *ctx = talloc_named_const(NULL, 0, "test_segmentation_stream_srv_accept_cb"); + struct osmo_stream_srv *oss = + osmo_stream_srv_create_iofd(ctx, "srv link", srv, fd, + test_segmentation_stream_srv_read_cb, + close_cb_srv, NULL); + if (oss == NULL) { + fprintf(stderr, "Error while creating connection\n"); + return -1; + } + + return 0; +} int main(void) { @@ -401,8 +636,13 @@ int main(void) test_recon(tall_test, host, port, 12, srv, true); test_recon(tall_test, host, port, 8, srv, false); - osmo_stream_srv_link_destroy(srv); + + osmo_stream_srv_link_set_accept_cb(srv, + test_segmentation_stream_srv_accept_cb); + test_segmentation_ipa(tall_test, host, port, srv); + + printf("Stream tests completed\n"); return EXIT_SUCCESS; |