aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Welte <laforge@osmocom.org>2023-11-18 20:11:47 +0100
committerAndreas Eversberg <jolly@eversberg.eu>2024-02-29 13:23:13 +0100
commit7e6d2e0f99ff095f4714f03b1ed991d6c9cb9c61 (patch)
tree4a0c0c8bcf430489ec778cf499c825a080af3a35
parent54d17d664aad318b1f468f6fc15c629fa7d81ea4 (diff)
stream_{cli,srv}: Add support for SCTP in OSMO_IO mode
Let's enable the OSMO_IO_FD_MODE_RECVMSG_SENDMSG mode for SCTP sockets, allowing OSMO_STREAM_MODE_OSMO_IO to be used with SCTP. Change-Id: I6cf5bad5f618e71c80017960c38009b089dbd6a1 Depends: libosmocore Change-Id: I89eb519b22d21011d61a7855b2364bc3c295df82 Closes: OS#5753
-rw-r--r--TODO-RELEASE4
-rw-r--r--include/osmocom/netif/stream_private.h6
-rw-r--r--src/stream.c86
-rw-r--r--src/stream_cli.c69
-rw-r--r--src/stream_srv.c60
5 files changed, 198 insertions, 27 deletions
diff --git a/TODO-RELEASE b/TODO-RELEASE
index 492c205..1d860cb 100644
--- a/TODO-RELEASE
+++ b/TODO-RELEASE
@@ -9,4 +9,6 @@
#library what description / commit summary line
libosmocore >1.9.0 working osmo_sock_init2_multiaddr2() without setting flag OSMO_SOCK_F_BIND
libosmocore >1.9.0 use osmo_sock_multiaddr_get_name_buf()
-libosmo-netif added osmo_stream_srv_get_sockname() \ No newline at end of file
+libosmocore >1.9.0 use OSMO_IO_FD_MODE_RECVMSG_SENDMSG
+libosmo-netif added osmo_stream_srv_get_sockname()
+libosmo-netif update-dependency libosmocore > 1.9.0 required for I89eb519b22d21011d61a7855b2364bc3c295df82
diff --git a/include/osmocom/netif/stream_private.h b/include/osmocom/netif/stream_private.h
index c593527..106b017 100644
--- a/include/osmocom/netif/stream_private.h
+++ b/include/osmocom/netif/stream_private.h
@@ -31,8 +31,14 @@ enum osmo_stream_mode {
OSMO_STREAM_MODE_OSMO_IO,
};
+struct osmo_io_fd;
+struct msghdr;
+
int stream_sctp_sock_activate_events(int fd);
int stream_setsockopt_nodelay(int fd, int proto, int on);
int stream_sctp_recvmsg_wrapper(int fd, struct msgb *msg, const char *log_pfx);
+int stream_iofd_sctp_send_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags);
+int stream_iofd_sctp_recvmsg_trailer(struct osmo_io_fd *iofd, struct msgb *msg, int ret, const struct msghdr *msgh);
+
/*! @} */
diff --git a/src/stream.c b/src/stream.c
index 26e745c..f4755e0 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -209,25 +209,19 @@ int stream_setsockopt_nodelay(int fd, int proto, int on)
}
#ifdef HAVE_LIBSCTP
-#define LOGPFX(pfx, level, fmt, args...) \
- LOGP(DLINP, level, "%s " fmt, pfx, ## args)
-int stream_sctp_recvmsg_wrapper(int fd, struct msgb *msg, const char *log_pfx)
+static int stream_sctp_recvmsg_trailer(const char *log_pfx, struct msgb *msg, int ret, const struct sctp_sndrcvinfo *sinfo, int flags)
{
- struct sctp_sndrcvinfo sinfo;
- int flags = 0;
- int ret;
- uint8_t *data = msg->tail;
-
- ret = sctp_recvmsg(fd, data, msgb_tailroom(msg), NULL, NULL, &sinfo, &flags);
msgb_sctp_msg_flags(msg) = 0;
- msgb_sctp_ppid(msg) = ntohl(sinfo.sinfo_ppid);
- msgb_sctp_stream(msg) = sinfo.sinfo_stream;
+ if (OSMO_LIKELY(sinfo)) {
+ msgb_sctp_ppid(msg) = ntohl(sinfo->sinfo_ppid);
+ msgb_sctp_stream(msg) = sinfo->sinfo_stream;
+ }
if (flags & MSG_NOTIFICATION) {
char buf[512];
struct osmo_strbuf sb = { .buf = buf, .len = sizeof(buf) };
int logl = LOGL_INFO;
- union sctp_notification *notif = (union sctp_notification *)data;
+ union sctp_notification *notif = (union sctp_notification *) msg->data;
OSMO_STRBUF_PRINTF(sb, "%s NOTIFICATION %s flags=0x%x", log_pfx,
osmo_sctp_sn_type_str(notif->sn_header.sn_type), notif->sn_header.sn_flags);
@@ -283,8 +277,76 @@ int stream_sctp_recvmsg_wrapper(int fd, struct msgb *msg, const char *log_pfx)
LOGP(DLINP, logl, "%s\n", buf);
return ret;
}
+
+ if (OSMO_UNLIKELY(ret > 0 && !sinfo))
+ LOGP(DLINP, LOGL_ERROR, "%s sctp_recvmsg without SNDRCV cmsg?!?\n", log_pfx);
+
return ret;
}
+
+/*! wrapper for regular synchronous sctp_recvmsg(3) */
+int stream_sctp_recvmsg_wrapper(int fd, struct msgb *msg, const char *log_pfx)
+{
+ struct sctp_sndrcvinfo sinfo;
+ int flags = 0;
+ int ret;
+
+ ret = sctp_recvmsg(fd, msg->tail, msgb_tailroom(msg), NULL, NULL, &sinfo, &flags);
+ return stream_sctp_recvmsg_trailer(log_pfx, msg, ret, &sinfo, flags);
+}
+
+/*! wrapper for osmo_io asynchronous recvmsg response */
+int stream_iofd_sctp_recvmsg_trailer(struct osmo_io_fd *iofd, struct msgb *msg, int ret, const struct msghdr *msgh)
+{
+ const struct sctp_sndrcvinfo *sinfo = NULL;
+ struct cmsghdr *cmsg = NULL;
+
+ for (cmsg = CMSG_FIRSTHDR((struct msghdr *) msgh); cmsg != NULL;
+ cmsg = CMSG_NXTHDR((struct msghdr *) msgh, cmsg)) {
+ if (cmsg->cmsg_level == IPPROTO_SCTP && cmsg->cmsg_type == SCTP_SNDRCV) {
+ sinfo = (const struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+ break;
+ }
+ }
+
+ return stream_sctp_recvmsg_trailer(osmo_iofd_get_name(iofd), msg, ret, sinfo, msgh->msg_flags);
+}
+
+/*! Send a message through a connected SCTP socket, similar to sctp_sendmsg().
+ *
+ * Appends the message to the internal transmit queue.
+ * If the function returns success (0), it will take ownership of the msgb and
+ * internally call msgb_free() after the write request completes.
+ * In case of an error the msgb needs to be freed by the caller.
+ *
+ * \param[in] iofd file descriptor to write to
+ * \param[in] msg message buffer to send; uses msgb_sctp_ppid/msg_sctp_stream
+ * \param[in] sendmsg_flags Flags to pass to the send call
+ * \returns 0 in case of success; a negative value in case of error
+ */
+int stream_iofd_sctp_send_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags)
+{
+ struct msghdr outmsg = {};
+ char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+ struct sctp_sndrcvinfo *sinfo;
+ struct cmsghdr *cmsg;
+
+ outmsg.msg_control = outcmsg;
+ outmsg.msg_controllen = sizeof(outcmsg);
+
+ cmsg = CMSG_FIRSTHDR(&outmsg);
+ cmsg->cmsg_level = IPPROTO_SCTP;
+ cmsg->cmsg_type = SCTP_SNDRCV;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
+
+ outmsg.msg_controllen = cmsg->cmsg_len;
+ sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+ memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+ sinfo->sinfo_ppid = htonl(msgb_sctp_ppid(msg));
+ sinfo->sinfo_stream = msgb_sctp_stream(msg);
+
+ return osmo_iofd_sendmsg_msgb(iofd, msg, sendmsg_flags, &outmsg);
+}
#endif
diff --git a/src/stream_cli.c b/src/stream_cli.c
index 1b9994c..cf639bc 100644
--- a/src/stream_cli.c
+++ b/src/stream_cli.c
@@ -470,13 +470,45 @@ static void stream_cli_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct ms
}
}
-static struct osmo_io_ops osmo_stream_cli_ioops = {
+static const struct osmo_io_ops osmo_stream_cli_ioops = {
.read_cb = stream_cli_iofd_read_cb,
.write_cb = stream_cli_iofd_write_cb,
.segmentation_cb = NULL,
};
+#ifdef HAVE_LIBSCTP
+static void stream_cli_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct msghdr *msgh)
+{
+ struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd);
+
+ res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh);
+
+ switch (cli->state) {
+ case STREAM_CLI_STATE_CONNECTING:
+ stream_cli_handle_connecting(cli, res);
+ break;
+ case STREAM_CLI_STATE_CONNECTED:
+ if (res == 0)
+ osmo_stream_cli_reconnect(cli);
+ /* Forward message to read callback, also if the connection failed. */
+ if (cli->iofd_read_cb)
+ cli->iofd_read_cb(cli, msg);
+ break;
+ default:
+ osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state);
+ }
+}
+
+static const struct osmo_io_ops osmo_stream_cli_ioops_sctp = {
+ .recvmsg_cb = stream_cli_iofd_recvmsg_cb,
+ .sendmsg_cb = stream_cli_iofd_write_cb,
+
+ .segmentation_cb = NULL,
+};
+#endif
+
+
/*! \brief Set a name on the cli object (used during logging)
* \param[in] cli stream_cli whose name is to be set
* \param[in] name the name to be set on cli
@@ -592,14 +624,18 @@ osmo_stream_cli_set_proto(struct osmo_stream_cli *cli, uint16_t proto)
}
/* Configure client side segmentation for the iofd */
-static void configure_cli_segmentation_cb(struct osmo_io_fd *iofd,
+static void configure_cli_segmentation_cb(struct osmo_stream_cli *cli,
int (*segmentation_cb)(struct msgb *msg))
{
/* Copy default settings */
- struct osmo_io_ops client_ops = osmo_stream_cli_ioops;
+ struct osmo_io_ops client_ops;
+ if (cli->proto == IPPROTO_SCTP)
+ client_ops = osmo_stream_cli_ioops_sctp;
+ else
+ client_ops = osmo_stream_cli_ioops;
/* Set segmentation cb for this client */
client_ops.segmentation_cb = segmentation_cb;
- osmo_iofd_set_ioops(iofd, &client_ops);
+ osmo_iofd_set_ioops(cli->iofd, &client_ops);
}
/*! \brief Set the segmentation callback for the client
@@ -611,7 +647,7 @@ void osmo_stream_cli_set_segmentation_cb(struct osmo_stream_cli *cli,
{
cli->segmentation_cb = segmentation_cb;
if (cli->iofd) /* Otherwise, this will be done in osmo_stream_cli_open() */
- configure_cli_segmentation_cb(cli->iofd, segmentation_cb);
+ configure_cli_segmentation_cb(cli, segmentation_cb);
}
/*! \brief Set the socket type for the stream server link
@@ -904,11 +940,23 @@ int osmo_stream_cli_open(struct osmo_stream_cli *cli)
goto error_close_socket;
break;
case OSMO_STREAM_MODE_OSMO_IO:
- if (!cli->iofd)
- cli->iofd = osmo_iofd_setup(cli, fd, cli->name, OSMO_IO_FD_MODE_READ_WRITE, &osmo_stream_cli_ioops, cli);
+#ifdef HAVE_LIBSCTP
+ if (cli->proto == IPPROTO_SCTP) {
+ cli->iofd = osmo_iofd_setup(cli, fd, cli->name, OSMO_IO_FD_MODE_RECVMSG_SENDMSG,
+ &osmo_stream_cli_ioops_sctp, cli);
+ if (cli->iofd)
+ osmo_iofd_set_cmsg_size(cli->iofd, CMSG_SPACE(sizeof(struct sctp_sndrcvinfo)));
+ } else {
+#else
+ if (true) {
+#endif
+ cli->iofd = osmo_iofd_setup(cli, fd, cli->name, OSMO_IO_FD_MODE_READ_WRITE,
+ &osmo_stream_cli_ioops, cli);
+ }
if (!cli->iofd)
goto error_close_socket;
- configure_cli_segmentation_cb(cli->iofd, cli->segmentation_cb);
+
+ configure_cli_segmentation_cb(cli, cli->segmentation_cb);
if (osmo_iofd_register(cli->iofd, fd) < 0)
goto error_close_socket;
@@ -951,7 +999,10 @@ void osmo_stream_cli_send(struct osmo_stream_cli *cli, struct msgb *msg)
osmo_fd_write_enable(&cli->ofd);
break;
case OSMO_STREAM_MODE_OSMO_IO:
- osmo_iofd_write_msgb(cli->iofd, msg);
+ if (cli->proto == IPPROTO_SCTP)
+ stream_iofd_sctp_send_msgb(cli->iofd, msg, MSG_NOSIGNAL);
+ else
+ osmo_iofd_write_msgb(cli->iofd, msg);
break;
default:
OSMO_ASSERT(false);
diff --git a/src/stream_srv.c b/src/stream_srv.c
index dbf8aed..041a4ef 100644
--- a/src/stream_srv.c
+++ b/src/stream_srv.c
@@ -592,10 +592,46 @@ static void stream_srv_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct ms
osmo_stream_srv_destroy(conn);
}
-static struct osmo_io_ops srv_ioops = {
+static const struct osmo_io_ops srv_ioops = {
.read_cb = stream_srv_iofd_read_cb,
.write_cb = stream_srv_iofd_write_cb,
};
+
+#ifdef HAVE_LIBSCTP
+static void stream_srv_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct msghdr *msgh)
+{
+ struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd);
+ LOGSSRV(conn, LOGL_DEBUG, "message received (res=%d)\n", res);
+
+ res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh);
+ if (res == -EAGAIN)
+ return;
+
+ if (OSMO_UNLIKELY(res <= 0)) {
+ /* This connection is dead, destroy it. */
+ osmo_stream_srv_destroy(conn);
+ } else {
+ if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) {
+ LOGSSRV(conn, LOGL_INFO, "Connection is being flushed and closed; ignoring received message\n");
+ msgb_free(msg);
+ if (osmo_iofd_txqueue_len(iofd) == 0)
+ osmo_stream_srv_destroy(conn);
+ return;
+ }
+
+ if (conn->iofd_read_cb)
+ conn->iofd_read_cb(conn, msg);
+ else
+ msgb_free(msg);
+ }
+}
+
+static const struct osmo_io_ops srv_ioops_sctp = {
+ .recvmsg_cb = stream_srv_iofd_recvmsg_cb,
+ .sendmsg_cb = stream_srv_iofd_write_cb,
+};
+#endif
+
static int osmo_stream_srv_read(struct osmo_stream_srv *conn)
{
int rc = 0;
@@ -764,8 +800,15 @@ osmo_stream_srv_create2(void *ctx, struct osmo_stream_srv_link *link, int fd, vo
osmo_sock_get_name_buf(conn->sockname, sizeof(conn->sockname), fd);
- conn->iofd = osmo_iofd_setup(conn, fd, conn->sockname,
- OSMO_IO_FD_MODE_READ_WRITE, &srv_ioops, conn);
+ if (link->proto == IPPROTO_SCTP) {
+ conn->iofd = osmo_iofd_setup(conn, fd, conn->sockname, OSMO_IO_FD_MODE_RECVMSG_SENDMSG,
+ &srv_ioops_sctp, conn);
+ if (conn->iofd)
+ osmo_iofd_set_cmsg_size(conn->iofd, CMSG_SPACE(sizeof(struct sctp_sndrcvinfo)));
+ } else {
+ conn->iofd = osmo_iofd_setup(conn, fd, conn->sockname, OSMO_IO_FD_MODE_READ_WRITE,
+ &srv_ioops, conn);
+ }
if (!conn->iofd) {
talloc_free(conn);
return NULL;
@@ -842,7 +885,11 @@ void osmo_stream_srv_set_segmentation_cb(struct osmo_stream_srv *conn,
* osmo_stream_srv_create2() creates the iofd member, too */
OSMO_ASSERT(conn->mode == OSMO_STREAM_MODE_OSMO_IO);
/* Copy default settings */
- struct osmo_io_ops conn_ops = srv_ioops;
+ struct osmo_io_ops conn_ops;
+ if (conn->srv->proto == IPPROTO_SCTP)
+ conn_ops = srv_ioops_sctp;
+ else
+ conn_ops = srv_ioops;
/* Set segmentation cb for this connection */
conn_ops.segmentation_cb = segmentation_cb;
osmo_iofd_set_ioops(conn->iofd, &conn_ops);
@@ -951,7 +998,10 @@ void osmo_stream_srv_send(struct osmo_stream_srv *conn, struct msgb *msg)
osmo_fd_write_enable(&conn->ofd);
break;
case OSMO_STREAM_MODE_OSMO_IO:
- osmo_iofd_write_msgb(conn->iofd, msg);
+ if (conn->srv->proto == IPPROTO_SCTP)
+ stream_iofd_sctp_send_msgb(conn->iofd, msg, MSG_NOSIGNAL);
+ else
+ osmo_iofd_write_msgb(conn->iofd, msg);
break;
default:
OSMO_ASSERT(false);