X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=plugins%2Fctf%2Flttng-live%2Fdata-stream.c;h=9d093cb58b7c8940b1bde0815a2c8ac56e0c7563;hp=fde80b64c208b15f8d4f1bc552f1e3e0d5057c06;hb=bb18709be19ebc5b1bd9264cdbd3dd20939bdd05;hpb=1e3400b8572abdb55c6f0416e7c47d13fb715037 diff --git a/plugins/ctf/lttng-live/data-stream.c b/plugins/ctf/lttng-live/data-stream.c index fde80b64..9d093cb5 100644 --- a/plugins/ctf/lttng-live/data-stream.c +++ b/plugins/ctf/lttng-live/data-stream.c @@ -1,4 +1,5 @@ /* + * Copyright 2019 Francis Deslauriers * Copyright 2016 - Philippe Proulx * Copyright 2016 - Jérémie Galarneau * Copyright 2010-2011 - EfficiOS Inc. and Linux Foundation @@ -38,6 +39,8 @@ #include "data-stream.h" +#define STREAM_NAME_PREFIX "stream-" + static enum bt_msg_iter_medium_status medop_request_bytes( size_t request_sz, uint8_t **buffer_addr, @@ -48,11 +51,10 @@ enum bt_msg_iter_medium_status medop_request_bytes( struct lttng_live_stream_iterator *stream = data; struct lttng_live_trace *trace = stream->trace; struct lttng_live_session *session = trace->session; - struct lttng_live_component *lttng_live = session->lttng_live; + struct lttng_live_msg_iter *live_msg_iter = session->lttng_live_msg_iter; uint64_t recv_len = 0; uint64_t len_left; uint64_t read_len; - //int i; len_left = stream->base_offset + stream->len - stream->offset; if (!len_left) { @@ -62,7 +64,7 @@ enum bt_msg_iter_medium_status medop_request_bytes( } read_len = MIN(request_sz, stream->buflen); read_len = MIN(read_len, len_left); - status = lttng_live_get_stream_bytes(lttng_live, + status = lttng_live_get_stream_bytes(live_msg_iter, stream, stream->buf, stream->offset, read_len, &recv_len); *buffer_addr = stream->buf; @@ -72,34 +74,42 @@ enum bt_msg_iter_medium_status medop_request_bytes( } static -const bt_stream *medop_get_stream( - const bt_stream_class *stream_class, - uint64_t stream_id, void *data) +bt_stream *medop_borrow_stream(bt_stream_class *stream_class, + int64_t stream_id, void *data) { struct lttng_live_stream_iterator *lttng_live_stream = data; if (!lttng_live_stream->stream) { - int64_t stream_class_id = - bt_stream_class_get_id(stream_class); + uint64_t stream_class_id = bt_stream_class_get_id(stream_class); - BT_LOGD("Creating stream %s (ID: %" PRIu64 ") out of stream class %" PRId64, - lttng_live_stream->name, stream_id, stream_class_id); + BT_LOGD("Creating stream %s (ID: %" PRIu64 ") out of stream " + "class %" PRId64, lttng_live_stream->name->str, + stream_id, stream_class_id); - if (stream_id == -1ULL) { - /* No stream ID */ - lttng_live_stream->stream = bt_stream_create( - stream_class, lttng_live_stream->name); + if (stream_id < 0) { + /* + * No stream instance ID in the stream. It's possible + * to encounter this situation with older version of + * LTTng. In these cases, use the viewer_stream_id that + * is unique for a live viewer session. + */ + lttng_live_stream->stream = bt_stream_create_with_id( + stream_class, lttng_live_stream->trace->trace, + lttng_live_stream->viewer_stream_id); } else { - lttng_live_stream->stream = - bt_stream_create_with_id(stream_class, - lttng_live_stream->name, stream_id); + lttng_live_stream->stream = bt_stream_create_with_id( + stream_class, lttng_live_stream->trace->trace, + (uint64_t) stream_id); } if (!lttng_live_stream->stream) { - BT_LOGE("Cannot create stream %s (stream class %" PRId64 ", stream ID %" PRIu64 ")", - lttng_live_stream->name, - stream_class_id, stream_id); + BT_LOGE("Cannot create stream %s (stream class ID " + "%" PRId64 ", stream ID %" PRIu64 ")", + lttng_live_stream->name->str, + stream_class_id, stream_id); } + bt_stream_set_name(lttng_live_stream->stream, + lttng_live_stream->name->str); } return lttng_live_stream->stream; @@ -107,31 +117,43 @@ const bt_stream *medop_get_stream( static struct bt_msg_iter_medium_ops medops = { .request_bytes = medop_request_bytes, - .get_stream = medop_get_stream, + .seek = NULL, + .borrow_stream = medop_borrow_stream, }; BT_HIDDEN -bt_lttng_live_iterator_status lttng_live_lazy_msg_init( +enum lttng_live_iterator_status lttng_live_lazy_msg_init( struct lttng_live_session *session) { - struct lttng_live_component *lttng_live = session->lttng_live; - struct lttng_live_trace *trace; + struct lttng_live_component *lttng_live = + session->lttng_live_msg_iter->lttng_live_comp; + uint64_t trace_idx, stream_iter_idx; if (!session->lazy_stream_msg_init) { - return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; + return LTTNG_LIVE_ITERATOR_STATUS_OK; } - bt_list_for_each_entry(trace, &session->traces, node) { - struct lttng_live_stream_iterator *stream; + for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) { + struct lttng_live_trace *trace = + g_ptr_array_index(session->traces, trace_idx); + + for (stream_iter_idx = 0; + stream_iter_idx < trace->stream_iterators->len; + stream_iter_idx++) { + struct ctf_trace_class *ctf_tc; + struct lttng_live_stream_iterator *stream_iter = + g_ptr_array_index(trace->stream_iterators, + stream_iter_idx); - bt_list_for_each_entry(stream, &trace->streams, node) { - if (stream->msg_iter) { + if (stream_iter->msg_iter) { continue; } - stream->msg_iter = bt_msg_iter_create(trace->trace, + ctf_tc = ctf_metadata_decoder_borrow_ctf_trace_class( + trace->metadata->decoder); + stream_iter->msg_iter = bt_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops, - stream); - if (!stream->msg_iter) { + stream_iter); + if (!stream_iter->msg_iter) { goto error; } } @@ -139,10 +161,10 @@ bt_lttng_live_iterator_status lttng_live_lazy_msg_init( session->lazy_stream_msg_init = false; - return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; + return LTTNG_LIVE_ITERATOR_STATUS_OK; error: - return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } BT_HIDDEN @@ -151,78 +173,101 @@ struct lttng_live_stream_iterator *lttng_live_stream_iterator_create( uint64_t ctf_trace_id, uint64_t stream_id) { - struct lttng_live_component *lttng_live = session->lttng_live; - struct lttng_live_stream_iterator *stream = - g_new0(struct lttng_live_stream_iterator, 1); + struct lttng_live_stream_iterator *stream_iter; + struct lttng_live_component *lttng_live; struct lttng_live_trace *trace; - int ret; - trace = lttng_live_ref_trace(session, ctf_trace_id); + BT_ASSERT(session); + BT_ASSERT(session->lttng_live_msg_iter); + BT_ASSERT(session->lttng_live_msg_iter->lttng_live_comp); + + lttng_live = session->lttng_live_msg_iter->lttng_live_comp; + + stream_iter = g_new0(struct lttng_live_stream_iterator, 1); + if (!stream_iter) { + goto error; + } + + trace = lttng_live_borrow_trace(session, ctf_trace_id); if (!trace) { goto error; } - stream->p.type = LIVE_STREAM_TYPE_STREAM; - stream->trace = trace; - stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; - stream->viewer_stream_id = stream_id; - stream->ctf_stream_class_id = -1ULL; - stream->last_returned_inactivity_timestamp = INT64_MIN; + stream_iter->trace = trace; + stream_iter->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; + stream_iter->viewer_stream_id = stream_id; + stream_iter->ctf_stream_class_id = -1ULL; + stream_iter->last_inactivity_ts = INT64_MIN; if (trace->trace) { - stream->msg_iter = bt_msg_iter_create(trace->trace, + struct ctf_trace_class *ctf_tc = + ctf_metadata_decoder_borrow_ctf_trace_class( + trace->metadata->decoder); + BT_ASSERT(!stream_iter->msg_iter); + stream_iter->msg_iter = bt_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops, - stream); - if (!stream->msg_iter) { + stream_iter); + if (!stream_iter->msg_iter) { goto error; } } - stream->buf = g_new0(uint8_t, session->lttng_live->max_query_size); - stream->buflen = session->lttng_live->max_query_size; + stream_iter->buf = g_new0(uint8_t, lttng_live->max_query_size); + if (!stream_iter->buf) { + goto error; + } - ret = lttng_live_add_port(lttng_live, stream); - BT_ASSERT(!ret); + stream_iter->buflen = lttng_live->max_query_size; + stream_iter->name = g_string_new(NULL); + if (!stream_iter->name) { + goto error; + } + + g_string_printf(stream_iter->name, STREAM_NAME_PREFIX "%" PRIu64, + stream_iter->viewer_stream_id); + g_ptr_array_add(trace->stream_iterators, stream_iter); - bt_list_add(&stream->node, &trace->streams); + /* Track the number of active stream iterator. */ + session->lttng_live_msg_iter->active_stream_iter++; goto end; error: - /* Do not touch "borrowed" file. */ - lttng_live_stream_iterator_destroy(stream); - stream = NULL; + lttng_live_stream_iterator_destroy(stream_iter); + stream_iter = NULL; end: - return stream; + return stream_iter; } BT_HIDDEN -void lttng_live_stream_iterator_destroy(struct lttng_live_stream_iterator *stream) +void lttng_live_stream_iterator_destroy( + struct lttng_live_stream_iterator *stream_iter) { - struct lttng_live_component *lttng_live; - int ret; - - if (!stream) { + if (!stream_iter) { return; } - lttng_live = stream->trace->session->lttng_live; - ret = lttng_live_remove_port(lttng_live, stream->port); - BT_ASSERT(!ret); - - if (stream->stream) { - BT_OBJECT_PUT_REF_AND_RESET(stream->stream); + if (stream_iter->stream) { + BT_STREAM_PUT_REF_AND_RESET(stream_iter->stream); } - if (stream->msg_iter) { - bt_msg_iter_destroy(stream->msg_iter); + if (stream_iter->msg_iter) { + bt_msg_iter_destroy(stream_iter->msg_iter); } - g_free(stream->buf); - BT_OBJECT_PUT_REF_AND_RESET(stream->packet_end_msg_queue); - bt_list_del(&stream->node); + if (stream_iter->buf) { + g_free(stream_iter->buf); + } + if (stream_iter->name) { + g_string_free(stream_iter->name, TRUE); + } + + bt_message_put_ref(stream_iter->current_msg); + + /* Track the number of active stream iterator. */ + stream_iter->trace->session->lttng_live_msg_iter->active_stream_iter--; + /* * Ensure we poke the trace metadata in the future, which is * required to release the metadata reference on the trace. */ - stream->trace->new_metadata_needed = true; - lttng_live_unref_trace(stream->trace); - g_free(stream); + stream_iter->trace->new_metadata_needed = true; + g_free(stream_iter); }