ctf.fs: bt_ctf_notif_iter_create(): assert() that all medops exist
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
index 5ae743b1055b0d557a31cfeabc46a7edfee6e740..6c5d4fd296f55fc9f23f4bba506432c3889f367c 100644 (file)
 #include <netinet/in.h>
 #include <netdb.h>
 #include <stdio.h>
-#include <string.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <errno.h>
 #include <inttypes.h>
 #include <fcntl.h>
-#include <sys/mman.h>
 #include <poll.h>
 
 #include <babeltrace/ctf/ctf-index.h>
 
 #include <babeltrace/endian.h>
 #include <babeltrace/compat/memstream.h>
+#include <babeltrace/compat/send.h>
+#include <babeltrace/compat/string.h>
+#include <babeltrace/compat/mman.h>
+#include <babeltrace/babeltrace-internal.h>
 
 #include "lttng-live.h"
 #include "lttng-viewer-abi.h"
 
 #define ACTIVE_POLL_DELAY      100     /* ms */
 
-/*
- * Memory allocation zeroed
- */
-#define zmalloc(x) calloc(1, x)
-
-#ifndef max_t
-#define max_t(type, a, b)      \
-       ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
-#endif
-
 static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
                size_t index, int whence);
 static int add_traces(struct lttng_live_ctx *ctx);
@@ -103,7 +95,7 @@ ssize_t lttng_live_send(int fd, const void *buf, size_t len)
        ssize_t ret;
 
        do {
-               ret = send(fd, buf, len, MSG_NOSIGNAL);
+               ret = bt_send_nosigpipe(fd, buf, len);
        } while (ret < 0 && errno == EINTR);
        return ret;
 }
@@ -134,7 +126,7 @@ int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
        server_addr.sin_family = AF_INET;
        server_addr.sin_port = htons(ctx->port);
        server_addr.sin_addr = *((struct in_addr *) host->h_addr);
-       bzero(&(server_addr.sin_zero), 8);
+       memset(&(server_addr.sin_zero), 0, 8);
 
        if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
                                sizeof(struct sockaddr)) == -1) {
@@ -165,8 +157,8 @@ int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
        }
 
        cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
-       cmd.data_size = sizeof(connect);
-       cmd.cmd_version = 0;
+       cmd.data_size = htobe64((uint64_t) sizeof(connect));
+       cmd.cmd_version = htobe32(0);
 
        connect.viewer_session_id = -1ULL;      /* will be set on recv */
        connect.major = htobe32(LTTNG_LIVE_MAJOR);
