Fix: memstream compat layer requires use of babeltrace_close_memstream
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
index 0cd62f9812b8d03a201f2f85572697f8a2666c72..8509377c271014e2b7d0a27328e419ce66f08698 100644 (file)
@@ -26,7 +26,6 @@
 #include <netinet/in.h>
 #include <netdb.h>
 #include <stdio.h>
-#include <string.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <errno.h>
@@ -38,6 +37,7 @@
 #include <babeltrace/ctf/ctf-index.h>
 
 #include <babeltrace/babeltrace.h>
+#include <babeltrace/endian.h>
 #include <babeltrace/ctf/events.h>
 #include <babeltrace/ctf/callbacks.h>
 #include <babeltrace/ctf/iterator.h>
@@ -51,6 +51,8 @@
 #include <formats/ctf/events-private.h>
 
 #include <babeltrace/compat/memstream.h>
+#include <babeltrace/compat/send.h>
+#include <babeltrace/compat/string.h>
 
 #include "lttng-live.h"
 #include "lttng-viewer-abi.h"
@@ -69,7 +71,7 @@
 
 static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
                size_t index, int whence);
-static void add_traces(gpointer key, gpointer value, gpointer user_data);
+static int add_traces(struct lttng_live_ctx *ctx);
 static int del_traces(gpointer key, gpointer value, gpointer user_data);
 static int get_new_metadata(struct lttng_live_ctx *ctx,
                struct lttng_live_viewer_stream *viewer_stream,
@@ -102,7 +104,7 @@ ssize_t lttng_live_send(int fd, const void *buf, size_t len)
        ssize_t ret;
 
        do {
-               ret = send(fd, buf, len, MSG_NOSIGNAL);
+               ret = bt_send_nosigpipe(fd, buf, len);
        } while (ret < 0 && errno == EINTR);
        return ret;
 }
@@ -133,7 +135,7 @@ int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
        server_addr.sin_family = AF_INET;
        server_addr.sin_port = htons(ctx->port);
        server_addr.sin_addr = *((struct in_addr *) host->h_addr);
