X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=formats%2Flttng-live%2Flttng-live-comm.c;h=ec0a0123ecda35c7f3a19fe4c0f89e73285b048c;hp=31cafd78b30480f9ed05e3568e7360817341df06;hb=21fe3eb3b83998d6fad94f7ec346d57593afe8c1;hpb=d1694e4102473f753a5557ecceeba693259c7b75 diff --git a/formats/lttng-live/lttng-live-comm.c b/formats/lttng-live/lttng-live-comm.c index 31cafd78..ec0a0123 100644 --- a/formats/lttng-live/lttng-live-comm.c +++ b/formats/lttng-live/lttng-live-comm.c @@ -26,18 +26,17 @@ #include #include #include -#include #include #include #include #include #include -#include #include #include #include +#include #include #include #include @@ -51,6 +50,9 @@ #include #include +#include +#include +#include #include "lttng-live.h" #include "lttng-viewer-abi.h" @@ -69,7 +71,7 @@ static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index, int whence); -static void add_traces(gpointer key, gpointer value, gpointer user_data); +static int add_traces(struct lttng_live_ctx *ctx); static int del_traces(gpointer key, gpointer value, gpointer user_data); static int get_new_metadata(struct lttng_live_ctx *ctx, struct lttng_live_viewer_stream *viewer_stream, @@ -102,7 +104,7 @@ ssize_t lttng_live_send(int fd, const void *buf, size_t len) ssize_t ret; do { - ret = send(fd, buf, len, MSG_NOSIGNAL); + ret = bt_send_nosigpipe(fd, buf, len); } while (ret < 0 && errno == EINTR); return ret; } @@ -133,7 +135,7 @@ int lttng_live_connect_viewer(struct lttng_live_ctx *ctx) server_addr.sin_family = AF_INET; server_addr.sin_port = htons(ctx->port); server_addr.sin_addr = *((struct in_addr *) host->h_addr); - bzero(&(server_addr.sin_zero), 8); + memset(&(server_addr.sin_zero), 0, 8); if (connect(ctx->control_sock, (struct sockaddr *) &server_addr, sizeof(struct sockaddr)) == -1) { @@ -155,6 +157,8 @@ int lttng_live_establish_connection(struct lttng_live_ctx *ctx) { struct lttng_viewer_cmd cmd; struct lttng_viewer_connect connect; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect); + char cmd_buf[cmd_buf_len]; int ret; ssize_t ret_len; @@ -164,27 +168,28 @@ int lttng_live_establish_connection(struct lttng_live_ctx *ctx) } cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT); - cmd.data_size = sizeof(connect); - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) sizeof(connect)); + cmd.cmd_version = htobe32(0); connect.viewer_session_id = -1ULL; /* will be set on recv */ connect.major = htobe32(LTTNG_LIVE_MAJOR); connect.minor = htobe32(LTTNG_LIVE_MINOR); connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND); - ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - perror("[error] Error sending cmd"); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect)); - ret_len = lttng_live_send(ctx->control_sock, &connect, sizeof(connect)); + ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len); if (ret_len < 0) { - perror("[error] Error sending version"); + perror("[error] Error sending cmd for establishing session"); goto error; } - assert(ret_len == sizeof(connect)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(ctx->control_sock, &connect, sizeof(connect)); if (ret_len == 0) { @@ -244,7 +249,7 @@ void print_session_list(GPtrArray *session_list, const char *path) for (i = 0; i < session_list->len; i++) { relay_session = g_ptr_array_index(session_list, i); - fprintf(stdout, "%s/host/%s/%s (timer = %u, " + fprintf(LTTNG_LIVE_OUTPUT_FP, "%s/host/%s/%s (timer = %u, " "%u stream(s), %u client(s) connected)\n", path, relay_session->hostname, relay_session->name, relay_session->timer, @@ -262,9 +267,9 @@ void update_session_list(GPtrArray *session_list, char *hostname, for (i = 0; i < session_list->len; i++) { relay_session = g_ptr_array_index(session_list, i); - if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) && + if ((strncmp(relay_session->hostname, hostname, MAXNAMLEN) == 0) && strncmp(relay_session->name, - session_name, NAME_MAX) == 0) { + session_name, MAXNAMLEN) == 0) { relay_session->streams += streams; if (relay_session->clients < clients) relay_session->clients = clients; @@ -276,8 +281,8 @@ void update_session_list(GPtrArray *session_list, char *hostname, return; relay_session = g_new0(struct lttng_live_relay_session, 1); - relay_session->hostname = strndup(hostname, NAME_MAX); - relay_session->name = strndup(session_name, NAME_MAX); + relay_session->hostname = bt_strndup(hostname, MAXNAMLEN); + relay_session->name = bt_strndup(session_name, MAXNAMLEN); relay_session->clients = clients; relay_session->streams = streams; relay_session->timer = timer; @@ -305,8 +310,8 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path) } cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS); - cmd.data_size = 0; - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) 0); + cmd.cmd_version = htobe32(0); ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd)); if (ret_len < 0) { @@ -351,8 +356,8 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path) be32toh(lsession.live_timer)); } else { if ((strncmp(lsession.session_name, ctx->session_name, - NAME_MAX) == 0) && (strncmp(lsession.hostname, - ctx->traced_hostname, NAME_MAX) == 0)) { + MAXNAMLEN) == 0) && (strncmp(lsession.hostname, + ctx->traced_hostname, MAXNAMLEN) == 0)) { printf_verbose("Reading from session %" PRIu64 "\n", session_id); g_array_append_val(ctx->session_ids, @@ -381,13 +386,13 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream, int ret = 0; trace = g_hash_table_lookup(stream->session->ctf_traces, - (gpointer) ctf_trace_id); + &ctf_trace_id); if (!trace) { trace = g_new0(struct lttng_live_ctf_trace, 1); trace->ctf_trace_id = ctf_trace_id; trace->streams = g_ptr_array_new(); g_hash_table_insert(stream->session->ctf_traces, - (gpointer) ctf_trace_id, + &trace->ctf_trace_id, trace); } if (stream->metadata_flag) @@ -421,6 +426,8 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) struct lttng_viewer_attach_session_request rq; struct lttng_viewer_attach_session_response rp; struct lttng_viewer_stream stream; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; int ret, i; ssize_t ret_len; @@ -430,8 +437,8 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) } cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION); - cmd.data_size = sizeof(rq); - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(id); @@ -439,19 +446,20 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING); rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST); - ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - perror("[error] Error sending cmd"); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len); if (ret_len < 0) { - perror("[error] Error sending attach request"); + perror("[error] Error sending attach command and request"); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp)); if (ret_len == 0) { @@ -498,18 +506,22 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) ret = 0; goto end; } - printf_verbose("Waiting for %" PRIu64 " streams:\n", - ctx->session->stream_count); - ctx->session->streams = g_new0(struct lttng_live_viewer_stream, - ctx->session->stream_count); + printf_verbose("Waiting for %d streams:\n", + be32toh(rp.streams_count)); for (i = 0; i < be32toh(rp.streams_count); i++) { - ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream)); + struct lttng_live_viewer_stream *lvstream; + + lvstream = g_new0(struct lttng_live_viewer_stream, 1); + ret_len = lttng_live_recv(ctx->control_sock, &stream, + sizeof(stream)); if (ret_len == 0) { fprintf(stderr, "[error] Remote side has closed connection\n"); + g_free(lvstream); goto error; } if (ret_len < 0) { perror("[error] Error receiving stream"); + g_free(lvstream); goto error; } assert(ret_len == sizeof(stream)); @@ -519,21 +531,23 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) printf_verbose(" stream %" PRIu64 " : %s/%s\n", be64toh(stream.id), stream.path_name, stream.channel_name); - ctx->session->streams[i].id = be64toh(stream.id); - ctx->session->streams[i].session = ctx->session; + lvstream->id = be64toh(stream.id); + lvstream->session = ctx->session; - ctx->session->streams[i].mmap_size = 0; - ctx->session->streams[i].ctf_stream_id = -1ULL; + lvstream->mmap_size = 0; + lvstream->ctf_stream_id = -1ULL; if (be32toh(stream.metadata_flag)) { - ctx->session->streams[i].metadata_flag = 1; + lvstream->metadata_flag = 1; } - ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i], + ret = lttng_live_ctf_trace_assign(lvstream, be64toh(stream.ctf_trace_id)); if (ret < 0) { + g_free(lvstream); goto error; } - + bt_list_add(&lvstream->stream_node, + &ctx->session->stream_list); } ret = 0; end: @@ -559,10 +573,11 @@ restart: id = g_array_index(ctx->session_ids, uint64_t, i); ret = lttng_live_get_new_streams(ctx, id); printf_verbose("Asking for new streams returns %d\n", ret); + if (lttng_live_should_quit()) { + ret = -1; + goto end; + } if (ret < 0) { - if (lttng_live_should_quit()) { - goto end; - } if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) { printf_verbose("Session %" PRIu64 " closed\n", id); @@ -585,7 +600,12 @@ restart: nb_streams += ret; } } - ret = nb_streams; + if (ctx->session_ids->len == 0) { + /* All sessions are closed. */ + ret = -1; + } else { + ret = nb_streams; + } end: return ret; @@ -599,6 +619,11 @@ int append_metadata(struct lttng_live_ctx *ctx, struct lttng_live_viewer_stream *metadata; char *metadata_buf = NULL; + if (!viewer_stream->ctf_trace->handle) { + printf_verbose("append_metadata: trace handle not ready yet.\n"); + return 0; + } + printf_verbose("get_next_index: new metadata needed\n"); ret = get_new_metadata(ctx, viewer_stream, &metadata_buf); if (ret < 0) { @@ -639,6 +664,8 @@ int get_data_packet(struct lttng_live_ctx *ctx, struct lttng_viewer_cmd cmd; struct lttng_viewer_get_packet rq; struct lttng_viewer_trace_packet rp; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; ssize_t ret_len; int ret; @@ -647,28 +674,30 @@ retry: ret = -1; goto end; } + cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET); - cmd.data_size = sizeof(rq); - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); memset(&rq, 0, sizeof(rq)); rq.stream_id = htobe64(stream->id); rq.offset = htobe64(offset); rq.len = htobe32(len); - ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - perror("[error] Error sending cmd"); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len); if (ret_len < 0) { - perror("[error] Error sending get_data_packet request"); + perror("[error] Error sending get_data_packet cmd and request"); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp)); if (ret_len == 0) { @@ -680,8 +709,8 @@ retry: goto error; } if (ret_len != sizeof(rp)) { - fprintf(stderr, "[error] get_data_packet: expected %" PRId64 - ", received %" PRId64 "\n", sizeof(rp), + fprintf(stderr, "[error] get_data_packet: expected %zu" + ", received %zd\n", sizeof(rp), ret_len); goto error; } @@ -708,11 +737,14 @@ retry: if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { printf_verbose("get_data_packet: new streams needed\n"); ret = ask_new_streams(ctx); - if (ret < 0) + if (ret < 0) { goto error; - else if (ret > 0) - g_hash_table_foreach(ctx->session->ctf_traces, - add_traces, ctx->bt_ctx); + } else if (ret > 0) { + ret = add_traces(ctx); + if (ret < 0) { + goto error; + } + } } if (rp.flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) { @@ -789,6 +821,8 @@ int get_one_metadata_packet(struct lttng_live_ctx *ctx, struct lttng_viewer_metadata_packet rp; char *data = NULL; ssize_t ret_len; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (lttng_live_should_quit()) { ret = -1; @@ -797,22 +831,23 @@ int get_one_metadata_packet(struct lttng_live_ctx *ctx, rq.stream_id = htobe64(metadata_stream->id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA); - cmd.data_size = sizeof(rq); - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - perror("[error] Error sending cmd"); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len); if (ret_len < 0) { - perror("[error] Error sending get_metadata request"); + perror("[error] Error sending get_metadata cmd and request"); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp)); if (ret_len == 0) { @@ -925,10 +960,15 @@ int get_new_metadata(struct lttng_live_ctx *ctx, if (!len_read) { (void) poll(NULL, 0, ACTIVE_POLL_DELAY); } + if (ret < 0) { + break; /* Stop on error. */ + } } while (ret > 0 || !len_read); - if (fclose(metadata_stream->metadata_fp_write)) - perror("fclose"); + if (babeltrace_close_memstream(metadata_buf, &size, + metadata_stream->metadata_fp_write)) { + perror("babeltrace_close_memstream"); + } metadata_stream->metadata_fp_write = NULL; error: @@ -968,32 +1008,34 @@ int get_next_index(struct lttng_live_ctx *ctx, int ret; ssize_t ret_len; struct lttng_viewer_index *rp = &viewer_stream->current_index; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); - cmd.data_size = sizeof(rq); - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); memset(&rq, 0, sizeof(rq)); rq.stream_id = htobe64(viewer_stream->id); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); retry: if (lttng_live_should_quit()) { ret = -1; goto end; } - ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(ctx->control_sock, &cmd_buf, cmd_buf_len); if (ret_len < 0) { - perror("[error] Error sending cmd"); + perror("[error] Error sending get_next_index cmd and request"); goto error; } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq)); - if (ret_len < 0) { - perror("[error] Error sending get_next_index request"); - goto error; - } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(ctx->control_sock, rp, sizeof(*rp)); if (ret_len == 0) { @@ -1011,6 +1053,13 @@ retry: switch (be32toh(rp->status)) { case LTTNG_VIEWER_INDEX_INACTIVE: printf_verbose("get_next_index: inactive\n"); + + if (index->ts_cycles.timestamp_end == + be64toh(rp->timestamp_end)) { + /* Already seen this timestamp. */ + (void) poll(NULL, 0, ACTIVE_POLL_DELAY); + } + memset(index, 0, sizeof(struct packet_index)); index->ts_cycles.timestamp_end = be64toh(rp->timestamp_end); *stream_id = be64toh(rp->stream_id); @@ -1030,11 +1079,14 @@ retry: if (rp->flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { printf_verbose("get_next_index: need new streams\n"); ret = ask_new_streams(ctx); - if (ret < 0) + if (ret < 0) { goto error; - else if (ret > 0) - g_hash_table_foreach(ctx->session->ctf_traces, - add_traces, ctx->bt_ctx); + } else if (ret > 0) { + ret = add_traces(ctx); + if (ret < 0) { + goto error; + } + } } break; case LTTNG_VIEWER_INDEX_RETRY: @@ -1043,6 +1095,7 @@ retry: goto retry; case LTTNG_VIEWER_INDEX_HUP: printf_verbose("get_next_index: stream hung up\n"); + /* TODO: remove stream from session list and trace ptr array */ viewer_stream->id = -1ULL; index->offset = EOF; ctx->session->stream_count--; @@ -1110,7 +1163,7 @@ int handle_seek_position(size_t index, int whence, struct ctf_stream_pos *pos, struct ctf_file_stream *file_stream) { - int ret; + int ret = 0; switch (whence) { case SEEK_CUR: @@ -1244,14 +1297,30 @@ retry: } if (cur_index->content_size == 0) { + /* Beacon packet index */ if (file_stream->parent.stream_class) { file_stream->parent.cycles_timestamp = cur_index->ts_cycles.timestamp_end; file_stream->parent.real_timestamp = ctf_get_real_timestamp( &file_stream->parent, cur_index->ts_cycles.timestamp_end); + + /* + * Duplicate the data from the previous index, because + * the one we just received is only a beacon with no + * relevant information except the timestamp_end. We + * don't need to keep this timestamp_end because we already + * updated the file_stream timestamps, so we only need + * to keep the last real index data as prev_index. That + * way, we keep the original prev timestamps and + * discarded events counter. This is the same behaviour + * as if we were reading a local trace, we would not + * have fake indexes between real indexes. + */ + memcpy(cur_index, prev_index, sizeof(struct packet_index)); } } else { + /* Real packet index */ if (file_stream->parent.stream_class) { /* Convert the timestamps and append to the real_index. */ cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp( @@ -1265,12 +1334,33 @@ retry: ctf_update_current_packet_index(&file_stream->parent, prev_index, cur_index); + /* + * We need to check if we are in trace read or called + * from packet indexing. In this last case, the + * collection is not there, so we cannot print the + * timestamps. + */ + if ((&file_stream->parent)->stream_class->trace->parent.collection) { + ctf_print_discarded_lost(stderr, &file_stream->parent); + } + file_stream->parent.cycles_timestamp = cur_index->ts_cycles.timestamp_begin; file_stream->parent.real_timestamp = cur_index->ts_real.timestamp_begin; } + /* + * Flush the output between attempts to grab a packet, thus + * ensuring we flush at least at the periodical timer period. + * This ensures the output remains reactive for interactive users and + * that the output is flushed when redirected to a file by the shell. + */ + if (fflush(LTTNG_LIVE_OUTPUT_FP) < 0) { + perror("fflush"); + goto end; + } + if (pos->packet_size == 0 || pos->offset == EOF) { goto end; } @@ -1310,8 +1400,8 @@ int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx) } cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION); - cmd.data_size = 0; - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) 0); + cmd.cmd_version = htobe32(0); ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd)); if (ret_len < 0) { @@ -1359,15 +1449,14 @@ int del_traces(gpointer key, gpointer value, gpointer user_data) } static -void add_traces(gpointer key, gpointer value, gpointer user_data) +int add_one_trace(struct lttng_live_ctx *ctx, + struct lttng_live_ctf_trace *trace) { int i, ret; - struct bt_context *bt_ctx = user_data; - struct lttng_live_ctf_trace *trace = value; + struct bt_context *bt_ctx = ctx->bt_ctx; struct lttng_live_viewer_stream *stream; struct bt_mmap_stream *new_mmap_stream; struct bt_mmap_stream_list mmap_list; - struct lttng_live_ctx *ctx = NULL; struct bt_trace_descriptor *td; struct bt_trace_handle *handle; @@ -1380,14 +1469,15 @@ void add_traces(gpointer key, gpointer value, gpointer user_data) * times the same traces. * If a trace is already in the context, we just skip this function. */ - if (trace->in_use) - return; + if (trace->in_use) { + ret = 0; + goto end; + } BT_INIT_LIST_HEAD(&mmap_list.head); for (i = 0; i < trace->streams->len; i++) { stream = g_ptr_array_index(trace->streams, i); - ctx = stream->session->ctx; if (!stream->metadata_flag) { new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream)); @@ -1423,6 +1513,7 @@ void add_traces(gpointer key, gpointer value, gpointer user_data) if (!trace->metadata_fp) { fprintf(stderr, "[error] No metadata stream opened\n"); + ret = -1; goto end_free; } @@ -1430,6 +1521,7 @@ void add_traces(gpointer key, gpointer value, gpointer user_data) ctf_live_packet_seek, &mmap_list, trace->metadata_fp); if (ret < 0) { fprintf(stderr, "[error] Error adding trace\n"); + ret = -1; goto end_free; } trace->metadata_stream->metadata_len = 0; @@ -1451,7 +1543,31 @@ void add_traces(gpointer key, gpointer value, gpointer user_data) end_free: bt_context_put(bt_ctx); end: - return; + return ret; +} + +static +int add_traces(struct lttng_live_ctx *ctx) +{ + int ret; + struct lttng_live_ctf_trace *trace; + GHashTableIter it; + gpointer key; + gpointer value; + + g_hash_table_iter_init(&it, ctx->session->ctf_traces); + while (g_hash_table_iter_next(&it, &key, &value)) { + trace = (struct lttng_live_ctf_trace *) value; + ret = add_one_trace(ctx, trace); + if (ret < 0) { + goto end; + } + } + + ret = 0; + +end: + return ret; } /* @@ -1467,6 +1583,8 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) int ret, i, nb_streams = 0; ssize_t ret_len; uint32_t stream_count; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (lttng_live_should_quit()) { ret = -1; @@ -1474,25 +1592,26 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) } cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); - cmd.data_size = sizeof(rq); - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(id); - ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - perror("[error] Error sending cmd"); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len); if (ret_len < 0) { - perror("[error] Error sending get_new_streams request"); + perror("[error] Error sending get_new_streams cmd and request"); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp)); if (ret_len == 0) { @@ -1534,18 +1653,22 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) ret = 0; goto end; } - printf_verbose("Waiting for %" PRIu64 " streams:\n", - ctx->session->stream_count); - ctx->session->streams = g_new0(struct lttng_live_viewer_stream, - ctx->session->stream_count); + printf_verbose("Waiting for %d streams:\n", stream_count); + for (i = 0; i < stream_count; i++) { - ret_len = lttng_live_recv(ctx->control_sock, &stream, sizeof(stream)); + struct lttng_live_viewer_stream *lvstream; + + lvstream = g_new0(struct lttng_live_viewer_stream, 1); + ret_len = lttng_live_recv(ctx->control_sock, &stream, + sizeof(stream)); if (ret_len == 0) { fprintf(stderr, "[error] Remote side has closed connection\n"); + g_free(lvstream); goto error; } if (ret_len < 0) { perror("[error] Error receiving stream"); + g_free(lvstream); goto error; } assert(ret_len == sizeof(stream)); @@ -1555,22 +1678,24 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) printf_verbose(" stream %" PRIu64 " : %s/%s\n", be64toh(stream.id), stream.path_name, stream.channel_name); - ctx->session->streams[i].id = be64toh(stream.id); - ctx->session->streams[i].session = ctx->session; + lvstream->id = be64toh(stream.id); + lvstream->session = ctx->session; - ctx->session->streams[i].mmap_size = 0; - ctx->session->streams[i].ctf_stream_id = -1ULL; + lvstream->mmap_size = 0; + lvstream->ctf_stream_id = -1ULL; if (be32toh(stream.metadata_flag)) { - ctx->session->streams[i].metadata_flag = 1; + lvstream->metadata_flag = 1; } - ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i], + ret = lttng_live_ctf_trace_assign(lvstream, be64toh(stream.ctf_trace_id)); if (ret < 0) { + g_free(lvstream); goto error; } nb_streams++; - + bt_list_add(&lvstream->stream_node, + &ctx->session->stream_list); } ret = nb_streams; end: @@ -1623,7 +1748,7 @@ int lttng_live_read(struct lttng_live_ctx *ctx) for (i = 0; i < ctx->session_ids->len; i++) { id = g_array_index(ctx->session_ids, uint64_t, i); - printf_verbose("Attaching to session %lu\n", id); + printf_verbose("Attaching to session %" PRIu64 "\n", id); ret = lttng_live_attach_session(ctx, id); printf_verbose("Attaching session returns %d\n", ret); if (ret < 0) { @@ -1653,6 +1778,7 @@ int lttng_live_read(struct lttng_live_ctx *ctx) } ret = ask_new_streams(ctx); if (ret < 0) { + ret = 0; goto end_free; } if (!ctx->session->stream_count) { @@ -1660,8 +1786,10 @@ int lttng_live_read(struct lttng_live_ctx *ctx) } } - g_hash_table_foreach(ctx->session->ctf_traces, add_traces, - ctx->bt_ctx); + ret = add_traces(ctx); + if (ret < 0) { + goto end_free; + } begin_pos.type = BT_SEEK_BEGIN; iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);