aboutsummaryrefslogtreecommitdiffstats
path: root/src/osmo_server_network.c
diff options
context:
space:
mode:
authorHolger Hans Peter Freyther <holger@moiji-mobile.com>2016-08-05 15:47:08 +0200
committerHolger Hans Peter Freyther <holger@moiji-mobile.com>2016-08-05 16:10:05 +0200
commite024869a728b481cd726ce3489a54643ffb57d2b (patch)
tree19c4138c32dcda48a831115f3033d7a9dc3f0156 /src/osmo_server_network.c
parentad29ce6f0699ca2b26fbd4cc5cdf7779d70bb8e2 (diff)
server: Add zmq based event and data interface to the server
To allow easily extracting or streaming the data to an external analysis system, zeromq can be configured (and reconfigured). The system works as fire and forget and no loss detection is present. A simple go based client application is provided to subscribe to the publisher. Change-Id: I4f3e6d675023a81b7d2ee19bf1f44a2be0ca003c
Diffstat (limited to 'src/osmo_server_network.c')
-rw-r--r--src/osmo_server_network.c116
1 files changed, 100 insertions, 16 deletions
diff --git a/src/osmo_server_network.c b/src/osmo_server_network.c
index d530ef7..6812f35 100644
--- a/src/osmo_server_network.c
+++ b/src/osmo_server_network.c
@@ -31,11 +31,99 @@
#include <sys/socket.h>
#include <sys/types.h>
+#include <zmq.h>
+
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
+static void pcap_zmq_send(void *publ, const void *data, size_t len, int flags)
+{
+ int rc;
+ zmq_msg_t msg;
+
+ rc = zmq_msg_init_size(&msg, len);
+ if (rc != 0) {
+ /* sigh.. we said SNDMORE but can't... */
+ LOGP(DSERVER, LOGL_ERROR, "Failed to init rc=%d errno=%d/%s\n",
+ rc, errno, strerror(errno));
+ return;
+ }
+ memcpy(zmq_msg_data(&msg), data, len);
+ rc = zmq_msg_send(&msg, publ, flags);
+ if (rc == -1) {
+ /* is the zmq_msg now owned? leak??? */
+ LOGP(DSERVER, LOGL_ERROR, "Failed to send data rc=%d errno=%d/%s\n",
+ rc, errno, strerror(errno));
+ return;
+ }
+}
+
+static void client_event(struct osmo_pcap_conn *conn,
+ const char *event, const char *data)
+{
+ char *event_name;
+
+ if (!conn->server->zmq_publ)
+ return;
+
+ /*
+ * This multi-part support is insane... so if we lose the first
+ * or the last part of the multipart message stuff is going out
+ * of sync. *great* As we can't do anything about it right now
+ * just close the eyese and send it.
+ */
+ event_name = talloc_asprintf(conn, "event.v1.%s.%s",
+ event, conn->name);
+ pcap_zmq_send(conn->server->zmq_publ,
+ event_name, strlen(event_name),
+ data ? ZMQ_SNDMORE : 0);
+ talloc_free(event_name);
+ if (data)
+ pcap_zmq_send(conn->server->zmq_publ, data, strlen(data), 0);
+}
+
+static void client_data(struct osmo_pcap_conn *conn,
+ struct osmo_pcap_data *data)
+{
+ char *event_name;
+
+ if (!conn->server->zmq_publ)
+ return;
+
+ /*
+ * This multi-part support is insane... so if we lose the first
+ * or the last part of the multipart message stuff is going out
+ * of sync. *great* As we can't do anything about it right now
+ * just close the eyese and send it.
+ */
+ event_name = talloc_asprintf(conn, "data.v1.%s", conn->name);
+ pcap_zmq_send(conn->server->zmq_publ, event_name, strlen(event_name), ZMQ_SNDMORE);
+ talloc_free(event_name);
+
+ pcap_zmq_send(conn->server->zmq_publ,
+ &conn->file_hdr, sizeof(conn->file_hdr),
+ ZMQ_SNDMORE);
+ pcap_zmq_send(conn->server->zmq_publ,
+ &data->data[0], data->len,
+ 0);
+}
+
+void osmo_pcap_server_close_trace(struct osmo_pcap_conn *conn)
+{
+ if (conn->local_fd >= 0) {
+ close(conn->local_fd);
+ conn->local_fd = -1;
+ }
+
+ if (conn->curr_filename) {
+ client_event(conn, "closingtracefile", conn->curr_filename);
+ talloc_free(conn->curr_filename);
+ conn->curr_filename = NULL;
+ }
+}
+
static void close_connection(struct osmo_pcap_conn *conn)
{
if (conn->rem_fd.fd >= 0) {
@@ -44,44 +132,39 @@ static void close_connection(struct osmo_pcap_conn *conn)
osmo_fd_unregister(&conn->rem_fd);
}
- if (conn->local_fd >= 0) {
- close(conn->local_fd);
- conn->local_fd = -1;
- }
+ osmo_pcap_server_close_trace(conn);
+ client_event(conn, "disconnect", NULL);
}
static void restart_pcap(struct osmo_pcap_conn *conn)
{
time_t now = time(NULL);
struct tm *tm = localtime(&now);
- char *filename;
int rc;
- if (conn->local_fd >= 0) {
- close(conn->local_fd);
- conn->local_fd = -1;
- }
+ osmo_pcap_server_close_trace(conn);
/* omit any storing/creation of the file */
if (conn->no_store) {
conn->last_write = *tm;
+ talloc_free(conn->curr_filename);
+ conn->curr_filename = NULL;
return;
}
- filename = talloc_asprintf(conn, "%s/trace-%s-%d%.2d%.2d_%.2d%.2d%.2d.pcap",
+ conn->curr_filename = talloc_asprintf(conn, "%s/trace-%s-%d%.2d%.2d_%.2d%.2d%.2d.pcap",
conn->server->base_path, conn->name,
tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
tm->tm_hour, tm->tm_min, tm->tm_sec);
- if (!filename) {
+ if (!conn->curr_filename) {
LOGP(DSERVER, LOGL_ERROR, "Failed to assemble filename for %s.\n", conn->name);
return;
}
- conn->local_fd = creat(filename, 0440);
+ conn->local_fd = creat(conn->curr_filename, 0440);
if (conn->local_fd < 0) {
- LOGP(DSERVER, LOGL_ERROR, "Failed to file: '%s'\n", filename);
- talloc_free(filename);
+ LOGP(DSERVER, LOGL_ERROR, "Failed to file: '%s'\n", conn->curr_filename);
return;
}
@@ -90,12 +173,10 @@ static void restart_pcap(struct osmo_pcap_conn *conn)
LOGP(DSERVER, LOGL_ERROR, "Failed to write the header: %d\n", errno);
close(conn->local_fd);
conn->local_fd = -1;
- talloc_free(filename);
return;
}
conn->last_write = *tm;
- talloc_free(filename);
}
static void link_data(struct osmo_pcap_conn *conn, struct osmo_pcap_data *data)
@@ -127,6 +208,8 @@ static void write_data(struct osmo_pcap_conn *conn, struct osmo_pcap_data *data)
struct tm *tm = localtime(&now);
int rc;
+ client_data(conn, data);
+
if (conn->no_store) {
conn->last_write = *tm;
return;
@@ -317,6 +400,7 @@ static int accept_cb(struct osmo_fd *fd, unsigned int when)
if (conn->remote_addr.s_addr == addr.sin_addr.s_addr) {
LOGP(DSERVER, LOGL_NOTICE,
"New connection from %s\n", conn->name);
+ client_event(conn, "connect", NULL);
new_connection(server, conn, new_fd);
return 0;
}