diff options
author | Harald Welte <laforge@osmocom.org> | 2023-11-18 20:11:47 +0100 |
---|---|---|
committer | Andreas Eversberg <jolly@eversberg.eu> | 2024-02-29 13:23:13 +0100 |
commit | 7e6d2e0f99ff095f4714f03b1ed991d6c9cb9c61 (patch) | |
tree | 4a0c0c8bcf430489ec778cf499c825a080af3a35 | |
parent | 54d17d664aad318b1f468f6fc15c629fa7d81ea4 (diff) |
stream_{cli,srv}: Add support for SCTP in OSMO_IO mode
Let's enable the OSMO_IO_FD_MODE_RECVMSG_SENDMSG mode for SCTP
sockets, allowing OSMO_STREAM_MODE_OSMO_IO to be used with SCTP.
Change-Id: I6cf5bad5f618e71c80017960c38009b089dbd6a1
Depends: libosmocore Change-Id: I89eb519b22d21011d61a7855b2364bc3c295df82
Closes: OS#5753
-rw-r--r-- | TODO-RELEASE | 4 | ||||
-rw-r--r-- | include/osmocom/netif/stream_private.h | 6 | ||||
-rw-r--r-- | src/stream.c | 86 | ||||
-rw-r--r-- | src/stream_cli.c | 69 | ||||
-rw-r--r-- | src/stream_srv.c | 60 |
5 files changed, 198 insertions, 27 deletions
diff --git a/TODO-RELEASE b/TODO-RELEASE index 492c205..1d860cb 100644 --- a/TODO-RELEASE +++ b/TODO-RELEASE @@ -9,4 +9,6 @@ #library what description / commit summary line libosmocore >1.9.0 working osmo_sock_init2_multiaddr2() without setting flag OSMO_SOCK_F_BIND libosmocore >1.9.0 use osmo_sock_multiaddr_get_name_buf() -libosmo-netif added osmo_stream_srv_get_sockname()
\ No newline at end of file +libosmocore >1.9.0 use OSMO_IO_FD_MODE_RECVMSG_SENDMSG +libosmo-netif added osmo_stream_srv_get_sockname() +libosmo-netif update-dependency libosmocore > 1.9.0 required for I89eb519b22d21011d61a7855b2364bc3c295df82 diff --git a/include/osmocom/netif/stream_private.h b/include/osmocom/netif/stream_private.h index c593527..106b017 100644 --- a/include/osmocom/netif/stream_private.h +++ b/include/osmocom/netif/stream_private.h @@ -31,8 +31,14 @@ enum osmo_stream_mode { OSMO_STREAM_MODE_OSMO_IO, }; +struct osmo_io_fd; +struct msghdr; + int stream_sctp_sock_activate_events(int fd); int stream_setsockopt_nodelay(int fd, int proto, int on); int stream_sctp_recvmsg_wrapper(int fd, struct msgb *msg, const char *log_pfx); +int stream_iofd_sctp_send_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags); +int stream_iofd_sctp_recvmsg_trailer(struct osmo_io_fd *iofd, struct msgb *msg, int ret, const struct msghdr *msgh); + /*! @} */ diff --git a/src/stream.c b/src/stream.c index 26e745c..f4755e0 100644 --- a/src/stream.c +++ b/src/stream.c @@ -209,25 +209,19 @@ int stream_setsockopt_nodelay(int fd, int proto, int on) } #ifdef HAVE_LIBSCTP -#define LOGPFX(pfx, level, fmt, args...) \ - LOGP(DLINP, level, "%s " fmt, pfx, ## args) -int stream_sctp_recvmsg_wrapper(int fd, struct msgb *msg, const char *log_pfx) +static int stream_sctp_recvmsg_trailer(const char *log_pfx, struct msgb *msg, int ret, const struct sctp_sndrcvinfo *sinfo, int flags) { - struct sctp_sndrcvinfo sinfo; - int flags = 0; - int ret; - uint8_t *data = msg->tail; - - ret = sctp_recvmsg(fd, data, msgb_tailroom(msg), NULL, NULL, &sinfo, &flags); msgb_sctp_msg_flags(msg) = 0; - msgb_sctp_ppid(msg) = ntohl(sinfo.sinfo_ppid); - msgb_sctp_stream(msg) = sinfo.sinfo_stream; + if (OSMO_LIKELY(sinfo)) { + msgb_sctp_ppid(msg) = ntohl(sinfo->sinfo_ppid); + msgb_sctp_stream(msg) = sinfo->sinfo_stream; + } if (flags & MSG_NOTIFICATION) { char buf[512]; struct osmo_strbuf sb = { .buf = buf, .len = sizeof(buf) }; int logl = LOGL_INFO; - union sctp_notification *notif = (union sctp_notification *)data; + union sctp_notification *notif = (union sctp_notification *) msg->data; OSMO_STRBUF_PRINTF(sb, "%s NOTIFICATION %s flags=0x%x", log_pfx, osmo_sctp_sn_type_str(notif->sn_header.sn_type), notif->sn_header.sn_flags); @@ -283,8 +277,76 @@ int stream_sctp_recvmsg_wrapper(int fd, struct msgb *msg, const char *log_pfx) LOGP(DLINP, logl, "%s\n", buf); return ret; } + + if (OSMO_UNLIKELY(ret > 0 && !sinfo)) + LOGP(DLINP, LOGL_ERROR, "%s sctp_recvmsg without SNDRCV cmsg?!?\n", log_pfx); + return ret; } + +/*! wrapper for regular synchronous sctp_recvmsg(3) */ +int stream_sctp_recvmsg_wrapper(int fd, struct msgb *msg, const char *log_pfx) +{ + struct sctp_sndrcvinfo sinfo; + int flags = 0; + int ret; + + ret = sctp_recvmsg(fd, msg->tail, msgb_tailroom(msg), NULL, NULL, &sinfo, &flags); + return stream_sctp_recvmsg_trailer(log_pfx, msg, ret, &sinfo, flags); +} + +/*! wrapper for osmo_io asynchronous recvmsg response */ +int stream_iofd_sctp_recvmsg_trailer(struct osmo_io_fd *iofd, struct msgb *msg, int ret, const struct msghdr *msgh) +{ + const struct sctp_sndrcvinfo *sinfo = NULL; + struct cmsghdr *cmsg = NULL; + + for (cmsg = CMSG_FIRSTHDR((struct msghdr *) msgh); cmsg != NULL; + cmsg = CMSG_NXTHDR((struct msghdr *) msgh, cmsg)) { + if (cmsg->cmsg_level == IPPROTO_SCTP && cmsg->cmsg_type == SCTP_SNDRCV) { + sinfo = (const struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); + break; + } + } + + return stream_sctp_recvmsg_trailer(osmo_iofd_get_name(iofd), msg, ret, sinfo, msgh->msg_flags); +} + +/*! Send a message through a connected SCTP socket, similar to sctp_sendmsg(). + * + * Appends the message to the internal transmit queue. + * If the function returns success (0), it will take ownership of the msgb and + * internally call msgb_free() after the write request completes. + * In case of an error the msgb needs to be freed by the caller. + * + * \param[in] iofd file descriptor to write to + * \param[in] msg message buffer to send; uses msgb_sctp_ppid/msg_sctp_stream + * \param[in] sendmsg_flags Flags to pass to the send call + * \returns 0 in case of success; a negative value in case of error + */ +int stream_iofd_sctp_send_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags) +{ + struct msghdr outmsg = {}; + char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; + struct sctp_sndrcvinfo *sinfo; + struct cmsghdr *cmsg; + + outmsg.msg_control = outcmsg; + outmsg.msg_controllen = sizeof(outcmsg); + + cmsg = CMSG_FIRSTHDR(&outmsg); + cmsg->cmsg_level = IPPROTO_SCTP; + cmsg->cmsg_type = SCTP_SNDRCV; + cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); + + outmsg.msg_controllen = cmsg->cmsg_len; + sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); + memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo)); + sinfo->sinfo_ppid = htonl(msgb_sctp_ppid(msg)); + sinfo->sinfo_stream = msgb_sctp_stream(msg); + + return osmo_iofd_sendmsg_msgb(iofd, msg, sendmsg_flags, &outmsg); +} #endif diff --git a/src/stream_cli.c b/src/stream_cli.c index 1b9994c..cf639bc 100644 --- a/src/stream_cli.c +++ b/src/stream_cli.c @@ -470,13 +470,45 @@ static void stream_cli_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct ms } } -static struct osmo_io_ops osmo_stream_cli_ioops = { +static const struct osmo_io_ops osmo_stream_cli_ioops = { .read_cb = stream_cli_iofd_read_cb, .write_cb = stream_cli_iofd_write_cb, .segmentation_cb = NULL, }; +#ifdef HAVE_LIBSCTP +static void stream_cli_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct msghdr *msgh) +{ + struct osmo_stream_cli *cli = osmo_iofd_get_data(iofd); + + res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh); + + switch (cli->state) { + case STREAM_CLI_STATE_CONNECTING: + stream_cli_handle_connecting(cli, res); + break; + case STREAM_CLI_STATE_CONNECTED: + if (res == 0) + osmo_stream_cli_reconnect(cli); + /* Forward message to read callback, also if the connection failed. */ + if (cli->iofd_read_cb) + cli->iofd_read_cb(cli, msg); + break; + default: + osmo_panic("%s() called with unexpected state %d\n", __func__, cli->state); + } +} + +static const struct osmo_io_ops osmo_stream_cli_ioops_sctp = { + .recvmsg_cb = stream_cli_iofd_recvmsg_cb, + .sendmsg_cb = stream_cli_iofd_write_cb, + + .segmentation_cb = NULL, +}; +#endif + + /*! \brief Set a name on the cli object (used during logging) * \param[in] cli stream_cli whose name is to be set * \param[in] name the name to be set on cli @@ -592,14 +624,18 @@ osmo_stream_cli_set_proto(struct osmo_stream_cli *cli, uint16_t proto) } /* Configure client side segmentation for the iofd */ -static void configure_cli_segmentation_cb(struct osmo_io_fd *iofd, +static void configure_cli_segmentation_cb(struct osmo_stream_cli *cli, int (*segmentation_cb)(struct msgb *msg)) { /* Copy default settings */ - struct osmo_io_ops client_ops = osmo_stream_cli_ioops; + struct osmo_io_ops client_ops; + if (cli->proto == IPPROTO_SCTP) + client_ops = osmo_stream_cli_ioops_sctp; + else + client_ops = osmo_stream_cli_ioops; /* Set segmentation cb for this client */ client_ops.segmentation_cb = segmentation_cb; - osmo_iofd_set_ioops(iofd, &client_ops); + osmo_iofd_set_ioops(cli->iofd, &client_ops); } /*! \brief Set the segmentation callback for the client @@ -611,7 +647,7 @@ void osmo_stream_cli_set_segmentation_cb(struct osmo_stream_cli *cli, { cli->segmentation_cb = segmentation_cb; if (cli->iofd) /* Otherwise, this will be done in osmo_stream_cli_open() */ - configure_cli_segmentation_cb(cli->iofd, segmentation_cb); + configure_cli_segmentation_cb(cli, segmentation_cb); } /*! \brief Set the socket type for the stream server link @@ -904,11 +940,23 @@ int osmo_stream_cli_open(struct osmo_stream_cli *cli) goto error_close_socket; break; case OSMO_STREAM_MODE_OSMO_IO: - if (!cli->iofd) - cli->iofd = osmo_iofd_setup(cli, fd, cli->name, OSMO_IO_FD_MODE_READ_WRITE, &osmo_stream_cli_ioops, cli); +#ifdef HAVE_LIBSCTP + if (cli->proto == IPPROTO_SCTP) { + cli->iofd = osmo_iofd_setup(cli, fd, cli->name, OSMO_IO_FD_MODE_RECVMSG_SENDMSG, + &osmo_stream_cli_ioops_sctp, cli); + if (cli->iofd) + osmo_iofd_set_cmsg_size(cli->iofd, CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))); + } else { +#else + if (true) { +#endif + cli->iofd = osmo_iofd_setup(cli, fd, cli->name, OSMO_IO_FD_MODE_READ_WRITE, + &osmo_stream_cli_ioops, cli); + } if (!cli->iofd) goto error_close_socket; - configure_cli_segmentation_cb(cli->iofd, cli->segmentation_cb); + + configure_cli_segmentation_cb(cli, cli->segmentation_cb); if (osmo_iofd_register(cli->iofd, fd) < 0) goto error_close_socket; @@ -951,7 +999,10 @@ void osmo_stream_cli_send(struct osmo_stream_cli *cli, struct msgb *msg) osmo_fd_write_enable(&cli->ofd); break; case OSMO_STREAM_MODE_OSMO_IO: - osmo_iofd_write_msgb(cli->iofd, msg); + if (cli->proto == IPPROTO_SCTP) + stream_iofd_sctp_send_msgb(cli->iofd, msg, MSG_NOSIGNAL); + else + osmo_iofd_write_msgb(cli->iofd, msg); break; default: OSMO_ASSERT(false); diff --git a/src/stream_srv.c b/src/stream_srv.c index dbf8aed..041a4ef 100644 --- a/src/stream_srv.c +++ b/src/stream_srv.c @@ -592,10 +592,46 @@ static void stream_srv_iofd_write_cb(struct osmo_io_fd *iofd, int res, struct ms osmo_stream_srv_destroy(conn); } -static struct osmo_io_ops srv_ioops = { +static const struct osmo_io_ops srv_ioops = { .read_cb = stream_srv_iofd_read_cb, .write_cb = stream_srv_iofd_write_cb, }; + +#ifdef HAVE_LIBSCTP +static void stream_srv_iofd_recvmsg_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct msghdr *msgh) +{ + struct osmo_stream_srv *conn = osmo_iofd_get_data(iofd); + LOGSSRV(conn, LOGL_DEBUG, "message received (res=%d)\n", res); + + res = stream_iofd_sctp_recvmsg_trailer(iofd, msg, res, msgh); + if (res == -EAGAIN) + return; + + if (OSMO_UNLIKELY(res <= 0)) { + /* This connection is dead, destroy it. */ + osmo_stream_srv_destroy(conn); + } else { + if (conn->flags & OSMO_STREAM_SRV_F_FLUSH_DESTROY) { + LOGSSRV(conn, LOGL_INFO, "Connection is being flushed and closed; ignoring received message\n"); + msgb_free(msg); + if (osmo_iofd_txqueue_len(iofd) == 0) + osmo_stream_srv_destroy(conn); + return; + } + + if (conn->iofd_read_cb) + conn->iofd_read_cb(conn, msg); + else + msgb_free(msg); + } +} + +static const struct osmo_io_ops srv_ioops_sctp = { + .recvmsg_cb = stream_srv_iofd_recvmsg_cb, + .sendmsg_cb = stream_srv_iofd_write_cb, +}; +#endif + static int osmo_stream_srv_read(struct osmo_stream_srv *conn) { int rc = 0; @@ -764,8 +800,15 @@ osmo_stream_srv_create2(void *ctx, struct osmo_stream_srv_link *link, int fd, vo osmo_sock_get_name_buf(conn->sockname, sizeof(conn->sockname), fd); - conn->iofd = osmo_iofd_setup(conn, fd, conn->sockname, - OSMO_IO_FD_MODE_READ_WRITE, &srv_ioops, conn); + if (link->proto == IPPROTO_SCTP) { + conn->iofd = osmo_iofd_setup(conn, fd, conn->sockname, OSMO_IO_FD_MODE_RECVMSG_SENDMSG, + &srv_ioops_sctp, conn); + if (conn->iofd) + osmo_iofd_set_cmsg_size(conn->iofd, CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))); + } else { + conn->iofd = osmo_iofd_setup(conn, fd, conn->sockname, OSMO_IO_FD_MODE_READ_WRITE, + &srv_ioops, conn); + } if (!conn->iofd) { talloc_free(conn); return NULL; @@ -842,7 +885,11 @@ void osmo_stream_srv_set_segmentation_cb(struct osmo_stream_srv *conn, * osmo_stream_srv_create2() creates the iofd member, too */ OSMO_ASSERT(conn->mode == OSMO_STREAM_MODE_OSMO_IO); /* Copy default settings */ - struct osmo_io_ops conn_ops = srv_ioops; + struct osmo_io_ops conn_ops; + if (conn->srv->proto == IPPROTO_SCTP) + conn_ops = srv_ioops_sctp; + else + conn_ops = srv_ioops; /* Set segmentation cb for this connection */ conn_ops.segmentation_cb = segmentation_cb; osmo_iofd_set_ioops(conn->iofd, &conn_ops); @@ -951,7 +998,10 @@ void osmo_stream_srv_send(struct osmo_stream_srv *conn, struct msgb *msg) osmo_fd_write_enable(&conn->ofd); break; case OSMO_STREAM_MODE_OSMO_IO: - osmo_iofd_write_msgb(conn->iofd, msg); + if (conn->srv->proto == IPPROTO_SCTP) + stream_iofd_sctp_send_msgb(conn->iofd, msg, MSG_NOSIGNAL); + else + osmo_iofd_write_msgb(conn->iofd, msg); break; default: OSMO_ASSERT(false); |