-       bzero(&(server_addr.sin_zero), 8);
+       memset(&(server_addr.sin_zero), 0, 8);
 
        if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
                                sizeof(struct sockaddr)) == -1) {
@@ -262,9 +264,9 @@ void update_session_list(GPtrArray *session_list, char *hostname,
 
        for (i = 0; i < session_list->len; i++) {
                relay_session = g_ptr_array_index(session_list, i);
-               if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
+               if ((strncmp(relay_session->hostname, hostname, MAXNAMLEN) == 0) &&
                                strncmp(relay_session->name,
-                                       session_name, NAME_MAX) == 0) {
+                                       session_name, MAXNAMLEN) == 0) {
                        relay_session->streams += streams;
                        if (relay_session->clients < clients)
                                relay_session->clients = clients;
@@ -276,8 +278,8 @@ void update_session_list(GPtrArray *session_list, char *hostname,
                return;
 
        relay_session = g_new0(struct lttng_live_relay_session, 1);
-       relay_session->hostname = strndup(hostname, NAME_MAX);
-       relay_session->name = strndup(session_name, NAME_MAX);
+       relay_session->hostname = bt_strndup(hostname, MAXNAMLEN);
+       relay_session->name = bt_strndup(session_name, MAXNAMLEN);
        relay_session->clients = clients;
        relay_session->streams = streams;
        relay_session->timer = timer;
@@ -351,8 +353,8 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
                                        be32toh(lsession.live_timer));
                } else {
                        if ((strncmp(lsession.session_name, ctx->session_name,
-                               NAME_MAX) == 0) && (strncmp(lsession.hostname,
-                                       ctx->traced_hostname, NAME_MAX) == 0)) {
+                               MAXNAMLEN) == 0) && (strncmp(lsession.hostname,
+                                       ctx->traced_hostname, MAXNAMLEN) == 0)) {
                                printf_verbose("Reading from session %" PRIu64 "\n",
                                                session_id);
                                g_array_append_val(ctx->session_ids,
@@ -381,13 +383,13 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
        int ret = 0;
 
        trace = g_hash_table_lookup(stream->session->ctf_traces,
-                       (gpointer) ctf_trace_id);
+                       &ctf_trace_id);
        if (!trace) {
                trace = g_new0(struct lttng_live_ctf_trace, 1);
                trace->ctf_trace_id = ctf_trace_id;
                trace->streams = g_ptr_array_new();
                g_hash_table_insert(stream->session->ctf_traces,
-                               (gpointer) ctf_trace_id,
+                               &trace->ctf_trace_id,
                                trace);
        }
        if (stream->metadata_flag)
@@ -680,8 +682,8 @@ retry:
                goto error;
        }
        if (ret_len != sizeof(rp)) {
-               fprintf(stderr, "[error] get_data_packet: expected %" PRId64
-                               ", received %" PRId64 "\n", sizeof(rp),
+               fprintf(stderr, "[error] get_data_packet: expected %zu"
+                               ", received %zd\n", sizeof(rp),
                                ret_len);
                goto error;
        }
@@ -708,11 +710,14 @@ retry:
                if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
                        printf_verbose("get_data_packet: new streams needed\n");
                        ret = ask_new_streams(ctx);
-                       if (ret < 0)
+                       if (ret < 0) {
                                goto error;
-                       else if (ret > 0)
-                               g_hash_table_foreach(ctx->session->ctf_traces,
-                                               add_traces, ctx->bt_ctx);
+                       } else if (ret > 0) {
+                               ret = add_traces(ctx);
+                               if (ret < 0) {
+                                       goto error;
+                               }
+                       }
                }
                if (rp.flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
                                | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
@@ -909,6 +914,10 @@ int get_new_metadata(struct lttng_live_ctx *ctx,
        }
 
        do {
+               if (lttng_live_should_quit()) {
+                       ret = -1;
+                       goto error;
+               }
                /*
                 * get_one_metadata_packet returns the number of bytes
                 * received, 0 when we have received everything, a
@@ -923,8 +932,10 @@ int get_new_metadata(struct lttng_live_ctx *ctx,
                }
        } while (ret > 0 || !len_read);
 
-       if (fclose(metadata_stream->metadata_fp_write))
-               perror("fclose");
+       if (babeltrace_close_memstream(metadata_buf, &size,
+                       metadata_stream->metadata_fp_write)) {
+               perror("babeltrace_close_memstream");
+       }
        metadata_stream->metadata_fp_write = NULL;
 
 error:
@@ -1026,11 +1037,14 @@ retry:
                if (rp->flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
                        printf_verbose("get_next_index: need new streams\n");
                        ret = ask_new_streams(ctx);
-                       if (ret < 0)
+                       if (ret < 0) {
                                goto error;
-                       else if (ret > 0)
-                               g_hash_table_foreach(ctx->session->ctf_traces,
-                                               add_traces, ctx->bt_ctx);
+                       } else if (ret > 0) {
+                               ret = add_traces(ctx);
+                               if (ret < 0) {
+                                       goto error;
+                               }
+                       }
                }
                break;
        case LTTNG_VIEWER_INDEX_RETRY:
@@ -1065,7 +1079,8 @@ void read_packet_header(struct ctf_stream_pos *pos,
        int ret;
 
        /* update trace_packet_header and stream_packet_context */
-       if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
+       if (!(pos->prot & PROT_WRITE) &&
+               file_stream->parent.trace_packet_header) {
                /* Read packet header */
                ret = generic_rw(&pos->parent,
                                &file_stream->parent.trace_packet_header->p);
@@ -1076,7 +1091,8 @@ void read_packet_header(struct ctf_stream_pos *pos,
                        goto end;
                }
        }
-       if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
+       if (!(pos->prot & PROT_WRITE) &&
+               file_stream->parent.stream_packet_context) {
                /* Read packet context */
                ret = generic_rw(&pos->parent,
                                &file_stream->parent.stream_packet_context->p);
@@ -1104,7 +1120,7 @@ int handle_seek_position(size_t index, int whence,
                struct ctf_stream_pos *pos,
                struct ctf_file_stream *file_stream)
 {
-       int ret;
+       int ret = 0;
 
        switch (whence) {
        case SEEK_CUR:
@@ -1353,15 +1369,14 @@ int del_traces(gpointer key, gpointer value, gpointer user_data)
 }
 
 static
-void add_traces(gpointer key, gpointer value, gpointer user_data)
+int add_one_trace(struct lttng_live_ctx *ctx,
+               struct lttng_live_ctf_trace *trace)
 {
        int i, ret;
-       struct bt_context *bt_ctx = user_data;
-       struct lttng_live_ctf_trace *trace = value;
+       struct bt_context *bt_ctx = ctx->bt_ctx;
        struct lttng_live_viewer_stream *stream;
        struct bt_mmap_stream *new_mmap_stream;
        struct bt_mmap_stream_list mmap_list;
-       struct lttng_live_ctx *ctx = NULL;
        struct bt_trace_descriptor *td;
        struct bt_trace_handle *handle;
 
@@ -1374,14 +1389,15 @@ void add_traces(gpointer key, gpointer value, gpointer user_data)
         * times the same traces.
         * If a trace is already in the context, we just skip this function.
         */
-       if (trace->in_use)
-               return;
+       if (trace->in_use) {
+               ret = 0;
+               goto end;
+       }
 
        BT_INIT_LIST_HEAD(&mmap_list.head);
 
        for (i = 0; i < trace->streams->len; i++) {
                stream = g_ptr_array_index(trace->streams, i);
-               ctx = stream->session->ctx;
 
                if (!stream->metadata_flag) {
                        new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
@@ -1417,6 +1433,7 @@ void add_traces(gpointer key, gpointer value, gpointer user_data)
 
        if (!trace->metadata_fp) {
                fprintf(stderr, "[error] No metadata stream opened\n");
+               ret = -1;
                goto end_free;
        }
 
@@ -1424,6 +1441,7 @@ void add_traces(gpointer key, gpointer value, gpointer user_data)
                        ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
        if (ret < 0) {
                fprintf(stderr, "[error] Error adding trace\n");
+               ret = -1;
                goto end_free;
        }
        trace->metadata_stream->metadata_len = 0;
@@ -1445,7 +1463,31 @@ void add_traces(gpointer key, gpointer value, gpointer user_data)
 end_free:
        bt_context_put(bt_ctx);
 end:
-       return;
+       return ret;
+}
+
+static
+int add_traces(struct lttng_live_ctx *ctx)
+{
+       int ret;
+       struct lttng_live_ctf_trace *trace;
+       GHashTableIter it;
+       gpointer key;
+       gpointer value;
+
+       g_hash_table_iter_init(&it, ctx->session->ctf_traces);
+       while (g_hash_table_iter_next(&it, &key, &value)) {
+               trace = (struct lttng_live_ctf_trace *) value;
+               ret = add_one_trace(ctx, trace);
+               if (ret < 0) {
+                       goto end;
+               }
+       }
+
+       ret = 0;
+
+end:
+       return ret;
 }
 
 /*
@@ -1617,7 +1659,7 @@ int lttng_live_read(struct lttng_live_ctx *ctx)
 
        for (i = 0; i < ctx->session_ids->len; i++) {
                id = g_array_index(ctx->session_ids, uint64_t, i);
-               printf_verbose("Attaching to session %lu\n", id);
+               printf_verbose("Attaching to session %" PRIu64 "\n", id);
                ret = lttng_live_attach_session(ctx, id);
                printf_verbose("Attaching session returns %d\n", ret);
                if (ret < 0) {
@@ -1654,8 +1696,10 @@ int lttng_live_read(struct lttng_live_ctx *ctx)
                        }
                }
 
-               g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
-                               ctx->bt_ctx);
+               ret = add_traces(ctx);
+               if (ret < 0) {
+                       goto end_free;
+               }
 
                begin_pos.type = BT_SEEK_BEGIN;
                iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
This page took 0.027569 seconds and 4 git commands to generate.