X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;ds=sidebyside;f=formats%2Flttng-live%2Flttng-live-functions.c;h=0974b567a657c50314f10d98a477ebc96179a677;hb=6192b9881a890a2fccc029054b312df95cbf5529;hp=34612cf7b59f4e2fc5bca704a962d602b313fdc5;hpb=4a74436710b248274ffe9e038da0bd41af8a3d8d;p=babeltrace.git diff --git a/formats/lttng-live/lttng-live-functions.c b/formats/lttng-live/lttng-live-functions.c index 34612cf7..0974b567 100644 --- a/formats/lttng-live/lttng-live-functions.c +++ b/formats/lttng-live/lttng-live-functions.c @@ -50,7 +50,7 @@ #include #include "lttng-live-functions.h" -#include "lttng-viewer.h" +#include "lttng-viewer-abi.h" /* * Memory allocation zeroed @@ -110,6 +110,7 @@ int lttng_live_establish_connection(struct lttng_live_ctx *ctx) cmd.data_size = sizeof(connect); cmd.cmd_version = 0; + connect.viewer_session_id = -1ULL; /* will be set on recv */ connect.major = htobe32(LTTNG_LIVE_MAJOR); connect.minor = htobe32(LTTNG_LIVE_MINOR); connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND); @@ -201,6 +202,8 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path) goto error; } assert(ret_len == sizeof(lsession)); + lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; + lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, " "%u stream(s), %u client(s) connected)\n", @@ -227,11 +230,6 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream, (gpointer) ctf_trace_id); if (!trace) { trace = g_new0(struct lttng_live_ctf_trace, 1); - if (!trace) { - ret = -1; - fprintf(stderr, "[error] ctf_trace allocation\n"); - goto error; - } trace->ctf_trace_id = ctf_trace_id; trace->streams = g_ptr_array_new(); g_hash_table_insert(stream->session->ctf_traces, @@ -244,7 +242,6 @@ int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream, stream->ctf_trace = trace; g_ptr_array_add(trace->streams, stream); -error: return ret; } @@ -340,11 +337,6 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) ctx->session->stream_count); ctx->session->streams = g_new0(struct lttng_live_viewer_stream, ctx->session->stream_count); - if (!ctx->session->streams) { - ret = -1; - goto error; - } - for (i = 0; i < be32toh(rp.streams_count); i++) { do { ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0); @@ -355,6 +347,8 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) goto error; } assert(ret_len == sizeof(stream)); + stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0'; + stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; printf_verbose(" stream %" PRIu64 " : %s/%s\n", be64toh(stream.id), stream.path_name, @@ -369,9 +363,18 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) char *path; path = strdup(LTTNG_METADATA_PATH_TEMPLATE); - path = mkdtemp(path); + if (!path) { + perror("strdup"); + ret = -1; + goto error; + } + if (!mkdtemp(path)) { + perror("mkdtemp"); + free(path); + ret = -1; + goto error; + } ctx->session->streams[i].metadata_flag = 1; - mkdir(path, S_IRWXU | S_IRWXG); snprintf(ctx->session->streams[i].path, sizeof(ctx->session->streams[i].path), "%s/%s", path, @@ -380,9 +383,12 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id) O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); if (ret < 0) { + perror("open"); + free(path); goto error; } ctx->session->streams[i].fd = ret; + free(path); } ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i], be64toh(stream.ctf_trace_id)); @@ -448,7 +454,13 @@ int get_data_packet(struct lttng_live_ctx *ctx, ret = ret_len; goto error; } - assert(ret_len == sizeof(rp)); + if (ret_len != sizeof(rp)) { + fprintf(stderr, "[error] get_data_packet: expected %" PRId64 + ", received %" PRId64 "\n", ret_len, + sizeof(rp)); + ret = -1; + goto error; + } rp.flags = be32toh(rp.flags); @@ -710,7 +722,7 @@ retry: case LTTNG_VIEWER_INDEX_INACTIVE: printf_verbose("get_next_index: inactive\n"); memset(index, 0, sizeof(struct packet_index)); - index->timestamp_end = be64toh(rp.timestamp_end); + index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end); break; case LTTNG_VIEWER_INDEX_OK: printf_verbose("get_next_index: Ok, need metadata update : %u\n", @@ -718,8 +730,8 @@ retry: index->offset = be64toh(rp.offset); index->packet_size = be64toh(rp.packet_size); index->content_size = be64toh(rp.content_size); - index->timestamp_begin = be64toh(rp.timestamp_begin); - index->timestamp_end = be64toh(rp.timestamp_end); + index->ts_cycles.timestamp_begin = be64toh(rp.timestamp_begin); + index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end); index->events_discarded = be64toh(rp.events_discarded); if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { @@ -762,7 +774,7 @@ void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index, { struct ctf_stream_pos *pos; struct ctf_file_stream *file_stream; - struct packet_index packet_index; + struct packet_index *prev_index = NULL, *cur_index; struct lttng_live_viewer_stream *viewer_stream; struct lttng_live_session *session; int ret; @@ -773,31 +785,72 @@ retry: viewer_stream = (struct lttng_live_viewer_stream *) pos->priv; session = viewer_stream->session; + switch (pos->packet_index->len) { + case 0: + g_array_set_size(pos->packet_index, 1); + cur_index = &g_array_index(pos->packet_index, + struct packet_index, 0); + break; + case 1: + g_array_set_size(pos->packet_index, 2); + prev_index = &g_array_index(pos->packet_index, + struct packet_index, 0); + cur_index = &g_array_index(pos->packet_index, + struct packet_index, 1); + break; + case 2: + g_array_index(pos->packet_index, + struct packet_index, 0) = + g_array_index(pos->packet_index, + struct packet_index, 1); + prev_index = &g_array_index(pos->packet_index, + struct packet_index, 0); + cur_index = &g_array_index(pos->packet_index, + struct packet_index, 1); + break; + default: + abort(); + break; + } printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id); - ret = get_next_index(session->ctx, viewer_stream, &packet_index); + ret = get_next_index(session->ctx, viewer_stream, cur_index); if (ret < 0) { pos->offset = EOF; fprintf(stderr, "[error] get_next_index failed\n"); return; } - pos->packet_size = packet_index.packet_size; - pos->content_size = packet_index.content_size; + pos->packet_size = cur_index->packet_size; + pos->content_size = cur_index->content_size; pos->mmap_base_offset = 0; - if (packet_index.offset == EOF) { + if (cur_index->offset == EOF) { pos->offset = EOF; } else { pos->offset = 0; } - if (packet_index.content_size == 0) { - file_stream->parent.cycles_timestamp = packet_index.timestamp_end; + if (cur_index->content_size == 0) { + file_stream->parent.cycles_timestamp = + cur_index->ts_cycles.timestamp_end; file_stream->parent.real_timestamp = ctf_get_real_timestamp( - &file_stream->parent, packet_index.timestamp_end); + &file_stream->parent, + cur_index->ts_cycles.timestamp_end); } else { - file_stream->parent.cycles_timestamp = packet_index.timestamp_begin; - file_stream->parent.real_timestamp = ctf_get_real_timestamp( - &file_stream->parent, packet_index.timestamp_begin); + /* Convert the timestamps and append to the real_index. */ + cur_index->ts_real.timestamp_begin = ctf_get_real_timestamp( + &file_stream->parent, + cur_index->ts_cycles.timestamp_begin); + cur_index->ts_real.timestamp_end = ctf_get_real_timestamp( + &file_stream->parent, + cur_index->ts_cycles.timestamp_end); + + ctf_update_current_packet_index(&file_stream->parent, + prev_index, cur_index); + + file_stream->parent.cycles_timestamp = + cur_index->ts_cycles.timestamp_begin; + file_stream->parent.real_timestamp = + cur_index->ts_real.timestamp_begin; } if (pos->packet_size == 0 || pos->offset == EOF) { @@ -807,8 +860,8 @@ retry: printf_verbose("get_data_packet for stream %" PRIu64 "\n", viewer_stream->id); ret = get_data_packet(session->ctx, pos, viewer_stream, - be64toh(packet_index.offset), - packet_index.packet_size / CHAR_BIT); + be64toh(cur_index->offset), + cur_index->packet_size / CHAR_BIT); if (ret == -2) { goto retry; } else if (ret < 0) { @@ -820,8 +873,9 @@ retry: printf_verbose("Index received : packet_size : %" PRIu64 ", offset %" PRIu64 ", content_size %" PRIu64 ", timestamp_end : %" PRIu64 "\n", - packet_index.packet_size, packet_index.offset, - packet_index.content_size, packet_index.timestamp_end); + cur_index->packet_size, cur_index->offset, + cur_index->content_size, + cur_index->ts_cycles.timestamp_end); /* update trace_packet_header and stream_packet_context */ if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) { @@ -918,6 +972,48 @@ end: return; } +int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx) +{ + struct lttng_viewer_cmd cmd; + struct lttng_viewer_create_session_response resp; + int ret; + ssize_t ret_len; + + cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION); + cmd.data_size = 0; + cmd.cmd_version = 0; + + do { + ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0); + } while (ret_len < 0 && errno == EINTR); + if (ret_len < 0) { + fprintf(stderr, "[error] Error sending cmd\n"); + ret = ret_len; + goto error; + } + assert(ret_len == sizeof(cmd)); + + do { + ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0); + } while (ret_len < 0 && errno == EINTR); + if (ret_len < 0) { + fprintf(stderr, "[error] Error receiving create session reply\n"); + ret = ret_len; + goto error; + } + assert(ret_len == sizeof(resp)); + + if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) { + fprintf(stderr, "[error] Error creating viewer session\n"); + ret = -1; + goto error; + } + ret = 0; + +error: + return ret; +} + void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id) { int ret, active_session = 0; @@ -952,6 +1048,11 @@ void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id) if (!sout->parent.event_cb) goto end_free; + ret = lttng_live_create_viewer_session(ctx); + if (ret < 0) { + goto end_free; + } + /* * As long as the session is active, we try to reattach to it, * even if all the streams get closed.