From: Mathieu Desnoyers Date: Fri, 25 Nov 2016 21:25:14 +0000 (-0500) Subject: Implement ctf.lttng-live component X-Git-Tag: v2.0.0-pre1~278 X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=commitdiff_plain;h=7cdc2bab17acd56d035b204518ef845fa5a9f1c7 Implement ctf.lttng-live component Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- diff --git a/cli/babeltrace.c b/cli/babeltrace.c index f42b563e..03d65891 100644 --- a/cli/babeltrace.c +++ b/cli/babeltrace.c @@ -48,6 +48,7 @@ #include #include #include +#include #include "babeltrace-cfg.h" #include "babeltrace-cfg-cli-args.h" #include "babeltrace-cfg-cli-args-default.h" diff --git a/include/babeltrace/graph/notification-inactivity.h b/include/babeltrace/graph/notification-inactivity.h index 6c8d0849..9ddf51da 100644 --- a/include/babeltrace/graph/notification-inactivity.h +++ b/include/babeltrace/graph/notification-inactivity.h @@ -29,6 +29,7 @@ extern "C" { struct bt_notification; struct bt_clock_class_priority_map; +struct bt_ctf_clock_class; extern struct bt_notification *bt_notification_inactivity_create( struct bt_clock_class_priority_map *clock_class_priority_map); diff --git a/include/babeltrace/graph/notification-iterator.h b/include/babeltrace/graph/notification-iterator.h index 753ff6ec..35f68a83 100644 --- a/include/babeltrace/graph/notification-iterator.h +++ b/include/babeltrace/graph/notification-iterator.h @@ -40,7 +40,7 @@ struct bt_notification_iterator; * Status code. Errors are always negative. */ enum bt_notification_iterator_status { - /** Try again. */ + /** No notifications available for now. Try again later. */ BT_NOTIFICATION_ITERATOR_STATUS_AGAIN = 11, /** No more notifications to be delivered. */ BT_NOTIFICATION_ITERATOR_STATUS_END = 1, diff --git a/lib/graph/iterator.c b/lib/graph/iterator.c index 36098200..1b1862bd 100644 --- a/lib/graph/iterator.c +++ b/lib/graph/iterator.c @@ -1127,7 +1127,7 @@ int enqueue_notification_and_automatic( break; case BT_NOTIFICATION_TYPE_INACTIVITY: /* Always valid */ - break; + goto handle_notif; default: /* * Invalid type of notification. Only the notification @@ -1155,6 +1155,7 @@ int enqueue_notification_and_automatic( goto error; } +handle_notif: switch (notif->type) { case BT_NOTIFICATION_TYPE_EVENT: ret = handle_notif_event(iterator, notif, notif_stream, @@ -1174,6 +1175,9 @@ int enqueue_notification_and_automatic( ret = handle_notif_packet_end(iterator, notif, notif_stream, notif_packet); break; + case BT_NOTIFICATION_TYPE_INACTIVITY: + add_action_push_notif(iterator, notif); + break; default: break; } diff --git a/plugins/ctf/common/metadata/decoder.c b/plugins/ctf/common/metadata/decoder.c index a36a0860..41badbb6 100644 --- a/plugins/ctf/common/metadata/decoder.c +++ b/plugins/ctf/common/metadata/decoder.c @@ -321,6 +321,11 @@ enum ctf_metadata_decoder_status ctf_metadata_decoder_decode( goto end; } + if (strlen(buf) == 0) { + /* An empty metadata packet is OK. */ + goto end; + } + /* Convert the real file pointer to a memory file pointer */ fp = bt_fmemopen(buf, strlen(buf), "rb"); close_fp = true; diff --git a/plugins/ctf/common/notif-iter/notif-iter.c b/plugins/ctf/common/notif-iter/notif-iter.c index 496f95a4..bc20f881 100644 --- a/plugins/ctf/common/notif-iter/notif-iter.c +++ b/plugins/ctf/common/notif-iter/notif-iter.c @@ -2386,6 +2386,10 @@ enum bt_ctf_notif_iter_status bt_ctf_notif_iter_get_next_notification( while (true) { status = handle_state(notit); + if (status == BT_CTF_NOTIF_ITER_STATUS_AGAIN) { + PDBG("Medium operation reported \"try again later\""); + goto end; + } if (status != BT_CTF_NOTIF_ITER_STATUS_OK) { if (status == BT_CTF_NOTIF_ITER_STATUS_EOF) { PDBG("Medium operation reported end of stream\n"); diff --git a/plugins/ctf/fs-sink/writer.c b/plugins/ctf/fs-sink/writer.c index 211a6a9c..9fca5de7 100644 --- a/plugins/ctf/fs-sink/writer.c +++ b/plugins/ctf/fs-sink/writer.c @@ -217,40 +217,35 @@ enum bt_component_status writer_run(struct bt_private_component *component) struct bt_notification_iterator *it; struct writer_component *writer_component = bt_private_component_get_user_data(component); - - it = writer_component->input_iterator; - assert(it); + enum bt_notification_iterator_status it_ret; if (unlikely(writer_component->error)) { ret = BT_COMPONENT_STATUS_ERROR; goto end; } - if (likely(writer_component->processed_first_event)) { - enum bt_notification_iterator_status it_ret; - - it_ret = bt_notification_iterator_next(it); - switch (it_ret) { - case BT_NOTIFICATION_ITERATOR_STATUS_ERROR: - ret = BT_COMPONENT_STATUS_ERROR; - goto end; - case BT_NOTIFICATION_ITERATOR_STATUS_END: - ret = BT_COMPONENT_STATUS_END; - BT_PUT(writer_component->input_iterator); - goto end; - default: - break; - } - } + it = writer_component->input_iterator; + assert(it); + it_ret = bt_notification_iterator_next(it); - notification = bt_notification_iterator_get_notification(it); - if (!notification) { + switch (it_ret) { + case BT_NOTIFICATION_ITERATOR_STATUS_END: + ret = BT_COMPONENT_STATUS_END; + BT_PUT(writer_component->input_iterator); + goto end; + case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: + ret = BT_COMPONENT_STATUS_AGAIN; + goto end; + case BT_NOTIFICATION_ITERATOR_STATUS_OK: + break; + default: ret = BT_COMPONENT_STATUS_ERROR; goto end; } + notification = bt_notification_iterator_get_notification(it); + assert(notification); ret = handle_notification(writer_component, notification); - writer_component->processed_first_event = true; end: bt_put(notification); return ret; diff --git a/plugins/ctf/lttng-live/Makefile.am b/plugins/ctf/lttng-live/Makefile.am index 3e72abbf..dcd310ee 100644 --- a/plugins/ctf/lttng-live/Makefile.am +++ b/plugins/ctf/lttng-live/Makefile.am @@ -1,8 +1,9 @@ AM_CFLAGS = $(PACKAGE_CFLAGS) -I$(top_srcdir)/include -I$(top_srcdir)/plugins -libbabeltrace_plugin_ctf_lttng_live_la_SOURCES = \ - lttng-live.c \ - lttng-live-internal.h +libbabeltrace_plugin_ctf_lttng_live_la_SOURCES = lttng-live.c \ + data-stream.c lttng-live-internal.h \ + data-stream.h metadata.c metadata.h \ + viewer-connection.c viewer-connection.h \ + lttng-viewer-abi.h noinst_LTLIBRARIES = libbabeltrace-plugin-ctf-lttng-live.la - diff --git a/plugins/ctf/lttng-live/data-stream.c b/plugins/ctf/lttng-live/data-stream.c new file mode 100644 index 00000000..6412e3d0 --- /dev/null +++ b/plugins/ctf/lttng-live/data-stream.c @@ -0,0 +1,226 @@ +/* + * Copyright 2016 - Philippe Proulx + * Copyright 2016 - Jérémie Galarneau + * Copyright 2010-2011 - EfficiOS Inc. and Linux Foundation + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "../common/notif-iter/notif-iter.h" +#include +#include "data-stream.h" + +#define PRINT_ERR_STREAM lttng_live->error_fp +#define PRINT_PREFIX "lttng-live-data-stream" +#define PRINT_DBG_CHECK lttng_live_debug +#include "../print.h" + +static +enum bt_ctf_notif_iter_medium_status medop_request_bytes( + size_t request_sz, uint8_t **buffer_addr, + size_t *buffer_sz, void *data) +{ + enum bt_ctf_notif_iter_medium_status status = + BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK; + struct lttng_live_stream_iterator *stream = data; + struct lttng_live_trace *trace = stream->trace; + struct lttng_live_session *session = trace->session; + struct lttng_live_component *lttng_live = session->lttng_live; + uint64_t recv_len = 0; + uint64_t len_left; + uint64_t read_len; + //int i; + + len_left = stream->base_offset + stream->len - stream->offset; + if (!len_left) { + stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; + status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + return status; + } + read_len = MIN(request_sz, stream->buflen); + read_len = MIN(read_len, len_left); + status = lttng_live_get_stream_bytes(lttng_live, + stream, stream->buf, stream->offset, + read_len, &recv_len); +#if 0 //DEBUG + for (i = 0; i < recv_len; i++) { + fprintf(stderr, "%x ", stream->buf[i]); + } + fprintf(stderr, "\n"); +#endif + *buffer_addr = stream->buf; + *buffer_sz = recv_len; + stream->offset += recv_len; + return status; +} + +static +struct bt_ctf_stream *medop_get_stream( + struct bt_ctf_stream_class *stream_class, void *data) +{ + struct lttng_live_stream_iterator *lttng_live_stream = data; + struct lttng_live_trace *trace = lttng_live_stream->trace; + struct lttng_live_session *session = trace->session; + struct lttng_live_component *lttng_live = session->lttng_live; + + if (!lttng_live_stream->stream) { + int64_t id = bt_ctf_stream_class_get_id(stream_class); + + PDBG("Creating stream %s out of stream class %" PRId64 "\n", + lttng_live_stream->name, id); + lttng_live_stream->stream = bt_ctf_stream_create(stream_class, + lttng_live_stream->name); + if (!lttng_live_stream->stream) { + PERR("Cannot create stream %s (stream class %" PRId64 ")\n", + lttng_live_stream->name, id); + } + } + + return lttng_live_stream->stream; +} + +static struct bt_ctf_notif_iter_medium_ops medops = { + .request_bytes = medop_request_bytes, + .get_stream = medop_get_stream, +}; + +BT_HIDDEN +enum bt_ctf_lttng_live_iterator_status lttng_live_lazy_notif_init( + struct lttng_live_session *session) +{ + struct lttng_live_component *lttng_live = session->lttng_live; + struct lttng_live_trace *trace; + + if (!session->lazy_stream_notif_init) { + return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + } + + bt_list_for_each_entry(trace, &session->traces, node) { + struct lttng_live_stream_iterator *stream; + + bt_list_for_each_entry(stream, &trace->streams, node) { + if (stream->notif_iter) { + continue; + } + stream->notif_iter = bt_ctf_notif_iter_create(trace->trace, + lttng_live->max_query_size, medops, + stream, lttng_live->error_fp); + if (!stream->notif_iter) { + goto error; + } + } + } + + session->lazy_stream_notif_init = false; + + return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + +error: + return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; +} + +BT_HIDDEN +struct lttng_live_stream_iterator *lttng_live_stream_iterator_create( + struct lttng_live_session *session, + uint64_t ctf_trace_id, + uint64_t stream_id) +{ + struct lttng_live_component *lttng_live = session->lttng_live; + struct lttng_live_stream_iterator *stream = + g_new0(struct lttng_live_stream_iterator, 1); + struct lttng_live_trace *trace; + int ret; + + trace = lttng_live_ref_trace(session, ctf_trace_id); + if (!trace) { + goto error; + } + + stream->p.type = LIVE_STREAM_TYPE_STREAM; + stream->trace = trace; + stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; + stream->viewer_stream_id = stream_id; + stream->ctf_stream_class_id = -1ULL; + stream->last_returned_inactivity_timestamp = INT64_MIN; + + if (trace->trace) { + stream->notif_iter = bt_ctf_notif_iter_create(trace->trace, + lttng_live->max_query_size, medops, + stream, lttng_live->error_fp); + if (!stream->notif_iter) { + goto error; + } + } + stream->buf = g_new0(uint8_t, session->lttng_live->max_query_size); + stream->buflen = session->lttng_live->max_query_size; + + ret = lttng_live_add_port(lttng_live, stream); + assert(!ret); + + bt_list_add(&stream->node, &trace->streams); + + goto end; +error: + /* Do not touch "borrowed" file. */ + lttng_live_stream_iterator_destroy(stream); + stream = NULL; +end: + return stream; +} + +BT_HIDDEN +void lttng_live_stream_iterator_destroy(struct lttng_live_stream_iterator *stream) +{ + struct lttng_live_component *lttng_live; + int ret; + + if (!stream) { + return; + } + + lttng_live = stream->trace->session->lttng_live; + ret = lttng_live_remove_port(lttng_live, stream->port); + assert(!ret); + + if (stream->stream) { + BT_PUT(stream->stream); + } + + if (stream->notif_iter) { + bt_ctf_notif_iter_destroy(stream->notif_iter); + } + g_free(stream->buf); + BT_PUT(stream->packet_end_notif_queue); + bt_list_del(&stream->node); + /* + * Ensure we poke the trace metadata in the future, which is + * required to release the metadata reference on the trace. + */ + stream->trace->new_metadata_needed = true; + lttng_live_unref_trace(stream->trace); + g_free(stream); +} diff --git a/plugins/ctf/lttng-live/data-stream.h b/plugins/ctf/lttng-live/data-stream.h new file mode 100644 index 00000000..73e588cc --- /dev/null +++ b/plugins/ctf/lttng-live/data-stream.h @@ -0,0 +1,44 @@ +#ifndef LTTNG_LIVE_DATA_STREAM_H +#define LTTNG_LIVE_DATA_STREAM_H + +/* + * Copyright 2016 - Philippe Proulx + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include +#include +#include +#include + +#include "lttng-live-internal.h" +#include "../common/notif-iter/notif-iter.h" + +enum bt_ctf_lttng_live_iterator_status lttng_live_lazy_notif_init( + struct lttng_live_session *session); + +struct lttng_live_stream_iterator *lttng_live_stream_iterator_create( + struct lttng_live_session *session, + uint64_t ctf_trace_id, + uint64_t stream_id); + +void lttng_live_stream_iterator_destroy(struct lttng_live_stream_iterator *stream); + +#endif /* LTTNG_LIVE_DATA_STREAM_H */ diff --git a/plugins/ctf/lttng-live/lttng-live-internal.h b/plugins/ctf/lttng-live/lttng-live-internal.h index 51556274..c55d999a 100644 --- a/plugins/ctf/lttng-live/lttng-live-internal.h +++ b/plugins/ctf/lttng-live/lttng-live-internal.h @@ -5,6 +5,7 @@ * BabelTrace - LTTng-live client Component * * Copyright 2016 Jérémie Galarneau + * Copyright 2016 Mathieu Desnoyers * * Author: Jérémie Galarneau * @@ -27,15 +28,234 @@ * SOFTWARE. */ +#include + #include #include +#include +#include +#include "viewer-connection.h" + +//TODO: this should not be used by plugins. Should copy code into plugin +//instead. +#include "babeltrace/object-internal.h" +#include "babeltrace/list-internal.h" +#include "../common/metadata/decoder.h" + +#define STREAM_NAME_PREFIX "stream-" +/* Account for u64 max string length. */ +#define U64_STR_MAX_LEN 20 +#define STREAM_NAME_MAX_LEN (sizeof(STREAM_NAME_PREFIX) + U64_STR_MAX_LEN) + +extern bool lttng_live_debug; + +struct lttng_live_component; +struct lttng_live_session; + +enum lttng_live_stream_state { + LTTNG_LIVE_STREAM_ACTIVE_NO_DATA, + LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA, + LTTNG_LIVE_STREAM_QUIESCENT, + LTTNG_LIVE_STREAM_ACTIVE_DATA, + LTTNG_LIVE_STREAM_EOF, +}; + +enum live_stream_type { + LIVE_STREAM_TYPE_NO_STREAM, + LIVE_STREAM_TYPE_STREAM, +}; + +struct lttng_live_stream_iterator_generic { + enum live_stream_type type; +}; + +/* Iterator over a live stream. */ +struct lttng_live_stream_iterator { + struct lttng_live_stream_iterator_generic p; + + struct bt_ctf_stream *stream; + struct lttng_live_trace *trace; + struct bt_private_port *port; + + /* Node of stream list within the trace. */ + struct bt_list_head node; + + /* + * Since only a single iterator per viewer connection, we have + * only a single notification iterator per stream. + */ + struct bt_ctf_notif_iter *notif_iter; + + uint64_t viewer_stream_id; + + uint64_t ctf_stream_class_id; + uint64_t base_offset; /* base offset in current index. */ + uint64_t len; /* len to read in current index. */ + uint64_t offset; /* offset in current index. */ + + int64_t last_returned_inactivity_timestamp; + int64_t current_inactivity_timestamp; + + enum lttng_live_stream_state state; + + uint64_t current_packet_end_timestamp; + struct bt_notification *packet_end_notif_queue; + + uint8_t *buf; + size_t buflen; + + char name[STREAM_NAME_MAX_LEN]; +}; + +struct lttng_live_no_stream_iterator { + struct lttng_live_stream_iterator_generic p; + + struct lttng_live_component *lttng_live; + struct bt_private_port *port; +}; + +struct lttng_live_component_options { + bool opt_dummy : 1; +}; + +struct lttng_live_metadata { + struct lttng_live_trace *trace; + uint64_t stream_id; + uint8_t uuid[16]; + bool is_uuid_set; + int bo; + char *text; + + struct ctf_metadata_decoder *decoder; + + bool closed; +}; + +struct lttng_live_trace { + struct bt_object obj; + + /* Node of trace list within the session. */ + struct bt_list_head node; + + /* Back reference to session. */ + struct lttng_live_session *session; + + uint64_t id; /* ctf trace ID within the session. */ + + struct bt_ctf_trace *trace; + + struct lttng_live_metadata *metadata; + struct bt_clock_class_priority_map *cc_prio_map; + + /* List of struct lttng_live_stream_iterator */ + struct bt_list_head streams; + + bool new_metadata_needed; +}; + +struct lttng_live_session { + /* Node of session list within the component. */ + struct bt_list_head node; + + struct lttng_live_component *lttng_live; + + uint64_t id; + + /* List of struct lttng_live_trace */ + struct bt_list_head traces; + + bool attached; + bool new_streams_needed; + bool lazy_stream_notif_init; + bool closed; +}; + +/* + * A component instance is an iterator on a single session. + */ +struct lttng_live_component { + struct bt_object obj; + struct bt_private_component *private_component; /* weak */ + struct bt_live_viewer_connection *viewer_connection; + + /* List of struct lttng_live_session */ + struct bt_list_head sessions; + + GString *url; + FILE *error_fp; + size_t max_query_size; + struct lttng_live_component_options options; + + struct bt_private_port *no_stream_port; + struct lttng_live_no_stream_iterator *no_stream_iter; +}; + +enum bt_ctf_lttng_live_iterator_status { + /** Iterator state has progressed. Continue iteration immediately. */ + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE = 3, + /** No notification available for now. Try again later. */ + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN = 2, + /** No more CTF_LTTNG_LIVEs to be delivered. */ + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END = 1, + /** No error, okay. */ + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK = 0, + /** Invalid arguments. */ + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL = -1, + /** General error. */ + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR = -2, + /** Out of memory. */ + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM = -3, + /** Unsupported iterator feature. */ + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED = -4, +}; BT_HIDDEN -enum bt_component_status lttng_live_init(struct bt_private_component *source, +enum bt_component_status lttng_live_component_init(struct bt_private_component *source, struct bt_value *params, void *init_method_data); +struct bt_value *lttng_live_query(struct bt_component_class *comp_class, + const char *object, struct bt_value *params); + +void lttng_live_component_finalize(struct bt_private_component *component); + BT_HIDDEN struct bt_notification_iterator_next_return lttng_live_iterator_next( struct bt_private_notification_iterator *iterator); + +enum bt_notification_iterator_status lttng_live_iterator_init( + struct bt_private_notification_iterator *it, + struct bt_private_port *port); + +void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it); + +int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live); +int lttng_live_attach_session(struct lttng_live_session *session); +int lttng_live_detach_session(struct lttng_live_session *session); +enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( + struct lttng_live_session *session); + +int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id); + +ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, + FILE *fp); +enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index( + struct lttng_live_component *lttng_live, + struct lttng_live_stream_iterator *stream, + struct packet_index *index); +enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes( + struct lttng_live_component *lttng_live, + struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset, + uint64_t req_len, uint64_t *recv_len); + +int lttng_live_add_port(struct lttng_live_component *lttng_live, + struct lttng_live_stream_iterator *stream_iter); +int lttng_live_remove_port(struct lttng_live_component *lttng_live, + struct bt_private_port *port); + +struct lttng_live_trace *lttng_live_ref_trace( + struct lttng_live_session *session, uint64_t trace_id); +void lttng_live_unref_trace(struct lttng_live_trace *trace); +void lttng_live_need_new_streams(struct lttng_live_component *lttng_live); + #endif /* BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_INTERNAL_H */ diff --git a/plugins/ctf/lttng-live/lttng-live.c b/plugins/ctf/lttng-live/lttng-live.c index 2d996abb..9cc2f879 100644 --- a/plugins/ctf/lttng-live/lttng-live.c +++ b/plugins/ctf/lttng-live/lttng-live.c @@ -4,6 +4,7 @@ * Babeltrace CTF LTTng-live Client Component * * Copyright 2016 Jérémie Galarneau + * Copyright 2016 Mathieu Desnoyers * * Author: Jérémie Galarneau * @@ -26,31 +27,1051 @@ * SOFTWARE. */ -#include "lttng-live-internal.h" +#include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include +#include "lttng-live-internal.h" +#include "data-stream.h" +#include "metadata.h" + +#define PRINT_ERR_STREAM (lttng_live->error_fp) +#define PRINT_PREFIX "lttng-live" +#define PRINT_DBG_CHECK lttng_live_debug +#define MAX_QUERY_SIZE (256*1024) +#include "../print.h" + +#ifdef LIVE_DEBUG +#define print_dbg(fmt, args...) \ + fprintf(stderr, "%s() at " __FILE__ ":%d " fmt "\n", \ + __func__, __LINE__, ## args) + +static const char *print_state(struct lttng_live_stream_iterator *s) +{ + switch (s->state) { + case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA: + return "ACTIVE_NO_DATA"; + case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA: + return "QUIESCENT_NO_DATA"; + case LTTNG_LIVE_STREAM_QUIESCENT: + return "QUIESCENT"; + case LTTNG_LIVE_STREAM_ACTIVE_DATA: + return "ACTIVE_DATA"; + case LTTNG_LIVE_STREAM_EOF: + return "EOF"; + default: + return "ERROR"; + } +} +#else +#define print_dbg(fmt, args...) +#endif + +#define print_stream_state(stream) \ + print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \ + bt_port_get_name(bt_port_from_private_port(stream->port)), \ + print_state(stream), stream->last_returned_inactivity_timestamp, \ + stream->current_inactivity_timestamp) + BT_HIDDEN -struct bt_notification *lttng_live_iterator_get( - struct bt_private_notification_iterator *iterator) +bool lttng_live_debug; + +BT_HIDDEN +int lttng_live_add_port(struct lttng_live_component *lttng_live, + struct lttng_live_stream_iterator *stream_iter) +{ + int ret; + struct bt_private_port *private_port; + char name[STREAM_NAME_MAX_LEN]; + + ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id); + assert(ret > 0); + strcpy(stream_iter->name, name); + private_port = bt_private_component_source_add_output_private_port( + lttng_live->private_component, name, stream_iter); + if (!private_port) { + return -1; + } + PDBG("Added port %s\n", name); + + if (lttng_live->no_stream_port) { + ret = bt_private_port_remove_from_component(lttng_live->no_stream_port); + if (ret) { + return -1; + } + BT_PUT(lttng_live->no_stream_port); + lttng_live->no_stream_iter->port = NULL; + } + stream_iter->port = private_port; + return 0; +} + +BT_HIDDEN +int lttng_live_remove_port(struct lttng_live_component *lttng_live, + struct bt_private_port *port) +{ + struct bt_component *component; + int64_t nr_ports; + int ret; + + component = bt_component_from_private_component(lttng_live->private_component); + nr_ports = bt_component_source_get_output_port_count(component); + if (nr_ports < 0) { + return -1; + } + BT_PUT(component); + if (nr_ports == 1) { + assert(!lttng_live->no_stream_port); + lttng_live->no_stream_port = + bt_private_component_source_add_output_private_port(lttng_live->private_component, + "no-stream", lttng_live->no_stream_iter); + if (!lttng_live->no_stream_port) { + return -1; + } + lttng_live->no_stream_iter->port = lttng_live->no_stream_port; + } + ret = bt_private_port_remove_from_component(port); + if (ret) { + return -1; + } + return 0; +} + +static +struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session, + uint64_t trace_id) { + struct lttng_live_trace *trace; + + bt_list_for_each_entry(trace, &session->traces, node) { + if (trace->id == trace_id) { + return trace; + } + } return NULL; } +static +void lttng_live_destroy_trace(struct bt_object *obj) +{ + struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj); + + PDBG("Destroy trace\n"); + assert(bt_list_empty(&trace->streams)); + bt_list_del(&trace->node); + lttng_live_metadata_fini(trace); + BT_PUT(trace->cc_prio_map); + g_free(trace); +} + +static +struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session, + uint64_t trace_id) +{ + struct lttng_live_trace *trace = NULL; + + trace = g_new0(struct lttng_live_trace, 1); + if (!trace) { + goto error; + } + trace->session = session; + trace->id = trace_id; + BT_INIT_LIST_HEAD(&trace->streams); + trace->new_metadata_needed = true; + bt_list_add(&trace->node, &session->traces); + bt_object_init(&trace->obj, lttng_live_destroy_trace); + goto end; +error: + g_free(trace); + trace = NULL; +end: + return trace; +} + +BT_HIDDEN +struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session, + uint64_t trace_id) +{ + struct lttng_live_trace *trace; + + trace = lttng_live_find_trace(session, trace_id); + if (trace) { + bt_get(trace); + return trace; + } + return lttng_live_create_trace(session, trace_id); +} + +BT_HIDDEN +void lttng_live_unref_trace(struct lttng_live_trace *trace) +{ + bt_put(trace); +} + +static +void lttng_live_close_trace_streams(struct lttng_live_trace *trace) +{ + struct lttng_live_stream_iterator *stream, *s; + + bt_list_for_each_entry_safe(stream, s, &trace->streams, node) { + lttng_live_stream_iterator_destroy(stream); + } + lttng_live_metadata_fini(trace); +} + +BT_HIDDEN +int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id) +{ + int ret = 0; + struct lttng_live_session *s; + + s = g_new0(struct lttng_live_session, 1); + if (!s) { + goto error; + } + + s->id = session_id; + BT_INIT_LIST_HEAD(&s->traces); + s->lttng_live = lttng_live; + s->new_streams_needed = true; + + PDBG("Reading from session %" PRIu64 "\n", s->id); + bt_list_add(&s->node, <tng_live->sessions); + goto end; +error: + PERR("Error adding session\n"); + g_free(s); + ret = -1; +end: + return ret; +} + +static +void lttng_live_destroy_session(struct lttng_live_session *session) +{ + struct lttng_live_trace *trace, *t; + + PDBG("Destroy session\n"); + if (session->id != -1ULL) { + if (lttng_live_detach_session(session)) { + /* Old relayd cannot detach sessions. */ + PDBG("Unable to detach session %" PRIu64 "\n", + session->id); + } + session->id = -1ULL; + } + bt_list_for_each_entry_safe(trace, t, &session->traces, node) { + lttng_live_close_trace_streams(trace); + } + bt_list_del(&session->node); + g_free(session); +} + +BT_HIDDEN +void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it) +{ + struct lttng_live_stream_iterator_generic *s = + bt_private_notification_iterator_get_user_data(it); + + switch (s->type) { + case LIVE_STREAM_TYPE_NO_STREAM: + { + /* Leave no_stream_iter in place when port is removed. */ + break; + } + case LIVE_STREAM_TYPE_STREAM: + { + struct lttng_live_stream_iterator *stream_iter = + container_of(s, struct lttng_live_stream_iterator, p); + + lttng_live_stream_iterator_destroy(stream_iter); + break; + } + } +} + +static +enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state( + struct lttng_live_component *lttng_live, + struct lttng_live_stream_iterator *lttng_live_stream) +{ + switch (lttng_live_stream->state) { + case LTTNG_LIVE_STREAM_QUIESCENT: + case LTTNG_LIVE_STREAM_ACTIVE_DATA: + break; + case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA: + /* Invalid state. */ + PERR("Unexpected stream state \"ACTIVE_NO_DATA\"\n"); + return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA: + /* Invalid state. */ + PERR("Unexpected stream state \"QUIESCENT_NO_DATA\"\n"); + return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + case LTTNG_LIVE_STREAM_EOF: + break; + } + return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; +} + +/* + * For active no data stream, fetch next data. It can be either: + * - quiescent: need to put it in the prio heap at quiescent end + * timestamp, + * - have data: need to wire up first event into the prio heap, + * - have no data on this stream at this point: need to retry (AGAIN) or + * return EOF. + */ +static +enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream( + struct lttng_live_component *lttng_live, + struct lttng_live_stream_iterator *lttng_live_stream) +{ + enum bt_ctf_lttng_live_iterator_status ret = + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + struct packet_index index; + enum lttng_live_stream_state orig_state = lttng_live_stream->state; + + if (lttng_live_stream->trace->new_metadata_needed) { + ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + goto end; + } + if (lttng_live_stream->trace->session->new_streams_needed) { + ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + goto end; + } + if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA + && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) { + goto end; + } + ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index); + if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { + goto end; + } + assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF); + if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) { + if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA + && lttng_live_stream->last_returned_inactivity_timestamp == + lttng_live_stream->current_inactivity_timestamp) { + ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + print_stream_state(lttng_live_stream); + } else { + ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + } + goto end; + } + lttng_live_stream->base_offset = index.offset; + lttng_live_stream->offset = index.offset; + lttng_live_stream->len = index.packet_size / CHAR_BIT; +end: + if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { + ret = lttng_live_iterator_next_check_stream_state( + lttng_live, lttng_live_stream); + } + return ret; +} + +/* + * Creation of the notification requires the ctf trace to be created + * beforehand, but the live protocol gives us all streams (including + * metadata) at once. So we split it in three steps: getting streams, + * getting metadata (which creates the ctf trace), and then creating the + * per-stream notifications. + */ +static +enum bt_ctf_lttng_live_iterator_status lttng_live_get_session( + struct lttng_live_component *lttng_live, + struct lttng_live_session *session) +{ + enum bt_ctf_lttng_live_iterator_status status; + struct lttng_live_trace *trace, *t; + + if (lttng_live_attach_session(session)) { + return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + } + status = lttng_live_get_new_streams(session); + if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && + status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) { + return status; + } + bt_list_for_each_entry_safe(trace, t, &session->traces, node) { + status = lttng_live_metadata_update(trace); + if (status == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) { + int retval; + + retval = bt_ctf_trace_set_is_static(trace->trace); + assert(!retval); + } else if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { + return status; + } + } + return lttng_live_lazy_notif_init(session); +} + +BT_HIDDEN +void lttng_live_need_new_streams(struct lttng_live_component *lttng_live) +{ + struct lttng_live_session *session; + + bt_list_for_each_entry(session, <tng_live->sessions, node) { + session->new_streams_needed = true; + } +} + +static +void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live) +{ + struct lttng_live_session *session; + + bt_list_for_each_entry(session, <tng_live->sessions, node) { + struct lttng_live_trace *trace; + + session->new_streams_needed = true; + bt_list_for_each_entry(trace, &session->traces, node) { + trace->new_metadata_needed = true; + } + } +} + +static +enum bt_notification_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata( + struct lttng_live_component *lttng_live) +{ + enum bt_ctf_lttng_live_iterator_status ret = + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + unsigned int nr_sessions_opened = 0; + struct lttng_live_session *session, *s; + + bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) { + if (session->closed && bt_list_empty(&session->traces)) { + lttng_live_destroy_session(session); + } + } + /* + * Currently, when there are no sessions, we quit immediately. + * We may want to add a component parameter to keep trying until + * we get data in the future. + * Also, in a remotely distant future, we could add a "new + * session" flag to the protocol, which would tell us that we + * need to query for new sessions even though we have sessions + * currently ongoing. + */ + if (bt_list_empty(<tng_live->sessions)) { + ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + goto end; + } + bt_list_for_each_entry(session, <tng_live->sessions, node) { + ret = lttng_live_get_session(lttng_live, session); + switch (ret) { + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK: + break; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END: + ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + break; + default: + goto end; + } + if (!session->closed) { + nr_sessions_opened++; + } + } +end: + if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) { + ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + } + return ret; +} + +static +enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification( + struct lttng_live_component *lttng_live, + struct lttng_live_stream_iterator *lttng_live_stream, + struct bt_notification **notification, + uint64_t timestamp) +{ + enum bt_ctf_lttng_live_iterator_status ret = + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + struct lttng_live_trace *trace; + struct bt_ctf_clock_class *clock_class = NULL; + struct bt_ctf_clock_value *clock_value = NULL; + struct bt_notification *notif = NULL; + int retval; + + trace = lttng_live_stream->trace; + if (!trace) { + goto error; + } + clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0); + if (!clock_class) { + goto error; + } + clock_value = bt_ctf_clock_value_create(clock_class, timestamp); + if (!clock_value) { + goto error; + } + notif = bt_notification_inactivity_create(trace->cc_prio_map); + if (!notif) { + goto error; + } + retval = bt_notification_inactivity_set_clock_value(notif, clock_value); + if (retval) { + goto error; + } + *notification = notif; +end: + bt_put(clock_value); + bt_put(clock_class); + return ret; + +error: + ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + bt_put(notif); + goto end; +} + +static +enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream( + struct lttng_live_component *lttng_live, + struct lttng_live_stream_iterator *lttng_live_stream, + struct bt_notification **notification) +{ + enum bt_ctf_lttng_live_iterator_status ret = + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + struct bt_ctf_clock_class *clock_class = NULL; + struct bt_ctf_clock_value *clock_value = NULL; + + if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) { + return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + } + + if (lttng_live_stream->current_inactivity_timestamp == + lttng_live_stream->last_returned_inactivity_timestamp) { + lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA; + ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + goto end; + } + + ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification, + (uint64_t) lttng_live_stream->current_inactivity_timestamp); + + lttng_live_stream->last_returned_inactivity_timestamp = + lttng_live_stream->current_inactivity_timestamp; +end: + bt_put(clock_value); + bt_put(clock_class); + return ret; +} + +static +enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream( + struct lttng_live_component *lttng_live, + struct lttng_live_stream_iterator *lttng_live_stream, + struct bt_notification **notification) +{ + enum bt_ctf_lttng_live_iterator_status ret = + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_ctf_notif_iter_status status; + struct lttng_live_session *session; + + bt_list_for_each_entry(session, <tng_live->sessions, node) { + struct lttng_live_trace *trace; + + if (session->new_streams_needed) { + return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + } + bt_list_for_each_entry(trace, &session->traces, node) { + if (trace->new_metadata_needed) { + return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + } + } + } + + if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) { + return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + } + if (lttng_live_stream->packet_end_notif_queue) { + *notification = lttng_live_stream->packet_end_notif_queue; + lttng_live_stream->packet_end_notif_queue = NULL; + status = BT_CTF_NOTIF_ITER_STATUS_OK; + } else { + status = bt_ctf_notif_iter_get_next_notification( + lttng_live_stream->notif_iter, + lttng_live_stream->trace->cc_prio_map, + notification); + if (status == BT_CTF_NOTIF_ITER_STATUS_OK) { + /* + * Consider empty packets as inactivity. + */ + if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) { + lttng_live_stream->packet_end_notif_queue = *notification; + *notification = NULL; + return emit_inactivity_notification(lttng_live, + lttng_live_stream, notification, + lttng_live_stream->current_packet_end_timestamp); + } + } + } + switch (status) { + case BT_CTF_NOTIF_ITER_STATUS_EOF: + ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + break; + case BT_CTF_NOTIF_ITER_STATUS_OK: + ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + break; + case BT_CTF_NOTIF_ITER_STATUS_AGAIN: + /* + * Continue immediately (end of packet). The next + * get_index may return AGAIN to delay the following + * attempt. + */ + ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + break; + case BT_CTF_NOTIF_ITER_STATUS_INVAL: + /* No argument provided by the user, so don't return INVAL. */ + case BT_CTF_NOTIF_ITER_STATUS_ERROR: + default: + ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + break; + } + return ret; +} + +/* + * helper function: + * handle_no_data_streams() + * retry: + * - for each ACTIVE_NO_DATA stream: + * - query relayd for stream data, or quiescence info. + * - if need metadata, get metadata, goto retry. + * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry + * - if quiescent, move to QUIESCENT streams + * - if fetched data, move to ACTIVE_DATA streams + * (at this point each stream either has data, or is quiescent) + * + * + * iterator_next: + * handle_new_streams_and_metadata() + * - query relayd for known streams, add them as ACTIVE_NO_DATA + * - query relayd for metadata + * + * call handle_active_no_data_streams() + * + * handle_quiescent_streams() + * - if at least one stream is ACTIVE_DATA: + * - peek stream event with lowest timestamp -> next_ts + * - for each quiescent stream + * - if next_ts >= quiescent end + * - set state to ACTIVE_NO_DATA + * - else + * - for each quiescent stream + * - set state to ACTIVE_NO_DATA + * + * call handle_active_no_data_streams() + * + * handle_active_data_streams() + * - if at least one stream is ACTIVE_DATA: + * - get stream event with lowest timestamp from heap + * - make that stream event the current notification. + * - move this stream heap position to its next event + * - if we need to fetch data from relayd, move + * stream to ACTIVE_NO_DATA. + * - return OK + * - return AGAIN + * + * end criterion: ctrl-c on client. If relayd exits or the session + * closes on the relay daemon side, we keep on waiting for streams. + * Eventually handle --end timestamp (also an end criterion). + * + * When disconnected from relayd: try to re-connect endlessly. + */ +static +struct bt_notification_iterator_next_return lttng_live_iterator_next_stream( + struct bt_private_notification_iterator *iterator, + struct lttng_live_stream_iterator *stream_iter) +{ + enum bt_ctf_lttng_live_iterator_status status; + struct bt_notification_iterator_next_return next_return; + struct lttng_live_component *lttng_live; + + lttng_live = stream_iter->trace->session->lttng_live; +retry: + print_stream_state(stream_iter); + next_return.notification = NULL; + status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live); + if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { + goto end; + } + status = lttng_live_iterator_next_handle_one_no_data_stream( + lttng_live, stream_iter); + if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { + goto end; + } + status = lttng_live_iterator_next_handle_one_quiescent_stream( + lttng_live, stream_iter, &next_return.notification); + if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { + assert(next_return.notification == NULL); + goto end; + } + if (next_return.notification) { + goto end; + } + status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live, + stream_iter, &next_return.notification); + if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { + assert(next_return.notification == NULL); + } + +end: + switch (status) { + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: + print_dbg("continue"); + goto retry; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN: + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; + print_dbg("again"); + break; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END: + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END; + print_dbg("end"); + break; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK: + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK; + print_dbg("ok"); + break; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL: + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID; + break; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM: + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; + break; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED; + break; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR: + default: /* fall-through */ + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + break; + } + return next_return; +} + +static +struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream( + struct bt_private_notification_iterator *iterator, + struct lttng_live_no_stream_iterator *no_stream_iter) +{ + enum bt_ctf_lttng_live_iterator_status status; + struct bt_notification_iterator_next_return next_return; + struct lttng_live_component *lttng_live; + + lttng_live = no_stream_iter->lttng_live; +retry: + lttng_live_force_new_streams_and_metadata(lttng_live); + status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live); + if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { + goto end; + } + if (no_stream_iter->port) { + status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + } else { + status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + } +end: + switch (status) { + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: + goto retry; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN: + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; + break; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END: + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END; + break; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK: + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK; + break; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL: + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID; + break; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM: + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; + break; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED; + break; + case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR: + default: /* fall-through */ + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + break; + } + return next_return; +} + BT_HIDDEN struct bt_notification_iterator_next_return lttng_live_iterator_next( struct bt_private_notification_iterator *iterator) { - struct bt_notification_iterator_next_return ret = { - .status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR, - }; + struct lttng_live_stream_iterator_generic *s = + bt_private_notification_iterator_get_user_data(iterator); + struct bt_notification_iterator_next_return next_return; + + switch (s->type) { + case LIVE_STREAM_TYPE_NO_STREAM: + next_return = lttng_live_iterator_next_no_stream(iterator, + container_of(s, struct lttng_live_no_stream_iterator, p)); + break; + case LIVE_STREAM_TYPE_STREAM: + next_return = lttng_live_iterator_next_stream(iterator, + container_of(s, struct lttng_live_stream_iterator, p)); + break; + default: + next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + break; + } + return next_return; +} +BT_HIDDEN +enum bt_notification_iterator_status lttng_live_iterator_init( + struct bt_private_notification_iterator *it, + struct bt_private_port *port) +{ + enum bt_notification_iterator_status ret = + BT_NOTIFICATION_ITERATOR_STATUS_OK; + struct lttng_live_stream_iterator_generic *s; + struct lttng_live_component *lttng_live; + + assert(it); + + s = bt_private_port_get_user_data(port); + assert(s); + switch (s->type) { + case LIVE_STREAM_TYPE_NO_STREAM: + { + struct lttng_live_no_stream_iterator *no_stream_iter = + container_of(s, struct lttng_live_no_stream_iterator, p); + lttng_live = no_stream_iter->lttng_live; + ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter); + if (ret) { + goto error; + } + break; + } + case LIVE_STREAM_TYPE_STREAM: + { + struct lttng_live_stream_iterator *stream_iter = + container_of(s, struct lttng_live_stream_iterator, p); + lttng_live = stream_iter->trace->session->lttng_live; + ret = bt_private_notification_iterator_set_user_data(it, stream_iter); + if (ret) { + goto error; + } + break; + } + default: + ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + goto end; + } + +end: return ret; +error: + if (bt_private_notification_iterator_set_user_data(it, NULL) + != BT_NOTIFICATION_ITERATOR_STATUS_OK) { + PERR("Error setting private data to NULL\n"); + } + goto end; +} + +static +struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class, + struct bt_value *params) +{ + struct bt_value *url_value = NULL; + struct bt_value *results = NULL; + const char *url; + struct bt_live_viewer_connection *viewer_connection = NULL; + enum bt_value_status ret; + + url_value = bt_value_map_get(params, "url"); + if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) { + fprintf(stderr, "Mandatory \"url\" parameter missing\n"); + goto error; + } + + ret = bt_value_string_get(url_value, &url); + if (ret != BT_VALUE_STATUS_OK) { + fprintf(stderr, "\"url\" parameter is required to be a string value\n"); + goto error; + } + + viewer_connection = bt_live_viewer_connection_create(url, stderr); + if (!viewer_connection) { + ret = BT_COMPONENT_STATUS_NOMEM; + goto error; + } + + results = bt_live_viewer_connection_list_sessions(viewer_connection); + goto end; +error: + BT_PUT(results); +end: + if (viewer_connection) { + bt_live_viewer_connection_destroy(viewer_connection); + } + BT_PUT(url_value); + return results; +} + +BT_HIDDEN +struct bt_value *lttng_live_query(struct bt_component_class *comp_class, + const char *object, struct bt_value *params) +{ + if (strcmp(object, "sessions") == 0) { + return lttng_live_query_list_sessions(comp_class, + params); + } + fprintf(stderr, "Unknown query object `%s`\n", object); + return NULL; +} + +static +void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live) +{ + int ret; + struct lttng_live_session *session, *s; + + bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) { + lttng_live_destroy_session(session); + } + BT_PUT(lttng_live->viewer_connection); + if (lttng_live->url) { + g_string_free(lttng_live->url, TRUE); + } + if (lttng_live->no_stream_port) { + ret = bt_private_port_remove_from_component(lttng_live->no_stream_port); + assert(!ret); + BT_PUT(lttng_live->no_stream_port); + } + if (lttng_live->no_stream_iter) { + g_free(lttng_live->no_stream_iter); + } + g_free(lttng_live); +} + +BT_HIDDEN +void lttng_live_component_finalize(struct bt_private_component *component) +{ + void *data = bt_private_component_get_user_data(component); + + if (!data) { + return; + } + lttng_live_component_destroy_data(data); +} + +static +struct lttng_live_component *lttng_live_component_create(struct bt_value *params, + struct bt_private_component *private_component) +{ + struct lttng_live_component *lttng_live; + struct bt_value *value = NULL; + const char *url; + enum bt_value_status ret; + + lttng_live = g_new0(struct lttng_live_component, 1); + if (!lttng_live) { + goto end; + } + lttng_live->error_fp = stderr; + /* TODO: make this an overridable parameter. */ + lttng_live->max_query_size = MAX_QUERY_SIZE; + BT_INIT_LIST_HEAD(<tng_live->sessions); + value = bt_value_map_get(params, "url"); + if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) { + fprintf(stderr, "Mandatory \"url\" parameter missing\n"); + goto error; + } + ret = bt_value_string_get(value, &url); + if (ret != BT_VALUE_STATUS_OK) { + fprintf(stderr, "\"url\" parameter is required to be a string value\n"); + goto error; + } + lttng_live->url = g_string_new(url); + if (!lttng_live->url) { + goto error; + } + lttng_live->viewer_connection = + bt_live_viewer_connection_create(lttng_live->url->str, + stderr); + if (!lttng_live->viewer_connection) { + ret = BT_COMPONENT_STATUS_NOMEM; + goto error; + } + if (lttng_live_create_viewer_session(lttng_live)) { + ret = BT_COMPONENT_STATUS_ERROR; + goto error; + } + lttng_live->private_component = private_component; + + goto end; + +error: + lttng_live_component_destroy_data(lttng_live); + lttng_live = NULL; +end: + return lttng_live; } BT_HIDDEN -enum bt_component_status lttng_live_init(struct bt_private_component *component, - struct bt_value *params, UNUSED_VAR void *init_method_data) +enum bt_component_status lttng_live_component_init(struct bt_private_component *component, + struct bt_value *params, void *init_method_data) { - return BT_COMPONENT_STATUS_OK; + struct lttng_live_component *lttng_live; + enum bt_component_status ret = BT_COMPONENT_STATUS_OK; + + lttng_live_debug = g_strcmp0(getenv("LTTNG_LIVE_DEBUG"), "1") == 0; + + /* Passes ownership of iter ref to lttng_live_component_create. */ + lttng_live = lttng_live_component_create(params, component); + if (!lttng_live) { + ret = BT_COMPONENT_STATUS_NOMEM; + goto end; + } + + lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1); + lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM; + lttng_live->no_stream_iter->lttng_live = lttng_live; + + lttng_live->no_stream_port = + bt_private_component_source_add_output_private_port( + lttng_live->private_component, "no-stream", + lttng_live->no_stream_iter); + lttng_live->no_stream_iter->port = lttng_live->no_stream_port; + + ret = bt_private_component_set_user_data(component, lttng_live); + if (ret != BT_COMPONENT_STATUS_OK) { + goto error; + } + +end: + return ret; +error: + (void) bt_private_component_set_user_data(component, NULL); + lttng_live_component_destroy_data(lttng_live); + return ret; } diff --git a/plugins/ctf/lttng-live/lttng-viewer-abi.h b/plugins/ctf/lttng-live/lttng-viewer-abi.h new file mode 100644 index 00000000..71047f8c --- /dev/null +++ b/plugins/ctf/lttng-live/lttng-viewer-abi.h @@ -0,0 +1,254 @@ +#ifndef LTTNG_VIEWER_ABI_H +#define LTTNG_VIEWER_ABI_H + +/* + * Copyright (C) 2013 - Julien Desfossez + * Mathieu Desnoyers + * David Goulet + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include + +#define LTTNG_VIEWER_PATH_MAX 4096 +#define LTTNG_VIEWER_NAME_MAX 255 +#define LTTNG_VIEWER_HOST_NAME_MAX 64 + +/* Flags in reply to get_next_index and get_packet. */ +enum { + /* New metadata is required to read this packet. */ + LTTNG_VIEWER_FLAG_NEW_METADATA = (1 << 0), + /* New stream got added to the trace. */ + LTTNG_VIEWER_FLAG_NEW_STREAM = (1 << 1), +}; + +enum lttng_viewer_command { + LTTNG_VIEWER_CONNECT = 1, + LTTNG_VIEWER_LIST_SESSIONS = 2, + LTTNG_VIEWER_ATTACH_SESSION = 3, + LTTNG_VIEWER_GET_NEXT_INDEX = 4, + LTTNG_VIEWER_GET_PACKET = 5, + LTTNG_VIEWER_GET_METADATA = 6, + LTTNG_VIEWER_GET_NEW_STREAMS = 7, + LTTNG_VIEWER_CREATE_SESSION = 8, + LTTNG_VIEWER_DETACH_SESSION = 9, +}; + +enum lttng_viewer_attach_return_code { + LTTNG_VIEWER_ATTACH_OK = 1, /* The attach command succeeded. */ + LTTNG_VIEWER_ATTACH_ALREADY = 2, /* A viewer is already attached. */ + LTTNG_VIEWER_ATTACH_UNK = 3, /* The session ID is unknown. */ + LTTNG_VIEWER_ATTACH_NOT_LIVE = 4, /* The session is not live. */ + LTTNG_VIEWER_ATTACH_SEEK_ERR = 5, /* Seek error. */ + LTTNG_VIEWER_ATTACH_NO_SESSION = 6, /* No viewer session created. */ +}; + +enum lttng_viewer_detach_session_return_code { + LTTNG_VIEWER_DETACH_SESSION_OK = 1, + LTTNG_VIEWER_DETACH_SESSION_UNK = 2, + LTTNG_VIEWER_DETACH_SESSION_ERR = 3, +}; + +enum lttng_viewer_next_index_return_code { + LTTNG_VIEWER_INDEX_OK = 1, /* Index is available. */ + LTTNG_VIEWER_INDEX_RETRY = 2, /* Index not yet available. */ + LTTNG_VIEWER_INDEX_HUP = 3, /* Index closed (trace destroyed). */ + LTTNG_VIEWER_INDEX_ERR = 4, /* Unknow error. */ + LTTNG_VIEWER_INDEX_INACTIVE = 5, /* Inactive stream beacon. */ + LTTNG_VIEWER_INDEX_EOF = 6, /* End of index file. */ +}; + +enum lttng_viewer_get_packet_return_code { + LTTNG_VIEWER_GET_PACKET_OK = 1, + LTTNG_VIEWER_GET_PACKET_RETRY = 2, + LTTNG_VIEWER_GET_PACKET_ERR = 3, + LTTNG_VIEWER_GET_PACKET_EOF = 4, +}; + +enum lttng_viewer_get_metadata_return_code { + LTTNG_VIEWER_METADATA_OK = 1, + LTTNG_VIEWER_NO_NEW_METADATA = 2, + LTTNG_VIEWER_METADATA_ERR = 3, +}; + +enum lttng_viewer_connection_type { + LTTNG_VIEWER_CLIENT_COMMAND = 1, + LTTNG_VIEWER_CLIENT_NOTIFICATION = 2, +}; + +enum lttng_viewer_seek { + /* Receive the trace packets from the beginning. */ + LTTNG_VIEWER_SEEK_BEGINNING = 1, + /* Receive the trace packets from now. */ + LTTNG_VIEWER_SEEK_LAST = 2, +}; + +enum lttng_viewer_new_streams_return_code { + LTTNG_VIEWER_NEW_STREAMS_OK = 1, /* If new streams are being sent. */ + LTTNG_VIEWER_NEW_STREAMS_NO_NEW = 2, /* If no new streams are available. */ + LTTNG_VIEWER_NEW_STREAMS_ERR = 3, /* Error. */ + LTTNG_VIEWER_NEW_STREAMS_HUP = 4, /* Session closed. */ +}; + +enum lttng_viewer_create_session_return_code { + LTTNG_VIEWER_CREATE_SESSION_OK = 1, + LTTNG_VIEWER_CREATE_SESSION_ERR = 2, +}; + +struct lttng_viewer_session { + uint64_t id; + uint32_t live_timer; + uint32_t clients; + uint32_t streams; + char hostname[LTTNG_VIEWER_HOST_NAME_MAX]; + char session_name[LTTNG_VIEWER_NAME_MAX]; +} __attribute__((__packed__)); + +struct lttng_viewer_stream { + uint64_t id; + uint64_t ctf_trace_id; + uint32_t metadata_flag; + char path_name[LTTNG_VIEWER_PATH_MAX]; + char channel_name[LTTNG_VIEWER_NAME_MAX]; +} __attribute__((__packed__)); + +struct lttng_viewer_cmd { + uint64_t data_size; /* data size following this header */ + uint32_t cmd; /* enum lttcomm_relayd_command */ + uint32_t cmd_version; /* command version */ +} __attribute__((__packed__)); + +/* + * LTTNG_VIEWER_CONNECT payload. + */ +struct lttng_viewer_connect { + /* session ID assigned by the relay for command connections */ + uint64_t viewer_session_id; + uint32_t major; + uint32_t minor; + uint32_t type; /* enum lttng_viewer_connection_type */ +} __attribute__((__packed__)); + +/* + * LTTNG_VIEWER_LIST_SESSIONS payload. + */ +struct lttng_viewer_list_sessions { + uint32_t sessions_count; + char session_list[]; /* struct lttng_viewer_session */ +} __attribute__((__packed__)); + +/* + * LTTNG_VIEWER_ATTACH_SESSION payload. + */ +struct lttng_viewer_attach_session_request { + uint64_t session_id; + uint64_t offset; /* unused for now */ + uint32_t seek; /* enum lttng_viewer_seek */ +} __attribute__((__packed__)); + +struct lttng_viewer_attach_session_response { + /* enum lttng_viewer_attach_return_code */ + uint32_t status; + uint32_t streams_count; + /* struct lttng_viewer_stream */ + char stream_list[]; +} __attribute__((__packed__)); + +/* + * LTTNG_VIEWER_GET_NEXT_INDEX payload. + */ +struct lttng_viewer_get_next_index { + uint64_t stream_id; +} __attribute__ ((__packed__)); + +struct lttng_viewer_index { + uint64_t offset; + uint64_t packet_size; + uint64_t content_size; + uint64_t timestamp_begin; + uint64_t timestamp_end; + uint64_t events_discarded; + uint64_t stream_id; + uint32_t status; /* enum lttng_viewer_next_index_return_code */ + uint32_t flags; /* LTTNG_VIEWER_FLAG_* */ +} __attribute__ ((__packed__)); + +/* + * LTTNG_VIEWER_GET_PACKET payload. + */ +struct lttng_viewer_get_packet { + uint64_t stream_id; + uint64_t offset; + uint32_t len; +} __attribute__((__packed__)); + +struct lttng_viewer_trace_packet { + uint32_t status; /* enum lttng_viewer_get_packet_return_code */ + uint32_t len; + uint32_t flags; /* LTTNG_VIEWER_FLAG_* */ + char data[]; +} __attribute__((__packed__)); + +/* + * LTTNG_VIEWER_GET_METADATA payload. + */ +struct lttng_viewer_get_metadata { + uint64_t stream_id; +} __attribute__((__packed__)); + +struct lttng_viewer_metadata_packet { + uint64_t len; + uint32_t status; /* enum lttng_viewer_get_metadata_return_code */ + char data[]; +} __attribute__((__packed__)); + +/* + * LTTNG_VIEWER_GET_NEW_STREAMS payload. + */ +struct lttng_viewer_new_streams_request { + uint64_t session_id; +} __attribute__((__packed__)); + +struct lttng_viewer_new_streams_response { + /* enum lttng_viewer_new_streams_return_code */ + uint32_t status; + uint32_t streams_count; + /* struct lttng_viewer_stream */ + char stream_list[]; +} __attribute__((__packed__)); + +struct lttng_viewer_create_session_response { + /* enum lttng_viewer_create_session_return_code */ + uint32_t status; +} __attribute__((__packed__)); + +/* + * LTTNG_VIEWER_DETACH_SESSION payload. + */ +struct lttng_viewer_detach_session_request { + uint64_t session_id; +} __attribute__((__packed__)); + +struct lttng_viewer_detach_session_response { + /* enum lttng_viewer_detach_session_return_code */ + uint32_t status; +} __attribute__((__packed__)); + +#endif /* LTTNG_VIEWER_ABI_H */ diff --git a/plugins/ctf/lttng-live/metadata.c b/plugins/ctf/lttng-live/metadata.c new file mode 100644 index 00000000..b176e114 --- /dev/null +++ b/plugins/ctf/lttng-live/metadata.c @@ -0,0 +1,276 @@ +/* + * Copyright 2016 - Philippe Proulx + * Copyright 2010-2011 - EfficiOS Inc. and Linux Foundation + * + * Some functions are based on older functions written by Mathieu Desnoyers. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include +#include +#include +#include +#include +#include +#include + +#define PRINT_ERR_STREAM lttng_live->error_fp +#define PRINT_PREFIX "lttng-live-metadata" +#define PRINT_DBG_CHECK lttng_live_debug +#include "../print.h" + +#include "metadata.h" +#include "../common/metadata/decoder.h" + +#define TSDL_MAGIC 0x75d11d57 + +struct packet_header { + uint32_t magic; + uint8_t uuid[16]; + uint32_t checksum; + uint32_t content_size; + uint32_t packet_size; + uint8_t compression_scheme; + uint8_t encryption_scheme; + uint8_t checksum_scheme; + uint8_t major; + uint8_t minor; +} __attribute__((__packed__)); + +static +enum bt_ctf_lttng_live_iterator_status lttng_live_update_clock_map( + struct lttng_live_trace *trace) +{ + enum bt_ctf_lttng_live_iterator_status status = + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + size_t i; + int count, ret; + + BT_PUT(trace->cc_prio_map); + trace->cc_prio_map = bt_clock_class_priority_map_create(); + if (!trace->cc_prio_map) { + goto error; + } + + count = bt_ctf_trace_get_clock_class_count(trace->trace); + assert(count >= 0); + + for (i = 0; i < count; i++) { + struct bt_ctf_clock_class *clock_class = + bt_ctf_trace_get_clock_class_by_index(trace->trace, i); + + assert(clock_class); + ret = bt_clock_class_priority_map_add_clock_class( + trace->cc_prio_map, clock_class, 0); + BT_PUT(clock_class); + + if (ret) { + goto error; + } + } + + goto end; +error: + status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; +end: + return status; +} + +BT_HIDDEN +enum bt_ctf_lttng_live_iterator_status lttng_live_metadata_update( + struct lttng_live_trace *trace) +{ + struct lttng_live_session *session = trace->session; + struct lttng_live_component *lttng_live = session->lttng_live; + struct lttng_live_metadata *metadata = trace->metadata; + ssize_t ret = 0; + size_t size, len_read = 0; + char *metadata_buf = NULL; + FILE *fp = NULL; + enum ctf_metadata_decoder_status decoder_status; + enum bt_ctf_lttng_live_iterator_status status = + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + + /* No metadata stream yet. */ + if (!metadata) { + if (session->new_streams_needed) { + status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + } else { + session->new_streams_needed = true; + status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + } + goto end; + } + + if (!trace->new_metadata_needed) { + goto end; + } + + /* Open for writing */ + fp = bt_open_memstream(&metadata_buf, &size); + if (!fp) { + PERR("Metadata open_memstream: %s\n", strerror(errno)); + goto error; + } + + /* Grab all available metadata. */ + do { + /* + * get_one_metadata_packet returns the number of bytes + * received, 0 when we have received everything, a + * negative value on error. + */ + ret = lttng_live_get_one_metadata_packet(trace, fp); + if (ret > 0) { + len_read += ret; + } + } while (ret > 0); + + /* + * Consider metadata closed as soon as we get an error reading + * it (e.g. cannot be found). + */ + if (ret < 0) { + if (!metadata->closed) { + metadata->closed = true; + /* + * Release our reference on the trace as soon as + * we know the metadata stream is not available + * anymore. This won't necessarily teardown the + * metadata objects immediately, but only when + * the data streams are done. + */ + lttng_live_unref_trace(metadata->trace); + } + } + + if (bt_close_memstream(&metadata_buf, &size, fp)) { + BT_LOGE("bt_close_memstream: %s", strerror(errno)); + } + ret = 0; + fp = NULL; + + if (len_read == 0) { + if (!trace->trace) { + status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + goto end; + } + trace->new_metadata_needed = false; + goto end; + } + + if (babeltrace_debug) { + // yydebug = 1; + } + + fp = bt_fmemopen(metadata_buf, len_read, "rb"); + if (!fp) { + PERR("Cannot memory-open metadata buffer: %s\n", + strerror(errno)); + goto error; + } + + decoder_status = ctf_metadata_decoder_decode(metadata->decoder, fp); + switch (decoder_status) { + case CTF_METADATA_DECODER_STATUS_OK: + BT_PUT(trace->trace); + trace->trace = ctf_metadata_decoder_get_trace(metadata->decoder); + trace->new_metadata_needed = false; + status = lttng_live_update_clock_map(trace); + if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { + goto end; + } + break; + case CTF_METADATA_DECODER_STATUS_INCOMPLETE: + status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + break; + case CTF_METADATA_DECODER_STATUS_ERROR: + case CTF_METADATA_DECODER_STATUS_INVAL_VERSION: + case CTF_METADATA_DECODER_STATUS_IR_VISITOR_ERROR: + goto error; + } + + goto end; +error: + status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; +end: + if (fp) { + int closeret; + + closeret = fclose(fp); + if (closeret) { + PERR("Error on fclose"); + } + } + return status; +} + +BT_HIDDEN +int lttng_live_metadata_create_stream(struct lttng_live_session *session, + uint64_t ctf_trace_id, + uint64_t stream_id) +{ + struct lttng_live_metadata *metadata = NULL; + struct lttng_live_trace *trace; + + metadata = g_new0(struct lttng_live_metadata, 1); + if (!metadata) { + return -1; + } + metadata->stream_id = stream_id; + //TODO: add clock offset option + metadata->decoder = ctf_metadata_decoder_create( + session->lttng_live->error_fp, 0); + if (!metadata->decoder) { + goto error; + } + trace = lttng_live_ref_trace(session, ctf_trace_id); + if (!trace) { + goto error; + } + metadata->trace = trace; + trace->metadata = metadata; + return 0; + +error: + ctf_metadata_decoder_destroy(metadata->decoder); + g_free(metadata); + return -1; +} + +BT_HIDDEN +void lttng_live_metadata_fini(struct lttng_live_trace *trace) +{ + struct lttng_live_metadata *metadata = trace->metadata; + + if (!metadata) { + return; + } + if (metadata->text) { + free(metadata->text); + } + ctf_metadata_decoder_destroy(metadata->decoder); + trace->metadata = NULL; + lttng_live_unref_trace(trace); + if (!metadata->closed) { + lttng_live_unref_trace(metadata->trace); + } + g_free(metadata); +} diff --git a/plugins/ctf/lttng-live/metadata.h b/plugins/ctf/lttng-live/metadata.h new file mode 100644 index 00000000..3d9f7b86 --- /dev/null +++ b/plugins/ctf/lttng-live/metadata.h @@ -0,0 +1,41 @@ +#ifndef LTTNG_LIVE_METADATA_H +#define LTTNG_LIVE_METADATA_H + +/* + * Copyright 2016 - Philippe Proulx + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include +#include +#include +#include +#include "lttng-live-internal.h" + +int lttng_live_metadata_create_stream(struct lttng_live_session *session, + uint64_t ctf_trace_id, + uint64_t stream_id); + +enum bt_ctf_lttng_live_iterator_status lttng_live_metadata_update( + struct lttng_live_trace *trace); + +void lttng_live_metadata_fini(struct lttng_live_trace *trace); + +#endif /* LTTNG_LIVE_METADATA_H */ diff --git a/plugins/ctf/lttng-live/viewer-connection.c b/plugins/ctf/lttng-live/viewer-connection.c new file mode 100644 index 00000000..64cf81e7 --- /dev/null +++ b/plugins/ctf/lttng-live/viewer-connection.c @@ -0,0 +1,1481 @@ +/* + * Copyright 2016 - Mathieu Desnoyers + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "lttng-live-internal.h" +#include "viewer-connection.h" +#include "lttng-viewer-abi.h" +#include "data-stream.h" +#include "metadata.h" + +#define PRINT_ERR_STREAM viewer_connection->error_fp +#define PRINT_PREFIX "lttng-live-viewer-connection" +#define PRINT_DBG_CHECK lttng_live_debug +#include "../print.h" + +static ssize_t lttng_live_recv(int fd, void *buf, size_t len) +{ + ssize_t ret; + size_t copied = 0, to_copy = len; + + do { + ret = recv(fd, buf + copied, to_copy, 0); + if (ret > 0) { + assert(ret <= to_copy); + copied += ret; + to_copy -= ret; + } + } while ((ret > 0 && to_copy > 0) + || (ret < 0 && errno == EINTR)); + if (ret > 0) + ret = copied; + /* ret = 0 means orderly shutdown, ret < 0 is error. */ + return ret; +} + +static ssize_t lttng_live_send(int fd, const void *buf, size_t len) +{ + ssize_t ret; + + do { + ret = bt_send_nosigpipe(fd, buf, len); + } while (ret < 0 && errno == EINTR); + return ret; +} + +/* + * hostname parameter needs to hold MAXNAMLEN chars. + */ +static int parse_url(struct bt_live_viewer_connection *viewer_connection) +{ + char remain[3][MAXNAMLEN]; + int ret = -1, proto, proto_offset = 0; + const char *path = viewer_connection->url->str; + size_t path_len; + + if (!path) { + goto end; + } + path_len = strlen(path); /* not accounting \0 */ + + /* + * Since sscanf API does not allow easily checking string length + * against a size defined by a macro. Test it beforehand on the + * input. We know the output is always <= than the input length. + */ + if (path_len >= MAXNAMLEN) { + goto end; + } + ret = sscanf(path, "net%d://", &proto); + if (ret < 1) { + proto = 4; + /* net:// */ + proto_offset = strlen("net://"); + } else { + /* net4:// or net6:// */ + proto_offset = strlen("netX://"); + } + if (proto_offset > path_len) { + goto end; + } + if (proto == 6) { + PERR("[error] IPv6 is currently unsupported by lttng-live\n"); + goto end; + } + /* TODO : parse for IPv6 as well */ + /* Parse the hostname or IP */ + ret = sscanf(&path[proto_offset], "%[a-zA-Z.0-9%-]%s", + viewer_connection->relay_hostname, remain[0]); + if (ret == 2) { + /* Optional port number */ + switch (remain[0][0]) { + case ':': + ret = sscanf(remain[0], ":%d%s", &viewer_connection->port, remain[1]); + /* Optional session ID with port number */ + if (ret == 2) { + ret = sscanf(remain[1], "/%s", remain[2]); + /* Accept 0 or 1 (optional) */ + if (ret < 0) { + goto end; + } + } else if (ret == 0) { + PERR("[error] Missing port number after delimitor ':'\n"); + ret = -1; + goto end; + } + break; + case '/': + /* Optional session ID */ + ret = sscanf(remain[0], "/%s", remain[2]); + /* Accept 0 or 1 (optional) */ + if (ret < 0) { + goto end; + } + break; + default: + PERR("[error] wrong delimitor : %c\n", remain[0][0]); + ret = -1; + goto end; + } + } + + if (viewer_connection->port < 0) { + viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT; + } + + if (strlen(remain[2]) == 0) { + PDBG("Connecting to hostname : %s, port : %d, " + "proto : IPv%d\n", + viewer_connection->relay_hostname, + viewer_connection->port, + proto); + ret = 0; + goto end; + } + ret = sscanf(remain[2], "host/%[a-zA-Z.0-9%-]/%s", + viewer_connection->target_hostname, + viewer_connection->session_name); + if (ret != 2) { + PERR("[error] Format : " + "net:///host//\n"); + goto end; + } + + PDBG("Connecting to hostname : %s, port : %d, " + "target hostname : %s, session name : %s, " + "proto : IPv%d\n", + viewer_connection->relay_hostname, + viewer_connection->port, + viewer_connection->target_hostname, + viewer_connection->session_name, proto); + ret = 0; + +end: + return ret; +} + +static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connection) +{ + struct lttng_viewer_cmd cmd; + struct lttng_viewer_connect connect; + int ret; + ssize_t ret_len; + + cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT); + cmd.data_size = htobe64((uint64_t) sizeof(connect)); + cmd.cmd_version = htobe32(0); + + connect.viewer_session_id = -1ULL; /* will be set on recv */ + connect.major = htobe32(LTTNG_LIVE_MAJOR); + connect.minor = htobe32(LTTNG_LIVE_MINOR); + connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND); + + ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + if (ret_len < 0) { + PERR("Error sending cmd: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(cmd)); + + ret_len = lttng_live_send(viewer_connection->control_sock, &connect, sizeof(connect)); + if (ret_len < 0) { + PERR("Error sending version: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(connect)); + + ret_len = lttng_live_recv(viewer_connection->control_sock, &connect, sizeof(connect)); + if (ret_len == 0) { + PERR("Remote side has closed connection\n"); + goto error; + } + if (ret_len < 0) { + PERR("[error] Error receiving version: %s", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(connect)); + + PDBG("Received viewer session ID : %" PRIu64 "\n", + be64toh(connect.viewer_session_id)); + PDBG("Relayd version : %u.%u\n", be32toh(connect.major), + be32toh(connect.minor)); + + if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) { + PERR("Incompatible lttng-relayd protocol\n"); + goto error; + } + /* Use the smallest protocol version implemented. */ + if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) { + viewer_connection->minor = be32toh(connect.minor); + } else { + viewer_connection->minor = LTTNG_LIVE_MINOR; + } + viewer_connection->major = LTTNG_LIVE_MAJOR; + ret = 0; + return ret; + +error: + PERR("Unable to establish connection\n"); + return -1; +} + +static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_connection) +{ + struct hostent *host; + struct sockaddr_in server_addr; + int ret; + + if (parse_url(viewer_connection)) { + goto error; + } + + host = gethostbyname(viewer_connection->relay_hostname); + if (!host) { + PERR("[error] Cannot lookup hostname %s\n", + viewer_connection->relay_hostname); + goto error; + } + + if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { + PERR("[error] Socket creation failed: %s\n", strerror(errno)); + goto error; + } + + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(viewer_connection->port); + server_addr.sin_addr = *((struct in_addr *) host->h_addr); + memset(&(server_addr.sin_zero), 0, 8); + + if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr, + sizeof(struct sockaddr)) == -1) { + PERR("[error] Connection failed: %s\n", strerror(errno)); + goto error; + } + if (lttng_live_handshake(viewer_connection)) { + goto error; + } + + ret = 0; + + return ret; + +error: + if (viewer_connection->control_sock >= 0) { + if (close(viewer_connection->control_sock)) { + PERR("Close: %s", strerror(errno)); + } + } + viewer_connection->control_sock = -1; + return -1; +} + +static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection) +{ + if (viewer_connection->control_sock < 0) { + return; + } + if (close(viewer_connection->control_sock)) { + PERR("Close: %s", strerror(errno)); + viewer_connection->control_sock = -1; + } +} + +static void connection_release(struct bt_object *obj) +{ + struct bt_live_viewer_connection *conn = + container_of(obj, struct bt_live_viewer_connection, obj); + + bt_live_viewer_connection_destroy(conn); +} + +static +enum bt_value_status list_update_session(struct bt_value *results, + const struct lttng_viewer_session *session, + bool *_found) +{ + enum bt_value_status ret = BT_VALUE_STATUS_OK; + struct bt_value *map = NULL; + struct bt_value *hostname = NULL; + struct bt_value *session_name = NULL; + struct bt_value *btval = NULL; + int i, len; + bool found = false; + + len = bt_value_array_size(results); + if (len < 0) { + ret = BT_VALUE_STATUS_ERROR; + goto end; + } + for (i = 0; i < len; i++) { + const char *hostname_str = NULL; + const char *session_name_str = NULL; + + map = bt_value_array_get(results, (size_t) i); + if (!map) { + ret = BT_VALUE_STATUS_ERROR; + goto end; + } + hostname = bt_value_map_get(map, "target-hostname"); + if (!hostname) { + ret = BT_VALUE_STATUS_ERROR; + goto end; + } + session_name = bt_value_map_get(map, "session-name"); + if (!session_name) { + ret = BT_VALUE_STATUS_ERROR; + goto end; + } + ret = bt_value_string_get(hostname, &hostname_str); + if (ret != BT_VALUE_STATUS_OK) { + goto end; + } + ret = bt_value_string_get(session_name, &session_name_str); + if (ret != BT_VALUE_STATUS_OK) { + goto end; + } + + if (!strcmp(session->hostname, hostname_str) + && !strcmp(session->session_name, + session_name_str)) { + int64_t val; + uint32_t streams = be32toh(session->streams); + uint32_t clients = be32toh(session->clients); + + found = true; + + btval = bt_value_map_get(map, "stream-count"); + if (!btval) { + ret = BT_VALUE_STATUS_ERROR; + goto end; + } + ret = bt_value_integer_get(btval, &val); + if (ret != BT_VALUE_STATUS_OK) { + goto end; + } + /* sum */ + val += streams; + ret = bt_value_integer_set(btval, val); + if (ret != BT_VALUE_STATUS_OK) { + goto end; + } + BT_PUT(btval); + + btval = bt_value_map_get(map, "client-count"); + if (!btval) { + ret = BT_VALUE_STATUS_ERROR; + goto end; + } + ret = bt_value_integer_get(btval, &val); + if (ret != BT_VALUE_STATUS_OK) { + goto end; + } + /* max */ + val = max_t(int64_t, clients, val); + ret = bt_value_integer_set(btval, val); + if (ret != BT_VALUE_STATUS_OK) { + goto end; + } + BT_PUT(btval); + } + + BT_PUT(hostname); + BT_PUT(session_name); + BT_PUT(map); + + if (found) { + break; + } + } +end: + BT_PUT(btval); + BT_PUT(hostname); + BT_PUT(session_name); + BT_PUT(map); + *_found = found; + return ret; +} + +static +enum bt_value_status list_append_session(struct bt_value *results, + GString *base_url, + const struct lttng_viewer_session *session) +{ + enum bt_value_status ret = BT_VALUE_STATUS_OK; + struct bt_value *map = NULL; + GString *url = NULL; + bool found = false; + + /* + * If the session already exists, add the stream count to it, + * and do max of client counts. + */ + ret = list_update_session(results, session, &found); + if (ret != BT_VALUE_STATUS_OK || found) { + goto end; + } + + map = bt_value_map_create(); + if (!map) { + ret = BT_VALUE_STATUS_ERROR; + goto end; + } + + if (base_url->len < 1) { + ret = BT_VALUE_STATUS_ERROR; + goto end; + } + /* + * key = "url", + * value = , + */ + url = g_string_new(base_url->str); + g_string_append(url, "/host/"); + g_string_append(url, session->hostname); + g_string_append_c(url, '/'); + g_string_append(url, session->session_name); + + ret = bt_value_map_insert_string(map, "url", url->str); + if (ret != BT_VALUE_STATUS_OK) { + goto end; + } + + /* + * key = "target-hostname", + * value = , + */ + ret = bt_value_map_insert_string(map, "target-hostname", + session->hostname); + if (ret != BT_VALUE_STATUS_OK) { + goto end; + } + + /* + * key = "session-name", + * value = , + */ + ret = bt_value_map_insert_string(map, "session-name", + session->session_name); + if (ret != BT_VALUE_STATUS_OK) { + goto end; + } + + /* + * key = "timer-us", + * value = , + */ + { + uint32_t live_timer = be32toh(session->live_timer); + + ret = bt_value_map_insert_integer(map, "timer-us", + live_timer); + if (ret != BT_VALUE_STATUS_OK) { + goto end; + } + } + + /* + * key = "stream-count", + * value = , + */ + { + uint32_t streams = be32toh(session->streams); + + ret = bt_value_map_insert_integer(map, "stream-count", + streams); + if (ret != BT_VALUE_STATUS_OK) { + goto end; + } + } + + + /* + * key = "client-count", + * value = , + */ + { + uint32_t clients = be32toh(session->clients); + + ret = bt_value_map_insert_integer(map, "client-count", + clients); + if (ret != BT_VALUE_STATUS_OK) { + goto end; + } + } + + ret = bt_value_array_append(results, map); +end: + if (url) { + g_string_free(url, TRUE); + } + BT_PUT(map); + return ret; +} + +/* + * Data structure returned: + * + * { + * = { + * [n] = { + * = { + * { + * key = "url", + * value = , + * }, + * { + * key = "target-hostname", + * value = , + * }, + * { + * key = "session-name", + * value = , + * }, + * { + * key = "timer-us", + * value = , + * }, + * { + * key = "stream-count", + * value = , + * }, + * { + * key = "client-count", + * value = , + * }, + * }, + * } + * } + */ + +BT_HIDDEN +struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection) +{ + struct bt_value *results = NULL; + struct lttng_viewer_cmd cmd; + struct lttng_viewer_list_sessions list; + uint32_t i, sessions_count; + ssize_t ret_len; + + if (lttng_live_handshake(viewer_connection)) { + goto error; + } + + results = bt_value_array_create(); + if (!results) { + fprintf(stderr, "Error creating array\n"); + goto error; + } + + cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS); + cmd.data_size = htobe64((uint64_t) 0); + cmd.cmd_version = htobe32(0); + + ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + if (ret_len < 0) { + fprintf(stderr, "Error sending cmd: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(cmd)); + + ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list)); + if (ret_len == 0) { + fprintf(stderr, "Remote side has closed connection\n"); + goto error; + } + if (ret_len < 0) { + fprintf(stderr, "Error receiving session list: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(list)); + + sessions_count = be32toh(list.sessions_count); + for (i = 0; i < sessions_count; i++) { + struct lttng_viewer_session lsession; + + ret_len = lttng_live_recv(viewer_connection->control_sock, + &lsession, sizeof(lsession)); + if (ret_len == 0) { + fprintf(stderr, "Remote side has closed connection\n"); + goto error; + } + if (ret_len < 0) { + fprintf(stderr, "Error receiving session: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(lsession)); + lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; + lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; + if (list_append_session(results, + viewer_connection->url, &lsession) + != BT_VALUE_STATUS_OK) { + goto error; + } + } + goto end; +error: + BT_PUT(results); +end: + return results; +} + +static +int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) +{ + struct lttng_viewer_cmd cmd; + struct lttng_viewer_list_sessions list; + struct lttng_viewer_session lsession; + uint32_t i, sessions_count; + ssize_t ret_len; + uint64_t session_id; + struct bt_live_viewer_connection *viewer_connection = + lttng_live->viewer_connection; + + cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS); + cmd.data_size = htobe64((uint64_t) 0); + cmd.cmd_version = htobe32(0); + + ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + if (ret_len < 0) { + PERR("Error sending cmd: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(cmd)); + + ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list)); + if (ret_len == 0) { + PERR("Remote side has closed connection\n"); + goto error; + } + if (ret_len < 0) { + PERR("Error receiving session list: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(list)); + + sessions_count = be32toh(list.sessions_count); + for (i = 0; i < sessions_count; i++) { + ret_len = lttng_live_recv(viewer_connection->control_sock, + &lsession, sizeof(lsession)); + if (ret_len == 0) { + PERR("Remote side has closed connection\n"); + goto error; + } + if (ret_len < 0) { + PERR("Error receiving session: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(lsession)); + lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; + lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; + session_id = be64toh(lsession.id); + + if ((strncmp(lsession.session_name, + viewer_connection->session_name, + MAXNAMLEN) == 0) && (strncmp(lsession.hostname, + viewer_connection->target_hostname, + MAXNAMLEN) == 0)) { + if (lttng_live_add_session(lttng_live, session_id)) { + goto error; + } + } + } + + return 0; + +error: + PERR("Unable to query session ids\n"); + return -1; +} + +BT_HIDDEN +int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live) +{ + struct lttng_viewer_cmd cmd; + struct lttng_viewer_create_session_response resp; + ssize_t ret_len; + struct bt_live_viewer_connection *viewer_connection = + lttng_live->viewer_connection; + + cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION); + cmd.data_size = htobe64((uint64_t) 0); + cmd.cmd_version = htobe32(0); + + ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + if (ret_len < 0) { + PERR("Error sending cmd: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(cmd)); + + ret_len = lttng_live_recv(viewer_connection->control_sock, &resp, sizeof(resp)); + if (ret_len == 0) { + PERR("Remote side has closed connection\n"); + goto error; + } + if (ret_len < 0) { + PERR("Error receiving create session reply: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(resp)); + + if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) { + PERR("Error creating viewer session\n"); + goto error; + } + if (lttng_live_query_session_ids(lttng_live)) { + goto error; + } + + return 0; + +error: + return -1; +} + +static +int receive_streams(struct lttng_live_session *session, + uint32_t stream_count) +{ + ssize_t ret_len; + uint32_t i; + struct lttng_live_component *lttng_live = session->lttng_live; + struct bt_live_viewer_connection *viewer_connection = + lttng_live->viewer_connection; + + PDBG("Getting %" PRIu32 " new streams:\n", stream_count); + for (i = 0; i < stream_count; i++) { + struct lttng_viewer_stream stream; + struct lttng_live_stream_iterator *live_stream; + uint64_t stream_id; + uint64_t ctf_trace_id; + + ret_len = lttng_live_recv(viewer_connection->control_sock, &stream, sizeof(stream)); + if (ret_len == 0) { + PERR("Remote side has closed connection\n"); + goto error; + } + if (ret_len < 0) { + PERR("Error receiving stream\n"); + goto error; + } + assert(ret_len == sizeof(stream)); + stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0'; + stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; + stream_id = be64toh(stream.id); + ctf_trace_id = be64toh(stream.ctf_trace_id); + + if (stream.metadata_flag) { + PDBG(" metadata stream %" PRIu64 " : %s/%s\n", + stream_id, stream.path_name, + stream.channel_name); + if (lttng_live_metadata_create_stream(session, + ctf_trace_id, stream_id)) { + PERR("Error creating metadata stream\n"); + + goto error; + } + session->lazy_stream_notif_init = true; + } else { + PDBG(" stream %" PRIu64 " : %s/%s\n", + stream_id, stream.path_name, + stream.channel_name); + live_stream = lttng_live_stream_iterator_create(session, + ctf_trace_id, stream_id); + if (!live_stream) { + PERR("Error creating stream\n"); + goto error; + } + } + } + return 0; + +error: + return -1; +} + +BT_HIDDEN +int lttng_live_attach_session(struct lttng_live_session *session) +{ + struct lttng_viewer_cmd cmd; + struct lttng_viewer_attach_session_request rq; + struct lttng_viewer_attach_session_response rp; + ssize_t ret_len; + struct lttng_live_component *lttng_live = session->lttng_live; + struct bt_live_viewer_connection *viewer_connection = + lttng_live->viewer_connection; + uint64_t session_id = session->id; + uint32_t streams_count; + + if (session->attached) { + return 0; + } + + cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION); + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); + + memset(&rq, 0, sizeof(rq)); + rq.session_id = htobe64(session_id); + // TODO: add cmd line parameter to select seek beginning + // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING); + rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST); + + ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + if (ret_len < 0) { + PERR("Error sending cmd: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(cmd)); + + ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + if (ret_len < 0) { + PERR("Error sending attach request: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(rq)); + + ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + if (ret_len == 0) { + PERR("Remote side has closed connection\n"); + goto error; + } + if (ret_len < 0) { + PERR("Error receiving attach response: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(rp)); + + streams_count = be32toh(rp.streams_count); + switch(be32toh(rp.status)) { + case LTTNG_VIEWER_ATTACH_OK: + break; + case LTTNG_VIEWER_ATTACH_UNK: + PERR("Session id %" PRIu64 " is unknown\n", session_id); + goto error; + case LTTNG_VIEWER_ATTACH_ALREADY: + PERR("There is already a viewer attached to this session\n"); + goto error; + case LTTNG_VIEWER_ATTACH_NOT_LIVE: + PERR("Not a live session\n"); + goto error; + case LTTNG_VIEWER_ATTACH_SEEK_ERR: + PERR("Wrong seek parameter\n"); + goto error; + default: + PERR("Unknown attach return code %u\n", be32toh(rp.status)); + goto error; + } + + /* We receive the initial list of streams. */ + if (receive_streams(session, streams_count)) { + goto error; + } + + session->attached = true; + session->new_streams_needed = false; + + return 0; + +error: + return -1; +} + +BT_HIDDEN +int lttng_live_detach_session(struct lttng_live_session *session) +{ + struct lttng_viewer_cmd cmd; + struct lttng_viewer_detach_session_request rq; + struct lttng_viewer_detach_session_response rp; + ssize_t ret_len; + struct lttng_live_component *lttng_live = session->lttng_live; + struct bt_live_viewer_connection *viewer_connection = + lttng_live->viewer_connection; + uint64_t session_id = session->id; + + if (!session->attached) { + return 0; + } + + cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION); + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); + + memset(&rq, 0, sizeof(rq)); + rq.session_id = htobe64(session_id); + + ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + if (ret_len < 0) { + PERR("Error sending cmd: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(cmd)); + + ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + if (ret_len < 0) { + PERR("Error sending detach request: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(rq)); + + ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + if (ret_len == 0) { + PERR("Remote side has closed connection\n"); + goto error; + } + if (ret_len < 0) { + PERR("Error receiving detach response: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(rp)); + + switch(be32toh(rp.status)) { + case LTTNG_VIEWER_DETACH_SESSION_OK: + break; + case LTTNG_VIEWER_DETACH_SESSION_UNK: + PERR("Session id %" PRIu64 " is unknown\n", session_id); + goto error; + case LTTNG_VIEWER_DETACH_SESSION_ERR: + PERR("Error detaching session id %" PRIu64 "\n", session_id); + goto error; + default: + PERR("Unknown detach return code %u\n", be32toh(rp.status)); + goto error; + } + + session->attached = false; + + return 0; + +error: + return -1; +} + +BT_HIDDEN +ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, + FILE *fp) +{ + uint64_t len = 0; + int ret; + struct lttng_viewer_cmd cmd; + struct lttng_viewer_get_metadata rq; + struct lttng_viewer_metadata_packet rp; + char *data = NULL; + ssize_t ret_len; + struct lttng_live_session *session = trace->session; + struct lttng_live_component *lttng_live = session->lttng_live; + struct lttng_live_metadata *metadata = trace->metadata; + struct bt_live_viewer_connection *viewer_connection = + lttng_live->viewer_connection; + + rq.stream_id = htobe64(metadata->stream_id); + cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA); + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); + + ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + if (ret_len < 0) { + PERR("Error sending cmd: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(cmd)); + + ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + if (ret_len < 0) { + PERR("Error sending get_metadata request: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(rq)); + + ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + if (ret_len == 0) { + PERR("Remote side has closed connection\n"); + goto error; + } + if (ret_len < 0) { + PERR("Error receiving get_metadata response: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(rp)); + + switch (be32toh(rp.status)) { + case LTTNG_VIEWER_METADATA_OK: + PDBG("get_metadata : OK\n"); + break; + case LTTNG_VIEWER_NO_NEW_METADATA: + PDBG("get_metadata : NO NEW\n"); + ret = 0; + goto end; + case LTTNG_VIEWER_METADATA_ERR: + PDBG("get_metadata : ERR\n"); + goto error; + default: + PDBG("get_metadata : UNKNOWN\n"); + goto error; + } + + len = be64toh(rp.len); + PDBG("Writing %" PRIu64" bytes to metadata\n", len); + if (len <= 0) { + goto error; + } + + data = zmalloc(len); + if (!data) { + PERR("relay data zmalloc: %s", strerror(errno)); + goto error; + } + ret_len = lttng_live_recv(viewer_connection->control_sock, data, len); + if (ret_len == 0) { + PERR("[error] Remote side has closed connection\n"); + goto error_free_data; + } + if (ret_len < 0) { + PERR("[error] Error receiving trace packet: %s", strerror(errno)); + goto error_free_data; + } + assert(ret_len == len); + + do { + ret_len = fwrite(data, 1, len, fp); + } while (ret_len < 0 && errno == EINTR); + if (ret_len < 0) { + PERR("[error] Writing in the metadata fp\n"); + goto error_free_data; + } + assert(ret_len == len); + free(data); + ret = len; +end: + return ret; + +error_free_data: + free(data); +error: + return -1; +} + +/* + * Assign the fields from a lttng_viewer_index to a packet_index. + */ +static +void lttng_index_to_packet_index(struct lttng_viewer_index *lindex, + struct packet_index *pindex) +{ + assert(lindex); + assert(pindex); + + pindex->offset = be64toh(lindex->offset); + pindex->packet_size = be64toh(lindex->packet_size); + pindex->content_size = be64toh(lindex->content_size); + pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin); + pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end); + pindex->events_discarded = be64toh(lindex->events_discarded); +} + +BT_HIDDEN +enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live, + struct lttng_live_stream_iterator *stream, + struct packet_index *index) +{ + struct lttng_viewer_cmd cmd; + struct lttng_viewer_get_next_index rq; + ssize_t ret_len; + struct lttng_viewer_index rp; + uint32_t flags, status; + enum bt_ctf_lttng_live_iterator_status retstatus = + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + struct bt_live_viewer_connection *viewer_connection = + lttng_live->viewer_connection; + struct lttng_live_trace *trace = stream->trace; + + cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); + + memset(&rq, 0, sizeof(rq)); + rq.stream_id = htobe64(stream->viewer_stream_id); + + ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + if (ret_len < 0) { + PERR("Error sending cmd: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(cmd)); + + ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + if (ret_len < 0) { + PERR("Error sending get_next_index request: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(rq)); + + ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + if (ret_len == 0) { + PERR("Remote side has closed connection\n"); + goto error; + } + if (ret_len < 0) { + PERR("Error receiving get_next_index response: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(rp)); + + flags = be32toh(rp.flags); + status = be32toh(rp.status); + + switch (status) { + case LTTNG_VIEWER_INDEX_INACTIVE: + { + uint64_t ctf_stream_class_id; + + PDBG("get_next_index: inactive\n"); + memset(index, 0, sizeof(struct packet_index)); + index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end); + stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end; + ctf_stream_class_id = be64toh(rp.stream_id); + if (stream->ctf_stream_class_id != -1ULL) { + assert(stream->ctf_stream_class_id == + ctf_stream_class_id); + } else { + stream->ctf_stream_class_id = ctf_stream_class_id; + } + stream->state = LTTNG_LIVE_STREAM_QUIESCENT; + break; + } + case LTTNG_VIEWER_INDEX_OK: + { + uint64_t ctf_stream_class_id; + + PDBG("get_next_index: OK\n"); + lttng_index_to_packet_index(&rp, index); + ctf_stream_class_id = be64toh(rp.stream_id); + if (stream->ctf_stream_class_id != -1ULL) { + assert(stream->ctf_stream_class_id == + ctf_stream_class_id); + } else { + stream->ctf_stream_class_id = ctf_stream_class_id; + } + + stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA; + stream->current_packet_end_timestamp = + index->ts_cycles.timestamp_end; + + if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { + PDBG("get_next_index: new metadata needed\n"); + trace->new_metadata_needed = true; + } + if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { + PDBG("get_next_index: new streams needed\n"); + lttng_live_need_new_streams(lttng_live); + } + break; + } + case LTTNG_VIEWER_INDEX_RETRY: + PDBG("get_next_index: retry\n"); + memset(index, 0, sizeof(struct packet_index)); + retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; + goto end; + case LTTNG_VIEWER_INDEX_HUP: + PDBG("get_next_index: stream hung up\n"); + memset(index, 0, sizeof(struct packet_index)); + index->offset = EOF; + retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + stream->state = LTTNG_LIVE_STREAM_EOF; + break; + case LTTNG_VIEWER_INDEX_ERR: + PERR("get_next_index: error\n"); + memset(index, 0, sizeof(struct packet_index)); + stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; + goto error; + default: + PERR("get_next_index: unkwown value\n"); + memset(index, 0, sizeof(struct packet_index)); + stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; + goto error; + } +end: + return retstatus; + +error: + retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + return retstatus; +} + +BT_HIDDEN +enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live, + struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset, + uint64_t req_len, uint64_t *recv_len) +{ + enum bt_ctf_notif_iter_medium_status retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK; + struct lttng_viewer_cmd cmd; + struct lttng_viewer_get_packet rq; + struct lttng_viewer_trace_packet rp; + ssize_t ret_len; + uint32_t flags, status; + struct bt_live_viewer_connection *viewer_connection = + lttng_live->viewer_connection; + struct lttng_live_trace *trace = stream->trace; + + PDBG("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64 "\n", + offset, req_len); + cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET); + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); + + memset(&rq, 0, sizeof(rq)); + rq.stream_id = htobe64(stream->viewer_stream_id); + rq.offset = htobe64(offset); + rq.len = htobe32(req_len); + + ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + if (ret_len < 0) { + PERR("Error sending cmd: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(cmd)); + + ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + if (ret_len < 0) { + PERR("Error sending get_data request: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(rq)); + + ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + if (ret_len == 0) { + PERR("Remote side has closed connection\n"); + goto error; + } + if (ret_len < 0) { + PERR("Error receiving get_data response: %s\n", strerror(errno)); + goto error; + } + if (ret_len != sizeof(rp)) { + PERR("[error] get_data_packet: expected %zu" + ", received %zd\n", sizeof(rp), + ret_len); + goto error; + } + + flags = be32toh(rp.flags); + status = be32toh(rp.status); + + switch (status) { + case LTTNG_VIEWER_GET_PACKET_OK: + req_len = be32toh(rp.len); + PDBG("get_data_packet: Ok, packet size : %" PRIu64 "\n", req_len); + break; + case LTTNG_VIEWER_GET_PACKET_RETRY: + /* Unimplemented by relay daemon */ + PDBG("get_data_packet: retry\n"); + retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + goto end; + case LTTNG_VIEWER_GET_PACKET_ERR: + if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { + PDBG("get_data_packet: new metadata needed, try again later\n"); + trace->new_metadata_needed = true; + } + if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { + PDBG("get_data_packet: new streams needed, try again later\n"); + lttng_live_need_new_streams(lttng_live); + } + if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA + | LTTNG_VIEWER_FLAG_NEW_STREAM)) { + retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + goto end; + } + PERR("get_data_packet: error\n"); + goto error; + case LTTNG_VIEWER_GET_PACKET_EOF: + retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_EOF; + goto end; + default: + PDBG("get_data_packet: unknown\n"); + goto error; + } + + if (req_len == 0) { + goto error; + } + + ret_len = lttng_live_recv(viewer_connection->control_sock, buf, req_len); + if (ret_len == 0) { + PERR("Remote side has closed connection\n"); + goto error; + } + if (ret_len < 0) { + PERR("Error receiving trace packet: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == req_len); + *recv_len = ret_len; +end: + return retstatus; + +error: + retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR; + return retstatus; +} + +/* + * Request new streams for a session. + */ +BT_HIDDEN +enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( + struct lttng_live_session *session) +{ + enum bt_ctf_lttng_live_iterator_status status = + BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + struct lttng_viewer_cmd cmd; + struct lttng_viewer_new_streams_request rq; + struct lttng_viewer_new_streams_response rp; + ssize_t ret_len; + struct lttng_live_component *lttng_live = session->lttng_live; + struct bt_live_viewer_connection *viewer_connection = + lttng_live->viewer_connection; + uint32_t streams_count; + + if (!session->new_streams_needed) { + return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + } + + cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); + cmd.data_size = htobe64((uint64_t) sizeof(rq)); + cmd.cmd_version = htobe32(0); + + memset(&rq, 0, sizeof(rq)); + rq.session_id = htobe64(session->id); + + ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + if (ret_len < 0) { + PERR("Error sending cmd: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(cmd)); + + ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + if (ret_len < 0) { + PERR("Error sending get_new_streams request: %s\n", strerror(errno)); + goto error; + } + assert(ret_len == sizeof(rq)); + + ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + if (ret_len == 0) { + PERR("Remote side has closed connection\n"); + goto error; + } + if (ret_len < 0) { + PERR("Error receiving get_new_streams response\n"); + goto error; + } + assert(ret_len == sizeof(rp)); + + streams_count = be32toh(rp.streams_count); + + switch(be32toh(rp.status)) { + case LTTNG_VIEWER_NEW_STREAMS_OK: + session->new_streams_needed = false; + break; + case LTTNG_VIEWER_NEW_STREAMS_NO_NEW: + session->new_streams_needed = false; + goto end; + case LTTNG_VIEWER_NEW_STREAMS_HUP: + session->new_streams_needed = false; + session->closed = true; + status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + goto end; + case LTTNG_VIEWER_NEW_STREAMS_ERR: + PERR("get_new_streams error\n"); + goto error; + default: + PERR("Unknown return code %u\n", be32toh(rp.status)); + goto error; + } + + if (receive_streams(session, streams_count)) { + goto error; + } +end: + return status; + +error: + status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + return status; +} + +BT_HIDDEN +struct bt_live_viewer_connection * + bt_live_viewer_connection_create(const char *url, FILE *error_fp) +{ + struct bt_live_viewer_connection *viewer_connection; + + viewer_connection = g_new0(struct bt_live_viewer_connection, 1); + + bt_object_init(&viewer_connection->obj, connection_release); + viewer_connection->control_sock = -1; + viewer_connection->port = -1; + viewer_connection->error_fp = error_fp; + viewer_connection->url = g_string_new(url); + if (!viewer_connection->url) { + goto error; + } + + PDBG("Establishing connection to url \"%s\"...\n", url); + if (lttng_live_connect_viewer(viewer_connection)) { + goto error_report; + } + PDBG("Connection to url \"%s\" is established\n", url); + return viewer_connection; + +error_report: + printf_verbose("Failure to establish connection to url \"%s\"\n", url); +error: + g_free(viewer_connection); + return NULL; +} + +BT_HIDDEN +void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_connection) +{ + PDBG("Closing connection to url \"%s\"\n", viewer_connection->url->str); + lttng_live_disconnect_viewer(viewer_connection); + g_string_free(viewer_connection->url, TRUE); + g_free(viewer_connection); +} diff --git a/plugins/ctf/lttng-live/viewer-connection.h b/plugins/ctf/lttng-live/viewer-connection.h new file mode 100644 index 00000000..e6f551f4 --- /dev/null +++ b/plugins/ctf/lttng-live/viewer-connection.h @@ -0,0 +1,83 @@ +#ifndef LTTNG_LIVE_VIEWER_CONNECTION_H +#define LTTNG_LIVE_VIEWER_CONNECTION_H + +/* + * Copyright 2016 - Mathieu Desnoyers + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include +#include + +#include + +//TODO: this should not be used by plugins. Should copy code into plugin +//instead. +#include "babeltrace/object-internal.h" + +#define LTTNG_DEFAULT_NETWORK_VIEWER_PORT 5344 + +#define LTTNG_LIVE_MAJOR 2 +#define LTTNG_LIVE_MINOR 4 + +struct bt_live_viewer_connection { + struct bt_object obj; + + FILE *error_fp; + + GString *url; + + char relay_hostname[MAXNAMLEN]; + char target_hostname[MAXNAMLEN]; + char session_name[MAXNAMLEN]; + int control_sock; + int port; + + int32_t major; + int32_t minor; +}; + +struct packet_index_time { + int64_t timestamp_begin; + int64_t timestamp_end; +}; + +struct packet_index { + off_t offset; /* offset of the packet in the file, in bytes */ + int64_t data_offset; /* offset of data within the packet, in bits */ + uint64_t packet_size; /* packet size, in bits */ + uint64_t content_size; /* content size, in bits */ + uint64_t events_discarded; + uint64_t events_discarded_len; /* length of the field, in bits */ + struct packet_index_time ts_cycles; /* timestamp in cycles */ + struct packet_index_time ts_real; /* realtime timestamp */ + /* CTF_INDEX 1.0 limit */ + uint64_t stream_instance_id; /* ID of the channel instance */ + uint64_t packet_seq_num; /* packet sequence number */ +}; + +struct bt_live_viewer_connection * + bt_live_viewer_connection_create(const char *url, FILE *error_fp); + +void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *conn); + +struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection); + +#endif /* LTTNG_LIVE_VIEWER_CONNECTION_H */ diff --git a/plugins/ctf/plugin.c b/plugins/ctf/plugin.c index 7b18bd10..63f7f8b3 100644 --- a/plugins/ctf/plugin.c +++ b/plugins/ctf/plugin.c @@ -49,14 +49,6 @@ BT_PLUGIN_SOURCE_COMPONENT_CLASS_NOTIFICATION_ITERATOR_INIT_METHOD(fs, BT_PLUGIN_SOURCE_COMPONENT_CLASS_NOTIFICATION_ITERATOR_FINALIZE_METHOD(fs, ctf_fs_iterator_finalize); -/* ctf.lttng-live source */ -BT_PLUGIN_SOURCE_COMPONENT_CLASS_WITH_ID(auto, lttng_live, "lttng-live", - lttng_live_iterator_next); -BT_PLUGIN_SOURCE_COMPONENT_CLASS_INIT_METHOD_WITH_ID(auto, lttng_live, - lttng_live_init); -BT_PLUGIN_SOURCE_COMPONENT_CLASS_DESCRIPTION_WITH_ID(auto, lttng_live, - "Connect to an LTTng relay daemon and receive CTF streams."); - /* ctf.fs sink */ BT_PLUGIN_SINK_COMPONENT_CLASS(fs, writer_run); BT_PLUGIN_SINK_COMPONENT_CLASS_INIT_METHOD(fs, writer_component_init); @@ -64,3 +56,19 @@ BT_PLUGIN_SINK_COMPONENT_CLASS_PORT_CONNECTED_METHOD(fs, writer_component_port_connected); BT_PLUGIN_SINK_COMPONENT_CLASS_FINALIZE_METHOD(fs, writer_component_finalize); BT_PLUGIN_SINK_COMPONENT_CLASS_DESCRIPTION(fs, "Write CTF traces to the file system."); + +/* ctf.lttng-live source */ +BT_PLUGIN_SOURCE_COMPONENT_CLASS_WITH_ID(auto, lttng_live, "lttng-live", + lttng_live_iterator_next); +BT_PLUGIN_SOURCE_COMPONENT_CLASS_DESCRIPTION_WITH_ID(auto, lttng_live, + "Connect to an LTTng relay daemon and receive CTF streams."); +BT_PLUGIN_SOURCE_COMPONENT_CLASS_INIT_METHOD_WITH_ID(auto, lttng_live, + lttng_live_component_init); +BT_PLUGIN_SOURCE_COMPONENT_CLASS_QUERY_METHOD_WITH_ID(auto, lttng_live, + lttng_live_query); +BT_PLUGIN_SOURCE_COMPONENT_CLASS_FINALIZE_METHOD_WITH_ID(auto, lttng_live, + lttng_live_component_finalize); +BT_PLUGIN_SOURCE_COMPONENT_CLASS_NOTIFICATION_ITERATOR_INIT_METHOD_WITH_ID( + auto, lttng_live, lttng_live_iterator_init); +BT_PLUGIN_SOURCE_COMPONENT_CLASS_NOTIFICATION_ITERATOR_FINALIZE_METHOD_WITH_ID( + auto, lttng_live, lttng_live_iterator_finalize); diff --git a/plugins/ctf/print.h b/plugins/ctf/print.h index eb5b30c8..eab99d77 100644 --- a/plugins/ctf/print.h +++ b/plugins/ctf/print.h @@ -32,7 +32,7 @@ do { \ if (PRINT_ERR_STREAM) { \ fprintf(PRINT_ERR_STREAM, \ - "Error: " PRINT_PREFIX ": " fmt, \ + "[error " PRINT_PREFIX "] " fmt, \ ##__VA_ARGS__); \ } \ } while (0) @@ -41,7 +41,7 @@ do { \ if (PRINT_ERR_STREAM) { \ fprintf(PRINT_ERR_STREAM, \ - "Warning: " PRINT_PREFIX ": " fmt, \ + "[warning " PRINT_PREFIX "] " fmt, \ ##__VA_ARGS__); \ } \ } while (0) @@ -50,7 +50,7 @@ do { \ if (PRINT_DBG_CHECK) { \ fprintf(stderr, \ - "Debug: " PRINT_PREFIX ": " fmt, \ + "[debug " PRINT_PREFIX "] " fmt, \ ##__VA_ARGS__); \ } \ } while (0) diff --git a/plugins/text/pretty/pretty.c b/plugins/text/pretty/pretty.c index ece1f1e6..d7346de1 100644 --- a/plugins/text/pretty/pretty.c +++ b/plugins/text/pretty/pretty.c @@ -130,8 +130,20 @@ enum bt_component_status handle_notification(struct pretty_component *pretty, assert(pretty); - if (bt_notification_get_type(notification) == BT_NOTIFICATION_TYPE_EVENT) { + switch (bt_notification_get_type(notification)) { + case BT_NOTIFICATION_TYPE_EVENT: ret = pretty_print_event(pretty, notification); + break; + case BT_NOTIFICATION_TYPE_INACTIVITY: + fprintf(stderr, "Inactivity notification\n"); + break; + case BT_NOTIFICATION_TYPE_PACKET_BEGIN: + case BT_NOTIFICATION_TYPE_PACKET_END: + case BT_NOTIFICATION_TYPE_STREAM_BEGIN: + case BT_NOTIFICATION_TYPE_STREAM_END: + break; + default: + fprintf(stderr, "Unhandled notification type\n"); } return ret; @@ -185,9 +197,6 @@ enum bt_component_status pretty_consume(struct bt_private_component *component) it_ret = bt_notification_iterator_next(it); switch (it_ret) { - case BT_NOTIFICATION_ITERATOR_STATUS_ERROR: - ret = BT_COMPONENT_STATUS_ERROR; - goto end; case BT_NOTIFICATION_ITERATOR_STATUS_END: ret = BT_COMPONENT_STATUS_END; BT_PUT(pretty->input_iterator);