@@ -245,7 +237,7 @@ void print_session_list(GPtrArray *session_list, const char *path)
 
        for (i = 0; i < session_list->len; i++) {
                relay_session = g_ptr_array_index(session_list, i);
-               fprintf(stdout, "%s/host/%s/%s (timer = %u, "
+               fprintf(LTTNG_LIVE_OUTPUT_FP, "%s/host/%s/%s (timer = %u, "
                                "%u stream(s), %u client(s) connected)\n",
                                path, relay_session->hostname,
                                relay_session->name, relay_session->timer,
@@ -263,9 +255,9 @@ void update_session_list(GPtrArray *session_list, char *hostname,
 
        for (i = 0; i < session_list->len; i++) {
                relay_session = g_ptr_array_index(session_list, i);
-               if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
+               if ((strncmp(relay_session->hostname, hostname, MAXNAMLEN) == 0) &&
                                strncmp(relay_session->name,
-                                       session_name, NAME_MAX) == 0) {
+                                       session_name, MAXNAMLEN) == 0) {
                        relay_session->streams += streams;
                        if (relay_session->clients < clients)
                                relay_session->clients = clients;
@@ -277,8 +269,8 @@ void update_session_list(GPtrArray *session_list, char *hostname,
                return;
 
        relay_session = g_new0(struct lttng_live_relay_session, 1);
-       relay_session->hostname = strndup(hostname, NAME_MAX);
-       relay_session->name = strndup(session_name, NAME_MAX);
+       relay_session->hostname = bt_strndup(hostname, MAXNAMLEN);
+       relay_session->name = bt_strndup(session_name, MAXNAMLEN);
        relay_session->clients = clients;
        relay_session->streams = streams;
        relay_session->timer = timer;
@@ -306,8 +298,8 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
        }
 
        cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
-       cmd.data_size = 0;
-       cmd.cmd_version = 0;
+       cmd.data_size = htobe64((uint64_t) 0);
+       cmd.cmd_version = htobe32(0);
 
        ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
        if (ret_len < 0) {
@@ -352,8 +344,8 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
                                        be32toh(lsession.live_timer));
                } else {
                        if ((strncmp(lsession.session_name, ctx->session_name,
-                               NAME_MAX) == 0) && (strncmp(lsession.hostname,
-                                       ctx->traced_hostname, NAME_MAX) == 0)) {
+                               MAXNAMLEN) == 0) && (strncmp(lsession.hostname,
+                                       ctx->traced_hostname, MAXNAMLEN) == 0)) {
                                printf_verbose("Reading from session %" PRIu64 "\n",
                                                session_id);
                                g_array_append_val(ctx->session_ids,
@@ -382,13 +374,13 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
        int ret = 0;
 
        trace = g_hash_table_lookup(stream->session->ctf_traces,
-                       (gpointer) ctf_trace_id);
+                       &ctf_trace_id);
        if (!trace) {
                trace = g_new0(struct lttng_live_ctf_trace, 1);
                trace->ctf_trace_id = ctf_trace_id;
                trace->streams = g_ptr_array_new();
                g_hash_table_insert(stream->session->ctf_traces,
-                               (gpointer) ctf_trace_id,
+                               &trace->ctf_trace_id,
                                trace);
        }
        if (stream->metadata_flag)
@@ -407,7 +399,7 @@ int open_metadata_fp_write(struct lttng_live_viewer_stream *stream,
        int ret = 0;
 
        stream->metadata_fp_write =
-               babeltrace_open_memstream(metadata_buf, size);
+               bt_open_memstream(metadata_buf, size);
        if (!stream->metadata_fp_write) {
                perror("Metadata open_memstream");
                ret = -1;
@@ -431,8 +423,8 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
        }
 
        cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
-       cmd.data_size = sizeof(rq);
-       cmd.cmd_version = 0;
+       cmd.data_size = htobe64((uint64_t) sizeof(rq));
+       cmd.cmd_version = htobe32(0);
 
        memset(&rq, 0, sizeof(rq));
        rq.session_id = htobe64(id);
@@ -609,7 +601,7 @@ int append_metadata(struct lttng_live_ctx *ctx,
 
        metadata = viewer_stream->ctf_trace->metadata_stream;
        metadata->ctf_trace->metadata_fp =
-               babeltrace_fmemopen(metadata_buf,
+               bt_fmemopen(metadata_buf,
                                metadata->metadata_len, "rb");
        if (!metadata->ctf_trace->metadata_fp) {
                perror("Metadata fmemopen");
@@ -648,9 +640,10 @@ retry:
                ret = -1;
                goto end;
        }
+
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
-       cmd.data_size = sizeof(rq);
-       cmd.cmd_version = 0;
+       cmd.data_size = htobe64((uint64_t) sizeof(rq));
+       cmd.cmd_version = htobe32(0);
 
        memset(&rq, 0, sizeof(rq));
        rq.stream_id = htobe64(stream->id);
@@ -801,8 +794,8 @@ int get_one_metadata_packet(struct lttng_live_ctx *ctx,
 
        rq.stream_id = htobe64(metadata_stream->id);
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
-       cmd.data_size = sizeof(rq);
-       cmd.cmd_version = 0;
+       cmd.data_size = htobe64((uint64_t) sizeof(rq));
+       cmd.cmd_version = htobe32(0);
 
        ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
        if (ret_len < 0) {
@@ -931,8 +924,10 @@ int get_new_metadata(struct lttng_live_ctx *ctx,
                }
        } while (ret > 0 || !len_read);
 
-       if (fclose(metadata_stream->metadata_fp_write))
-               perror("fclose");
+       if (bt_close_memstream(metadata_buf, &size,
+                       metadata_stream->metadata_fp_write)) {
+               perror("bt_close_memstream");
+       }
        metadata_stream->metadata_fp_write = NULL;
 
 error:
@@ -974,8 +969,8 @@ int get_next_index(struct lttng_live_ctx *ctx,
        struct lttng_viewer_index *rp = &viewer_stream->current_index;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
-       cmd.data_size = sizeof(rq);
-       cmd.cmd_version = 0;
+       cmd.data_size = htobe64((uint64_t) sizeof(rq));
+       cmd.cmd_version = htobe32(0);
 
        memset(&rq, 0, sizeof(rq));
        rq.stream_id = htobe64(viewer_stream->id);
@@ -1117,7 +1112,7 @@ int handle_seek_position(size_t index, int whence,
                struct ctf_stream_pos *pos,
                struct ctf_file_stream *file_stream)
 {
-       int ret;
+       int ret = 0;
 
        switch (whence) {
        case SEEK_CUR:
@@ -1278,6 +1273,17 @@ retry:
                                cur_index->ts_real.timestamp_begin;
        }
 
+       /*
+        * Flush the output between attempts to grab a packet, thus
+        * ensuring we flush at least at the periodical timer period.
+        * This ensures the output remains reactive for interactive users and
+        * that the output is flushed when redirected to a file by the shell.
+        */
+       if (fflush(LTTNG_LIVE_OUTPUT_FP) < 0) {
+               perror("fflush");
+               goto end;
+       }
+
        if (pos->packet_size == 0 || pos->offset == EOF) {
                goto end;
        }
@@ -1317,8 +1323,8 @@ int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
        }
 
        cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
-       cmd.data_size = 0;
-       cmd.cmd_version = 0;
+       cmd.data_size = htobe64((uint64_t) 0);
+       cmd.cmd_version = htobe32(0);
 
        ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
        if (ret_len < 0) {
@@ -1417,7 +1423,7 @@ int add_one_trace(struct lttng_live_ctx *ctx,
                                goto end_free;
                        }
 
-                       trace->metadata_fp = babeltrace_fmemopen(metadata_buf,
+                       trace->metadata_fp = bt_fmemopen(metadata_buf,
                                        stream->metadata_len, "rb");
                        if (!trace->metadata_fp) {
                                perror("Metadata fmemopen");
@@ -1507,8 +1513,8 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
        }
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
-       cmd.data_size = sizeof(rq);
-       cmd.cmd_version = 0;
+       cmd.data_size = htobe64((uint64_t) sizeof(rq));
+       cmd.cmd_version = htobe32(0);
 
        memset(&rq, 0, sizeof(rq));
        rq.session_id = htobe64(id);
@@ -1631,7 +1637,7 @@ int lttng_live_read(struct lttng_live_ctx *ctx)
                goto end;
        }
 
-       fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
+       fmt_write = bt_lookup_format(g_quark_from_string("text"));
        if (!fmt_write) {
                fprintf(stderr, "[error] ctf-text error\n");
                goto end;
This page took 0.029039 seconds and 4 git commands to generate.