X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Fctf%2Flttng-live%2Fdata-stream.c;h=d617c302c2c3f8b4c76ce0b0e33a2f38d55e7e51;hb=3fadfbc0c91f82c46bd36e6e0657ea93570c9db1;hp=d70891089be92776486957f3b3ebb3ccda72c254;hpb=55314f2a6c08045f6f7ca7700a4932628eff1b87;p=babeltrace.git diff --git a/plugins/ctf/lttng-live/data-stream.c b/plugins/ctf/lttng-live/data-stream.c index d7089108..d617c302 100644 --- a/plugins/ctf/lttng-live/data-stream.c +++ b/plugins/ctf/lttng-live/data-stream.c @@ -1,4 +1,5 @@ /* + * Copyright 2019 Francis Deslauriers * Copyright 2016 - Philippe Proulx * Copyright 2016 - Jérémie Galarneau * Copyright 2010-2011 - EfficiOS Inc. and Linux Foundation @@ -31,38 +32,39 @@ #include #include #include -#include -#include -#include "../common/notif-iter/notif-iter.h" -#include +#include +#include +#include "../common/msg-iter/msg-iter.h" +#include #include "data-stream.h" +#define STREAM_NAME_PREFIX "stream-" + static -enum bt_ctf_notif_iter_medium_status medop_request_bytes( +enum bt_msg_iter_medium_status medop_request_bytes( size_t request_sz, uint8_t **buffer_addr, size_t *buffer_sz, void *data) { - enum bt_ctf_notif_iter_medium_status status = - BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK; + enum bt_msg_iter_medium_status status = + BT_MSG_ITER_MEDIUM_STATUS_OK; struct lttng_live_stream_iterator *stream = data; struct lttng_live_trace *trace = stream->trace; struct lttng_live_session *session = trace->session; - struct lttng_live_component *lttng_live = session->lttng_live; + struct lttng_live_msg_iter *live_msg_iter = session->lttng_live_msg_iter; uint64_t recv_len = 0; uint64_t len_left; uint64_t read_len; - //int i; len_left = stream->base_offset + stream->len - stream->offset; if (!len_left) { stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; - status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + status = BT_MSG_ITER_MEDIUM_STATUS_AGAIN; return status; } read_len = MIN(request_sz, stream->buflen); read_len = MIN(read_len, len_left); - status = lttng_live_get_stream_bytes(lttng_live, + status = lttng_live_get_stream_bytes(live_msg_iter, stream, stream->buf, stream->offset, read_len, &recv_len); *buffer_addr = stream->buf; @@ -72,65 +74,97 @@ enum bt_ctf_notif_iter_medium_status medop_request_bytes( } static -struct bt_ctf_stream *medop_get_stream( - struct bt_ctf_stream_class *stream_class, void *data) +bt_stream *medop_borrow_stream(bt_stream_class *stream_class, + int64_t stream_id, void *data) { struct lttng_live_stream_iterator *lttng_live_stream = data; if (!lttng_live_stream->stream) { - int64_t id = bt_ctf_stream_class_get_id(stream_class); + uint64_t stream_class_id = bt_stream_class_get_id(stream_class); + + BT_LOGD("Creating stream %s (ID: %" PRIu64 ") out of stream " + "class %" PRId64, lttng_live_stream->name->str, + stream_id, stream_class_id); + + if (stream_id < 0) { + /* + * No stream instance ID in the stream. It's possible + * to encounter this situation with older version of + * LTTng. In these cases, use the viewer_stream_id that + * is unique for a live viewer session. + */ + lttng_live_stream->stream = bt_stream_create_with_id( + stream_class, lttng_live_stream->trace->trace, + lttng_live_stream->viewer_stream_id); + } else { + lttng_live_stream->stream = bt_stream_create_with_id( + stream_class, lttng_live_stream->trace->trace, + (uint64_t) stream_id); + } - BT_LOGD("Creating stream %s out of stream class %" PRId64, - lttng_live_stream->name, id); - lttng_live_stream->stream = bt_ctf_stream_create(stream_class, - lttng_live_stream->name); if (!lttng_live_stream->stream) { - BT_LOGE("Cannot create stream %s (stream class %" PRId64 ")", - lttng_live_stream->name, id); + BT_LOGE("Cannot create stream %s (stream class ID " + "%" PRId64 ", stream ID %" PRIu64 ")", + lttng_live_stream->name->str, + stream_class_id, stream_id); } + bt_stream_set_name(lttng_live_stream->stream, + lttng_live_stream->name->str); } return lttng_live_stream->stream; } -static struct bt_ctf_notif_iter_medium_ops medops = { +static struct bt_msg_iter_medium_ops medops = { .request_bytes = medop_request_bytes, - .get_stream = medop_get_stream, + .seek = NULL, + .borrow_stream = medop_borrow_stream, }; BT_HIDDEN -enum bt_ctf_lttng_live_iterator_status lttng_live_lazy_notif_init( +enum lttng_live_iterator_status lttng_live_lazy_msg_init( struct lttng_live_session *session) { - struct lttng_live_component *lttng_live = session->lttng_live; - struct lttng_live_trace *trace; + struct lttng_live_component *lttng_live = + session->lttng_live_msg_iter->lttng_live_comp; + uint64_t trace_idx, stream_iter_idx; - if (!session->lazy_stream_notif_init) { - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + if (!session->lazy_stream_msg_init) { + return LTTNG_LIVE_ITERATOR_STATUS_OK; } - bt_list_for_each_entry(trace, &session->traces, node) { - struct lttng_live_stream_iterator *stream; + for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) { + struct lttng_live_trace *trace = + g_ptr_array_index(session->traces, trace_idx); + + for (stream_iter_idx = 0; + stream_iter_idx < trace->stream_iterators->len; + stream_iter_idx++) { + struct ctf_trace_class *ctf_tc; + struct lttng_live_stream_iterator *stream_iter = + g_ptr_array_index(trace->stream_iterators, + stream_iter_idx); - bt_list_for_each_entry(stream, &trace->streams, node) { - if (stream->notif_iter) { + if (stream_iter->msg_iter) { continue; } - stream->notif_iter = bt_ctf_notif_iter_create(trace->trace, + ctf_tc = ctf_metadata_decoder_borrow_ctf_trace_class( + trace->metadata->decoder); + stream_iter->msg_iter = bt_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops, - stream); - if (!stream->notif_iter) { + stream_iter); + if (!stream_iter->msg_iter) { goto error; } } } - session->lazy_stream_notif_init = false; + session->lazy_stream_msg_init = false; - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + return LTTNG_LIVE_ITERATOR_STATUS_OK; error: - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; } BT_HIDDEN @@ -139,78 +173,101 @@ struct lttng_live_stream_iterator *lttng_live_stream_iterator_create( uint64_t ctf_trace_id, uint64_t stream_id) { - struct lttng_live_component *lttng_live = session->lttng_live; - struct lttng_live_stream_iterator *stream = - g_new0(struct lttng_live_stream_iterator, 1); + struct lttng_live_stream_iterator *stream_iter; + struct lttng_live_component *lttng_live; struct lttng_live_trace *trace; - int ret; - trace = lttng_live_ref_trace(session, ctf_trace_id); + BT_ASSERT(session); + BT_ASSERT(session->lttng_live_msg_iter); + BT_ASSERT(session->lttng_live_msg_iter->lttng_live_comp); + + lttng_live = session->lttng_live_msg_iter->lttng_live_comp; + + stream_iter = g_new0(struct lttng_live_stream_iterator, 1); + if (!stream_iter) { + goto error; + } + + trace = lttng_live_borrow_trace(session, ctf_trace_id); if (!trace) { goto error; } - stream->p.type = LIVE_STREAM_TYPE_STREAM; - stream->trace = trace; - stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; - stream->viewer_stream_id = stream_id; - stream->ctf_stream_class_id = -1ULL; - stream->last_returned_inactivity_timestamp = INT64_MIN; + stream_iter->trace = trace; + stream_iter->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; + stream_iter->viewer_stream_id = stream_id; + stream_iter->ctf_stream_class_id = -1ULL; + stream_iter->last_inactivity_ts = INT64_MIN; if (trace->trace) { - stream->notif_iter = bt_ctf_notif_iter_create(trace->trace, + struct ctf_trace_class *ctf_tc = + ctf_metadata_decoder_borrow_ctf_trace_class( + trace->metadata->decoder); + BT_ASSERT(!stream_iter->msg_iter); + stream_iter->msg_iter = bt_msg_iter_create(ctf_tc, lttng_live->max_query_size, medops, - stream); - if (!stream->notif_iter) { + stream_iter); + if (!stream_iter->msg_iter) { goto error; } } - stream->buf = g_new0(uint8_t, session->lttng_live->max_query_size); - stream->buflen = session->lttng_live->max_query_size; + stream_iter->buf = g_new0(uint8_t, lttng_live->max_query_size); + if (!stream_iter->buf) { + goto error; + } + + stream_iter->buflen = lttng_live->max_query_size; + stream_iter->name = g_string_new(NULL); + if (!stream_iter->name) { + goto error; + } - ret = lttng_live_add_port(lttng_live, stream); - assert(!ret); + g_string_printf(stream_iter->name, STREAM_NAME_PREFIX "%" PRIu64, + stream_iter->viewer_stream_id); + g_ptr_array_add(trace->stream_iterators, stream_iter); - bt_list_add(&stream->node, &trace->streams); + /* Track the number of active stream iterator. */ + session->lttng_live_msg_iter->active_stream_iter++; goto end; error: - /* Do not touch "borrowed" file. */ - lttng_live_stream_iterator_destroy(stream); - stream = NULL; + lttng_live_stream_iterator_destroy(stream_iter); + stream_iter = NULL; end: - return stream; + return stream_iter; } BT_HIDDEN -void lttng_live_stream_iterator_destroy(struct lttng_live_stream_iterator *stream) +void lttng_live_stream_iterator_destroy( + struct lttng_live_stream_iterator *stream_iter) { - struct lttng_live_component *lttng_live; - int ret; - - if (!stream) { + if (!stream_iter) { return; } - lttng_live = stream->trace->session->lttng_live; - ret = lttng_live_remove_port(lttng_live, stream->port); - assert(!ret); - - if (stream->stream) { - BT_PUT(stream->stream); + if (stream_iter->stream) { + BT_STREAM_PUT_REF_AND_RESET(stream_iter->stream); } - if (stream->notif_iter) { - bt_ctf_notif_iter_destroy(stream->notif_iter); + if (stream_iter->msg_iter) { + bt_msg_iter_destroy(stream_iter->msg_iter); + } + if (stream_iter->buf) { + g_free(stream_iter->buf); + } + if (stream_iter->name) { + g_string_free(stream_iter->name, TRUE); } - g_free(stream->buf); - BT_PUT(stream->packet_end_notif_queue); - bt_list_del(&stream->node); + + bt_message_put_ref(stream_iter->current_msg); + + /* Track the number of active stream iterator. */ + stream_iter->trace->session->lttng_live_msg_iter->active_stream_iter--; + /* * Ensure we poke the trace metadata in the future, which is * required to release the metadata reference on the trace. */ - stream->trace->new_metadata_needed = true; - lttng_live_unref_trace(stream->trace); - g_free(stream); + stream_iter->trace->new_metadata_needed = true; + g_free(stream_iter); }