X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=formats%2Flttng-live%2Flttng-live-comm.c;h=0a895b38622b9ddb6cd7554cdaa0c6e30ac30dad;hb=76206d8105706488f2bc84ce64f5a126d87c8219;hp=cbbeef2e77411e0fef0d5e0ac930e5a16bcf7c3c;hpb=bf6c9bd641ef760014da5efd23085f5e93507990;p=babeltrace.git diff --git a/formats/lttng-live/lttng-live-comm.c b/formats/lttng-live/lttng-live-comm.c index cbbeef2e..0a895b38 100644 --- a/formats/lttng-live/lttng-live-comm.c +++ b/formats/lttng-live/lttng-live-comm.c @@ -26,13 +26,11 @@ #include #include #include -#include #include #include #include #include #include -#include #include #include @@ -52,6 +50,9 @@ #include #include +#include +#include +#include #include "lttng-live.h" #include "lttng-viewer-abi.h" @@ -103,7 +104,7 @@ ssize_t lttng_live_send(int fd, const void *buf, size_t len) ssize_t ret; do { - ret = send(fd, buf, len, MSG_NOSIGNAL); + ret = bt_send_nosigpipe(fd, buf, len); } while (ret < 0 && errno == EINTR); return ret; } @@ -134,7 +135,7 @@ int lttng_live_connect_viewer(struct lttng_live_ctx *ctx) server_addr.sin_family = AF_INET; server_addr.sin_port = htons(ctx->port); server_addr.sin_addr = *((struct in_addr *) host->h_addr); - bzero(&(server_addr.sin_zero), 8); + memset(&(server_addr.sin_zero), 0, 8); if (connect(ctx->control_sock, (struct sockaddr *) &server_addr, sizeof(struct sockaddr)) == -1) { @@ -165,8 +166,8 @@ int lttng_live_establish_connection(struct lttng_live_ctx *ctx) } cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT); - cmd.data_size = sizeof(connect); - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) sizeof(connect)); + cmd.cmd_version = htobe32(0); connect.viewer_session_id = -1ULL; /* will be set on recv */ connect.major = htobe32(LTTNG_LIVE_MAJOR); @@ -245,7 +246,7 @@ void print_session_list(GPtrArray *session_list, const char *path) for (i = 0; i < session_list->len; i++) { relay_session = g_ptr_array_index(session_list, i); - fprintf(stdout, "%s/host/%s/%s (timer = %u, " + fprintf(LTTNG_LIVE_OUTPUT_FP, "%s/host/%s/%s (timer = %u, " "%u stream(s), %u client(s) connected)\n", path, relay_session->hostname, relay_session->name, relay_session->timer, @@ -263,9 +264,9 @@ void update_session_list(GPtrArray *session_list, char *hostname, for (i = 0; i < session_list->len; i++) { relay_session = g_ptr_array_index(session_list, i); - if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) && + if ((strncmp(relay_session->hostname, hostname, MAXNAMLEN) == 0) && strncmp(relay_session->name, - session_name, NAME_MAX) == 0) { + session_name, MAXNAMLEN) == 0) { relay_session->streams += streams; if (relay_session->clients < clients) relay_session->clients = clients; @@ -277,8 +278,8 @@ void update_session_list(GPtrArray *session_list, char *hostname, return; relay_session = g_new0(struct lttng_live_relay_session, 1); - relay_session->hostname = strndup(hostname, NAME_MAX); - relay_session->name = strndup(session_name, NAME_MAX); + relay_session->hostname = bt_strndup(hostname, MAXNAMLEN); + relay_session->name = bt_strndup(session_name, MAXNAMLEN); relay_session->clients = clients; relay_session->streams = streams; relay_session->timer = timer; @@ -306,8 +307,8 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path) } cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS); - cmd.data_size = 0; - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) 0); + cmd.cmd_version = htobe32(0); ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd)); if (ret_len < 0) { @@ -352,8 +353,8 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path) be32toh(lsession.live_timer)); } else { if ((strncmp(lsession.session_name, ctx->session_name, - NAME_MAX) == 0) && (strncmp(lsession.hostname, - ctx->traced_hostname, NAME_MAX) == 0)) { + MAXNAMLEN) == 0) && (strncmp(lsession.hostname, + ctx->traced_hostname, MAXNAMLEN) == 0)) { printf_verbose("Reading from session %" PRIu64 "\n", session_id); g_array_append_val(ctx->session_ids, @@ -382,13 +383,13 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream, int ret = 0; trace = g_hash_table_lookup(stream->session->ctf_traces, - (gpointer) ctf_trace_id); + &ctf_trace_id); if (!trace) { trace = g_new0(struct lttng_live_ctf_trace, 1); trace->ctf_trace_id = ctf_trace_id; trace->streams = g_ptr_array_new(); g_hash_table_insert(stream->session->ctf_traces, - (gpointer) ctf_trace_id, + &trace->ctf_trace_id, trace); } if (stream->metadata_flag) @@ -431,8 +432,8 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) } cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION); - cmd.data_size = sizeof(rq); - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(id); @@ -648,9 +649,10 @@ retry: ret = -1; goto end; } + cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET); - cmd.data_size = sizeof(rq); - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); memset(&rq, 0, sizeof(rq)); rq.stream_id = htobe64(stream->id); @@ -681,8 +683,8 @@ retry: goto error; } if (ret_len != sizeof(rp)) { - fprintf(stderr, "[error] get_data_packet: expected %" PRId64 - ", received %" PRId64 "\n", sizeof(rp), + fprintf(stderr, "[error] get_data_packet: expected %zu" + ", received %zd\n", sizeof(rp), ret_len); goto error; } @@ -801,8 +803,8 @@ int get_one_metadata_packet(struct lttng_live_ctx *ctx, rq.stream_id = htobe64(metadata_stream->id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA); - cmd.data_size = sizeof(rq); - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd)); if (ret_len < 0) { @@ -931,8 +933,10 @@ int get_new_metadata(struct lttng_live_ctx *ctx, } } while (ret > 0 || !len_read); - if (fclose(metadata_stream->metadata_fp_write)) - perror("fclose"); + if (babeltrace_close_memstream(metadata_buf, &size, + metadata_stream->metadata_fp_write)) { + perror("babeltrace_close_memstream"); + } metadata_stream->metadata_fp_write = NULL; error: @@ -974,8 +978,8 @@ int get_next_index(struct lttng_live_ctx *ctx, struct lttng_viewer_index *rp = &viewer_stream->current_index; cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); - cmd.data_size = sizeof(rq); - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); memset(&rq, 0, sizeof(rq)); rq.stream_id = htobe64(viewer_stream->id); @@ -1117,7 +1121,7 @@ int handle_seek_position(size_t index, int whence, struct ctf_stream_pos *pos, struct ctf_file_stream *file_stream) { - int ret; + int ret = 0; switch (whence) { case SEEK_CUR: @@ -1278,6 +1282,17 @@ retry: cur_index->ts_real.timestamp_begin; } + /* + * Flush the output between attempts to grab a packet, thus + * ensuring we flush at least at the periodical timer period. + * This ensures the output remains reactive for interactive users and + * that the output is flushed when redirected to a file by the shell. + */ + if (fflush(LTTNG_LIVE_OUTPUT_FP) < 0) { + perror("fflush"); + goto end; + } + if (pos->packet_size == 0 || pos->offset == EOF) { goto end; } @@ -1317,8 +1332,8 @@ int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx) } cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION); - cmd.data_size = 0; - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) 0); + cmd.cmd_version = htobe32(0); ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd)); if (ret_len < 0) { @@ -1507,8 +1522,8 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id) } cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); - cmd.data_size = sizeof(rq); - cmd.cmd_version = 0; + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(id); @@ -1631,7 +1646,7 @@ int lttng_live_read(struct lttng_live_ctx *ctx) goto end; } - fmt_write = bt_lookup_format(g_quark_from_static_string("text")); + fmt_write = bt_lookup_format(g_quark_from_string("text")); if (!fmt_write) { fprintf(stderr, "[error] ctf-text error\n"); goto end; @@ -1656,7 +1671,7 @@ int lttng_live_read(struct lttng_live_ctx *ctx) for (i = 0; i < ctx->session_ids->len; i++) { id = g_array_index(ctx->session_ids, uint64_t, i); - printf_verbose("Attaching to session %lu\n", id); + printf_verbose("Attaching to session %" PRIu64 "\n", id); ret = lttng_live_attach_session(ctx, id); printf_verbose("Attaching session returns %d\n", ret); if (ret < 0) {