X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Fctf%2Flttng-live%2Fdata-stream.c;h=54eb9dfc626a308ebf55ce91a74687e135597e28;hb=d6e69534ef08a2dd8bff9eb5af1eab63736b3d31;hp=6412e3d0a6b8998f70f0e9b85114d87000eb8ea5;hpb=7cdc2bab17acd56d035b204518ef845fa5a9f1c7;p=babeltrace.git diff --git a/plugins/ctf/lttng-live/data-stream.c b/plugins/ctf/lttng-live/data-stream.c index 6412e3d0..54eb9dfc 100644 --- a/plugins/ctf/lttng-live/data-stream.c +++ b/plugins/ctf/lttng-live/data-stream.c @@ -22,30 +22,29 @@ * SOFTWARE. */ +#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC-DS" +#include "logging.h" + #include #include #include #include #include #include -#include -#include -#include "../common/notif-iter/notif-iter.h" -#include -#include "data-stream.h" +#include +#include +#include "../common/msg-iter/msg-iter.h" +#include -#define PRINT_ERR_STREAM lttng_live->error_fp -#define PRINT_PREFIX "lttng-live-data-stream" -#define PRINT_DBG_CHECK lttng_live_debug -#include "../print.h" +#include "data-stream.h" static -enum bt_ctf_notif_iter_medium_status medop_request_bytes( +enum bt_msg_iter_medium_status medop_request_bytes( size_t request_sz, uint8_t **buffer_addr, size_t *buffer_sz, void *data) { - enum bt_ctf_notif_iter_medium_status status = - BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK; + enum bt_msg_iter_medium_status status = + BT_MSG_ITER_MEDIUM_STATUS_OK; struct lttng_live_stream_iterator *stream = data; struct lttng_live_trace *trace = stream->trace; struct lttng_live_session *session = trace->session; @@ -58,7 +57,7 @@ enum bt_ctf_notif_iter_medium_status medop_request_bytes( len_left = stream->base_offset + stream->len - stream->offset; if (!len_left) { stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; - status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + status = BT_MSG_ITER_MEDIUM_STATUS_AGAIN; return status; } read_len = MIN(request_sz, stream->buflen); @@ -66,12 +65,6 @@ enum bt_ctf_notif_iter_medium_status medop_request_bytes( status = lttng_live_get_stream_bytes(lttng_live, stream, stream->buf, stream->offset, read_len, &recv_len); -#if 0 //DEBUG - for (i = 0; i < recv_len; i++) { - fprintf(stderr, "%x ", stream->buf[i]); - } - fprintf(stderr, "\n"); -#endif *buffer_addr = stream->buf; *buffer_sz = recv_len; stream->offset += recv_len; @@ -79,68 +72,77 @@ enum bt_ctf_notif_iter_medium_status medop_request_bytes( } static -struct bt_ctf_stream *medop_get_stream( - struct bt_ctf_stream_class *stream_class, void *data) +const bt_stream *medop_get_stream( + const bt_stream_class *stream_class, + uint64_t stream_id, void *data) { struct lttng_live_stream_iterator *lttng_live_stream = data; - struct lttng_live_trace *trace = lttng_live_stream->trace; - struct lttng_live_session *session = trace->session; - struct lttng_live_component *lttng_live = session->lttng_live; if (!lttng_live_stream->stream) { - int64_t id = bt_ctf_stream_class_get_id(stream_class); + int64_t stream_class_id = + bt_stream_class_get_id(stream_class); + + BT_LOGD("Creating stream %s (ID: %" PRIu64 ") out of stream class %" PRId64, + lttng_live_stream->name, stream_id, stream_class_id); + + if (stream_id == -1ULL) { + /* No stream ID */ + lttng_live_stream->stream = bt_stream_create( + stream_class, lttng_live_stream->name); + } else { + lttng_live_stream->stream = + bt_stream_create_with_id(stream_class, + lttng_live_stream->name, stream_id); + } - PDBG("Creating stream %s out of stream class %" PRId64 "\n", - lttng_live_stream->name, id); - lttng_live_stream->stream = bt_ctf_stream_create(stream_class, - lttng_live_stream->name); if (!lttng_live_stream->stream) { - PERR("Cannot create stream %s (stream class %" PRId64 ")\n", - lttng_live_stream->name, id); + BT_LOGE("Cannot create stream %s (stream class %" PRId64 ", stream ID %" PRIu64 ")", + lttng_live_stream->name, + stream_class_id, stream_id); } } return lttng_live_stream->stream; } -static struct bt_ctf_notif_iter_medium_ops medops = { +static struct bt_msg_iter_medium_ops medops = { .request_bytes = medop_request_bytes, .get_stream = medop_get_stream, }; BT_HIDDEN -enum bt_ctf_lttng_live_iterator_status lttng_live_lazy_notif_init( +enum bt_lttng_live_iterator_status lttng_live_lazy_msg_init( struct lttng_live_session *session) { struct lttng_live_component *lttng_live = session->lttng_live; struct lttng_live_trace *trace; - if (!session->lazy_stream_notif_init) { - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + if (!session->lazy_stream_msg_init) { + return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; } bt_list_for_each_entry(trace, &session->traces, node) { struct lttng_live_stream_iterator *stream; bt_list_for_each_entry(stream, &trace->streams, node) { - if (stream->notif_iter) { + if (stream->msg_iter) { continue; } - stream->notif_iter = bt_ctf_notif_iter_create(trace->trace, + stream->msg_iter = bt_msg_iter_create(trace->trace, lttng_live->max_query_size, medops, - stream, lttng_live->error_fp); - if (!stream->notif_iter) { + stream); + if (!stream->msg_iter) { goto error; } } } - session->lazy_stream_notif_init = false; + session->lazy_stream_msg_init = false; - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; error: - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; } BT_HIDDEN @@ -168,10 +170,10 @@ struct lttng_live_stream_iterator *lttng_live_stream_iterator_create( stream->last_returned_inactivity_timestamp = INT64_MIN; if (trace->trace) { - stream->notif_iter = bt_ctf_notif_iter_create(trace->trace, + stream->msg_iter = bt_msg_iter_create(trace->trace, lttng_live->max_query_size, medops, - stream, lttng_live->error_fp); - if (!stream->notif_iter) { + stream); + if (!stream->msg_iter) { goto error; } } @@ -179,7 +181,7 @@ struct lttng_live_stream_iterator *lttng_live_stream_iterator_create( stream->buflen = session->lttng_live->max_query_size; ret = lttng_live_add_port(lttng_live, stream); - assert(!ret); + BT_ASSERT(!ret); bt_list_add(&stream->node, &trace->streams); @@ -204,17 +206,17 @@ void lttng_live_stream_iterator_destroy(struct lttng_live_stream_iterator *strea lttng_live = stream->trace->session->lttng_live; ret = lttng_live_remove_port(lttng_live, stream->port); - assert(!ret); + BT_ASSERT(!ret); if (stream->stream) { - BT_PUT(stream->stream); + BT_OBJECT_PUT_REF_AND_RESET(stream->stream); } - if (stream->notif_iter) { - bt_ctf_notif_iter_destroy(stream->notif_iter); + if (stream->msg_iter) { + bt_msg_iter_destroy(stream->msg_iter); } g_free(stream->buf); - BT_PUT(stream->packet_end_notif_queue); + BT_OBJECT_PUT_REF_AND_RESET(stream->packet_end_msg_queue); bt_list_del(&stream->node); /* * Ensure we poke the trace metadata in the future, which is