From 8bf16493f68fd53c6e14f1612caf7425c73b2f4d Mon Sep 17 00:00:00 2001 From: Pablo Neira Ayuso Date: Mon, 3 Oct 2011 22:09:45 +0200 Subject: initial commit --- src/Makefile.am | 11 ++ src/stream.c | 484 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 495 insertions(+) create mode 100644 src/Makefile.am create mode 100644 src/stream.c (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am new file mode 100644 index 0000000..30e1e72 --- /dev/null +++ b/src/Makefile.am @@ -0,0 +1,11 @@ +# This is _NOT_ the library release version, it's an API version. +# Please read Chapter 6 "Library interface versions" of the libtool documentation before making any modification +LIBVERSION=0:0:0 + +INCLUDES = $(all_includes) -I$(top_srcdir)/include -I$(top_builddir) +AM_CFLAGS= -fPIC -Wall $(LIBOSMOCORE_CFLAGS) $(LIBOSMOGSM_CFLAGS) $(LIBOSMOVTY_CFLAGS) $(COVERAGE_CFLAGS) +AM_LDFLAGS = $(LIBOSMOCORE_LIBS) $(LIBOSMOGSM_LIBS) $(LIBOSMOVTY_LIBS) $(COVERAGE_LDFLAGS) + +lib_LTLIBRARIES = libosmonetif.la + +libosmonetif_la_SOURCES = stream.c diff --git a/src/stream.c b/src/stream.c new file mode 100644 index 0000000..2603ecb --- /dev/null +++ b/src/stream.c @@ -0,0 +1,484 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +/* + * Client side. + */ + +enum stream_client_conn_state { + STREAM_CLIENT_LINK_STATE_NONE = 0, + STREAM_CLIENT_LINK_STATE_CONNECTING = 1, + STREAM_CLIENT_LINK_STATE_CONNECTED = 2, + STREAM_CLIENT_LINK_STATE_MAX +}; + +struct stream_client_conn { + struct osmo_fd ofd; + struct llist_head tx_queue; + struct osmo_timer_list timer; + enum stream_client_conn_state state; + const char *addr; + uint16_t port; + int (*connect_cb)(struct stream_client_conn *link); + int (*read_cb)(struct stream_client_conn *link, struct msgb *msg); + int (*write_cb)(struct stream_client_conn *link); + void *data; +}; + +static int stream_msg_recv(int fd, struct msgb *msg) +{ + int ret; + + ret = recv(fd, msg->data, msg->data_len, 0); + if (ret <= 0) + return ret; + + msgb_put(msg, ret); + return ret; +} + +void stream_client_conn_close(struct stream_client_conn *link); + +static void stream_client_retry(struct stream_client_conn *link) +{ + LOGP(DLINP, LOGL_DEBUG, "connection closed\n"); + stream_client_conn_close(link); + LOGP(DLINP, LOGL_DEBUG, "retrying in 5 seconds...\n"); + osmo_timer_schedule(&link->timer, 5, 0); + link->state = STREAM_CLIENT_LINK_STATE_CONNECTING; +} + +void stream_client_conn_close(struct stream_client_conn *link) +{ + osmo_fd_unregister(&link->ofd); + close(link->ofd.fd); +} + +static void stream_client_read(struct stream_client_conn *link) +{ + struct msgb *msg; + int ret; + + LOGP(DLINP, LOGL_DEBUG, "message received\n"); + + msg = msgb_alloc(1200, "LAPD/client"); + if (!msg) { + LOGP(DLINP, LOGL_ERROR, "cannot allocate room for message\n"); + return; + } + ret = stream_msg_recv(link->ofd.fd, msg); + if (ret < 0) { + if (errno == EPIPE || errno == ECONNRESET) { + LOGP(DLINP, LOGL_ERROR, "lost connection with server\n"); + } + stream_client_retry(link); + return; + } else if (ret == 0) { + LOGP(DLINP, LOGL_ERROR, "connection closed with server\n"); + stream_client_retry(link); + return; + } + msgb_put(msg, ret); + if (link->read_cb) + link->read_cb(link, msg); +} + +static int stream_client_write(struct stream_client_conn *link) +{ + struct msgb *msg; + struct llist_head *lh; + int ret; + + LOGP(DLINP, LOGL_DEBUG, "sending data\n"); + + if (llist_empty(&link->tx_queue)) { + link->ofd.when &= ~BSC_FD_WRITE; + return 0; + } + lh = link->tx_queue.next; + llist_del(lh); + msg = llist_entry(lh, struct msgb, list); + + if (link->state == STREAM_CLIENT_LINK_STATE_CONNECTING) { + LOGP(DLINP, LOGL_ERROR, "not connected, dropping data!\n"); + return 0; + } + + ret = send(link->ofd.fd, msg->data, msg->len, 0); + if (ret < 0) { + if (errno == EPIPE || errno == ENOTCONN) { + stream_client_retry(link); + } + LOGP(DLINP, LOGL_ERROR, "error to send\n"); + } + msgb_free(msg); + return 0; +} + +static int stream_client_fd_cb(struct osmo_fd *ofd, unsigned int what) +{ + struct stream_client_conn *link = ofd->data; + int error, ret; + socklen_t len = sizeof(error); + + switch(link->state) { + case STREAM_CLIENT_LINK_STATE_CONNECTING: + ret = getsockopt(ofd->fd, SOL_SOCKET, SO_ERROR, &error, &len); + if (ret >= 0 && error > 0) { + stream_client_retry(link); + return 0; + } + ofd->when &= ~BSC_FD_WRITE; + LOGP(DLINP, LOGL_DEBUG, "connection done.\n"); + link->state = STREAM_CLIENT_LINK_STATE_CONNECTED; + if (link->connect_cb) + link->connect_cb(link); + break; + case STREAM_CLIENT_LINK_STATE_CONNECTED: + if (what & BSC_FD_READ) { + LOGP(DLINP, LOGL_DEBUG, "connected read\n"); + stream_client_read(link); + } + if (what & BSC_FD_WRITE) { + LOGP(DLINP, LOGL_DEBUG, "connected write\n"); + stream_client_write(link); + } + break; + default: + break; + } + return 0; +} + +static void link_timer_cb(void *data); + +struct stream_client_conn *stream_client_conn_create(void *ctx) +{ + struct stream_client_conn *link; + + link = talloc_zero(ctx, struct stream_client_conn); + if (!link) + return NULL; + + link->ofd.when |= BSC_FD_READ | BSC_FD_WRITE; + link->ofd.priv_nr = 0; /* XXX */ + link->ofd.cb = stream_client_fd_cb; + link->ofd.data = link; + link->state = STREAM_CLIENT_LINK_STATE_CONNECTING; + link->timer.cb = link_timer_cb; + link->timer.data = link; + INIT_LLIST_HEAD(&link->tx_queue); + + return link; +} + +void +stream_client_conn_set_addr(struct stream_client_conn *link, const char *addr) +{ + link->addr = talloc_strdup(link, addr); +} + +void +stream_client_conn_set_port(struct stream_client_conn *link, uint16_t port) +{ + link->port = port; +} + +void +stream_client_conn_set_data(struct stream_client_conn *link, void *data) +{ + link->data = data; +} + +void +stream_client_conn_set_connect_cb(struct stream_client_conn *link, + int (*connect_cb)(struct stream_client_conn *link)) +{ + link->connect_cb = connect_cb; +} + +void +stream_client_conn_set_read_cb(struct stream_client_conn *link, + int (*read_cb)(struct stream_client_conn *link, struct msgb *msgb)) +{ + link->read_cb = read_cb; +} + +void stream_client_conn_destroy(struct stream_client_conn *link) +{ + talloc_free(link); +} + +int stream_client_conn_open(struct stream_client_conn *link) +{ + int ret; + + ret = osmo_sock_init(AF_INET, SOCK_STREAM, IPPROTO_TCP, + link->addr, link->port, + OSMO_SOCK_F_CONNECT|OSMO_SOCK_F_NONBLOCK); + if (ret < 0) { + if (errno != EINPROGRESS) + return ret; + } + link->ofd.fd = ret; + if (osmo_fd_register(&link->ofd) < 0) { + close(ret); + return -EIO; + } + return 0; +} + +static void link_timer_cb(void *data) +{ + struct stream_client_conn *link = data; + + LOGP(DLINP, LOGL_DEBUG, "reconnecting.\n"); + + switch(link->state) { + case STREAM_CLIENT_LINK_STATE_CONNECTING: + stream_client_conn_open(link); + break; + default: + break; + } +} + +void stream_client_conn_send(struct stream_client_conn *link, struct msgb *msg) +{ + msgb_enqueue(&link->tx_queue, msg); + link->ofd.when |= BSC_FD_WRITE; +} + +/* + * Server side. + */ + +struct stream_server_link { + struct osmo_fd ofd; + const char *addr; + uint16_t port; + int (*accept_cb)(struct stream_server_link *link, int fd); + void *data; +}; + +static int stream_server_fd_cb(struct osmo_fd *ofd, unsigned int what) +{ + int ret; + struct sockaddr_in sa; + socklen_t sa_len = sizeof(sa); + struct stream_server_link *link = ofd->data; + + ret = accept(ofd->fd, (struct sockaddr *)&sa, &sa_len); + if (ret < 0) { + LOGP(DLINP, LOGL_ERROR, "failed to accept from origin " + "peer, reason=`%s'\n", strerror(errno)); + return ret; + } + LOGP(DLINP, LOGL_DEBUG, "accept()ed new link from %s to port %u\n", + inet_ntoa(sa.sin_addr), link->port); + + if (link->accept_cb) + link->accept_cb(link, ret); + + return 0; +} + +struct stream_server_link *stream_server_link_create(void *ctx) +{ + struct stream_server_link *link; + + link = talloc_zero(ctx, struct stream_server_link); + if (!link) + return NULL; + + link->ofd.when |= BSC_FD_READ | BSC_FD_WRITE; + link->ofd.cb = stream_server_fd_cb; + link->ofd.data = link; + + return link; +} + +void stream_server_link_set_addr(struct stream_server_link *link, const char *addr) +{ + link->addr = talloc_strdup(link, addr); +} + +void stream_server_link_set_port(struct stream_server_link *link, uint16_t port) +{ + link->port = port; +} + +void stream_server_link_set_accept_cb(struct stream_server_link *link, + int (*accept_cb)(struct stream_server_link *link, int fd)) + +{ + link->accept_cb = accept_cb; +} + +void stream_server_link_destroy(struct stream_server_link *link) +{ + talloc_free(link); +} + +int stream_server_link_open(struct stream_server_link *link) +{ + int ret; + + ret = osmo_sock_init(AF_INET, SOCK_STREAM, IPPROTO_TCP, + link->addr, link->port, OSMO_SOCK_F_BIND); + if (ret < 0) + return ret; + + link->ofd.fd = ret; + if (osmo_fd_register(&link->ofd) < 0) { + close(ret); + return -EIO; + } + return 0; +} + +void stream_server_link_close(struct stream_server_link *link) +{ + osmo_fd_unregister(&link->ofd); + close(link->ofd.fd); +} + +struct stream_server_conn { + struct stream_server_link *server; + struct osmo_fd ofd; + struct llist_head tx_queue; + int (*closed_cb)(struct stream_server_conn *peer); + int (*cb)(struct stream_server_conn *peer, struct msgb *msg); + void *data; +}; + +static void stream_server_conn_read(struct stream_server_conn *conn) +{ + struct msgb *msg; + int ret; + + LOGP(DLINP, LOGL_DEBUG, "message received\n"); + + msg = msgb_alloc(1200, "LAPD/client"); + if (!msg) { + LOGP(DLINP, LOGL_ERROR, "cannot allocate room for message\n"); + return; + } + ret = stream_msg_recv(conn->ofd.fd, msg); + if (ret < 0) { + if (errno == EPIPE || errno == ECONNRESET) { + LOGP(DLINP, LOGL_ERROR, "lost connection with server\n"); + } + stream_server_conn_destroy(conn); + return; + } else if (ret == 0) { + LOGP(DLINP, LOGL_ERROR, "connection closed with server\n"); + stream_server_conn_destroy(conn); + return; + } + msgb_put(msg, ret); + LOGP(DLINP, LOGL_NOTICE, "received %d bytes from client\n", ret); + if (conn->cb) + conn->cb(conn, msg); + + return; +} + +static void stream_server_conn_write(struct stream_server_conn *conn) +{ + struct msgb *msg; + struct llist_head *lh; + int ret; + + LOGP(DLINP, LOGL_DEBUG, "sending data\n"); + + if (llist_empty(&conn->tx_queue)) { + conn->ofd.when &= ~BSC_FD_WRITE; + return; + } + lh = conn->tx_queue.next; + llist_del(lh); + msg = llist_entry(lh, struct msgb, list); + + ret = send(conn->ofd.fd, msg->data, msg->len, 0); + if (ret < 0) { + LOGP(DLINP, LOGL_ERROR, "error to send\n"); + } + msgb_free(msg); +} + +static int stream_server_conn_cb(struct osmo_fd *ofd, unsigned int what) +{ + struct stream_server_conn *conn = ofd->data; + + LOGP(DLINP, LOGL_DEBUG, "connected read/write\n"); + if (what & BSC_FD_READ) + stream_server_conn_read(conn); + if (what & BSC_FD_WRITE) + stream_server_conn_write(conn); + + return 0; +} + +struct stream_server_conn * +stream_server_conn_create(void *ctx, struct stream_server_link *link, int fd, + int (*cb)(struct stream_server_conn *conn, struct msgb *msg), + int (*closed_cb)(struct stream_server_conn *conn), void *data) +{ + struct stream_server_conn *conn; + + conn = talloc_zero(ctx, struct stream_server_conn); + if (conn == NULL) { + LOGP(DLINP, LOGL_ERROR, "cannot allocate new peer in server, " + "reason=`%s'\n", strerror(errno)); + return NULL; + } + conn->server = link; + conn->ofd.fd = fd; + conn->ofd.data = conn; + conn->ofd.cb = stream_server_conn_cb; + conn->ofd.when = BSC_FD_READ; + conn->cb = cb; + conn->closed_cb = closed_cb; + conn->data = data; + INIT_LLIST_HEAD(&conn->tx_queue); + + if (osmo_fd_register(&conn->ofd) < 0) { + LOGP(DLINP, LOGL_ERROR, "could not register FD\n"); + talloc_free(conn); + return NULL; + } + return conn; +} + +void stream_server_conn_destroy(struct stream_server_conn *conn) +{ + close(conn->ofd.fd); + osmo_fd_unregister(&conn->ofd); + if (conn->closed_cb) + conn->closed_cb(conn); + talloc_free(conn); +} + +void stream_server_conn_send(struct stream_server_conn *conn, struct msgb *msg) +{ + msgb_enqueue(&conn->tx_queue, msg); + conn->ofd.when |= BSC_FD_WRITE; +} -- cgit v1.2.3