X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Fctf%2Flttng-live%2Flttng-live.c;h=d8dd3fc79f5dab85e6724958aac98822d56ebd97;hb=40f4ba76dd6f9508ca51b6220eaed57632281a07;hp=4831c81af77d142b9b78cfc01b577bd5f3d8a048;hpb=147337a3be96c8ea69fee38099762370ecac8d51;p=babeltrace.git diff --git a/plugins/ctf/lttng-live/lttng-live.c b/plugins/ctf/lttng-live/lttng-live.c index 4831c81a..d8dd3fc7 100644 --- a/plugins/ctf/lttng-live/lttng-live.c +++ b/plugins/ctf/lttng-live/lttng-live.c @@ -30,24 +30,12 @@ #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC" #include "logging.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include #include +#include #include #include -#include +#include #include #include @@ -77,11 +65,38 @@ static const char *print_state(struct lttng_live_stream_iterator *s) } } -#define print_stream_state(stream) \ - print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \ - bt_port_get_name(bt_port_from_private_port(stream->port)), \ - print_state(stream), stream->last_returned_inactivity_timestamp, \ - stream->current_inactivity_timestamp) +static +void print_stream_state(struct lttng_live_stream_iterator *stream) +{ + struct bt_port *port; + + port = bt_port_from_private(stream->port); + print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, + bt_port_get_name(port), + print_state(stream), + stream->last_returned_inactivity_timestamp, + stream->current_inactivity_timestamp); + bt_object_put_ref(port); +} + +BT_HIDDEN +bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live) +{ + struct bt_component *component; + struct bt_graph *graph; + bt_bool ret; + + if (!lttng_live) { + return BT_FALSE; + } + + component = bt_component_from_private(lttng_live->private_component); + graph = bt_component_get_graph(component); + ret = bt_private_graph_is_canceled(graph); + bt_object_put_ref(graph); + bt_object_put_ref(component); + return ret; +} BT_HIDDEN int lttng_live_add_port(struct lttng_live_component *lttng_live, @@ -90,24 +105,36 @@ int lttng_live_add_port(struct lttng_live_component *lttng_live, int ret; struct bt_private_port *private_port; char name[STREAM_NAME_MAX_LEN]; + enum bt_component_status status; ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id); - assert(ret > 0); + BT_ASSERT(ret > 0); strcpy(stream_iter->name, name); - ret = bt_private_component_source_add_output_private_port( + if (lttng_live_is_canceled(lttng_live)) { + return 0; + } + status = bt_self_component_source_add_output_port( lttng_live->private_component, name, stream_iter, &private_port); - if (ret) { + switch (status) { + case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED: + return 0; + case BT_COMPONENT_STATUS_OK: + break; + default: return -1; } + bt_object_put_ref(private_port); /* weak */ BT_LOGI("Added port %s", name); if (lttng_live->no_stream_port) { + bt_object_get_ref(lttng_live->no_stream_port); ret = bt_private_port_remove_from_component(lttng_live->no_stream_port); + bt_object_put_ref(lttng_live->no_stream_port); if (ret) { return -1; } - BT_PUT(lttng_live->no_stream_port); + lttng_live->no_stream_port = NULL; lttng_live->no_stream_iter->port = NULL; } stream_iter->port = private_port; @@ -122,23 +149,37 @@ int lttng_live_remove_port(struct lttng_live_component *lttng_live, int64_t nr_ports; int ret; - component = bt_component_from_private_component(lttng_live->private_component); + component = bt_component_from_private(lttng_live->private_component); nr_ports = bt_component_source_get_output_port_count(component); if (nr_ports < 0) { return -1; } - BT_PUT(component); + BT_OBJECT_PUT_REF_AND_RESET(component); if (nr_ports == 1) { - assert(!lttng_live->no_stream_port); - ret = bt_private_component_source_add_output_private_port(lttng_live->private_component, + enum bt_component_status status; + + BT_ASSERT(!lttng_live->no_stream_port); + + if (lttng_live_is_canceled(lttng_live)) { + return 0; + } + status = bt_self_component_source_add_output_port(lttng_live->private_component, "no-stream", lttng_live->no_stream_iter, <tng_live->no_stream_port); - if (ret) { + switch (status) { + case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED: + return 0; + case BT_COMPONENT_STATUS_OK: + break; + default: return -1; } + bt_object_put_ref(lttng_live->no_stream_port); /* weak */ lttng_live->no_stream_iter->port = lttng_live->no_stream_port; } + bt_object_get_ref(port); ret = bt_private_port_remove_from_component(port); + bt_object_put_ref(port); if (ret) { return -1; } @@ -163,17 +204,20 @@ static void lttng_live_destroy_trace(struct bt_object *obj) { struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj); - int retval; BT_LOGI("Destroy trace"); - assert(bt_list_empty(&trace->streams)); + BT_ASSERT(bt_list_empty(&trace->streams)); bt_list_del(&trace->node); - retval = bt_ctf_trace_set_is_static(trace->trace); - assert(!retval); + if (trace->trace) { + int retval; + retval = bt_trace_set_is_static(trace->trace); + BT_ASSERT(!retval); + BT_OBJECT_PUT_REF_AND_RESET(trace->trace); + } lttng_live_metadata_fini(trace); - BT_PUT(trace->cc_prio_map); + BT_OBJECT_PUT_REF_AND_RESET(trace->cc_prio_map); g_free(trace); } @@ -210,7 +254,7 @@ struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session trace = lttng_live_find_trace(session, trace_id); if (trace) { - bt_get(trace); + bt_object_get_ref(trace); return trace; } return lttng_live_create_trace(session, trace_id); @@ -219,7 +263,7 @@ struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session BT_HIDDEN void lttng_live_unref_trace(struct lttng_live_trace *trace) { - bt_put(trace); + bt_object_put_ref(trace); } static @@ -273,7 +317,7 @@ void lttng_live_destroy_session(struct lttng_live_session *session) BT_LOGI("Destroy session"); if (session->id != -1ULL) { if (lttng_live_detach_session(session)) { - if (!bt_graph_is_canceled(session->lttng_live->graph)) { + if (!lttng_live_is_canceled(session->lttng_live)) { /* Old relayd cannot detach sessions. */ BT_LOGD("Unable to detach session %" PRIu64, session->id); @@ -295,10 +339,10 @@ void lttng_live_destroy_session(struct lttng_live_session *session) } BT_HIDDEN -void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it) +void lttng_live_iterator_finalize(struct bt_self_notification_iterator *it) { struct lttng_live_stream_iterator_generic *s = - bt_private_notification_iterator_get_user_data(it); + bt_self_notification_iterator_get_user_data(it); switch (s->type) { case LIVE_STREAM_TYPE_NO_STREAM: @@ -318,7 +362,7 @@ void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it) } static -enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state( +enum bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state( struct lttng_live_component *lttng_live, struct lttng_live_stream_iterator *lttng_live_stream) { @@ -337,7 +381,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_sta case LTTNG_LIVE_STREAM_EOF: break; } - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; } /* @@ -349,21 +393,21 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_sta * return EOF. */ static -enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream( +enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream( struct lttng_live_component *lttng_live, struct lttng_live_stream_iterator *lttng_live_stream) { - enum bt_ctf_lttng_live_iterator_status ret = - BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_lttng_live_iterator_status ret = + BT_LTTNG_LIVE_ITERATOR_STATUS_OK; struct packet_index index; enum lttng_live_stream_state orig_state = lttng_live_stream->state; if (lttng_live_stream->trace->new_metadata_needed) { - ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } if (lttng_live_stream->trace->session->new_streams_needed) { - ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA @@ -371,18 +415,18 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_da goto end; } ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index); - if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { + if (ret != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { goto end; } - assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF); + BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF); if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) { if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && lttng_live_stream->last_returned_inactivity_timestamp == lttng_live_stream->current_inactivity_timestamp) { - ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + ret = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; print_stream_state(lttng_live_stream); } else { - ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; } goto end; } @@ -390,7 +434,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_da lttng_live_stream->offset = index.offset; lttng_live_stream->len = index.packet_size / CHAR_BIT; end: - if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { + if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { ret = lttng_live_iterator_next_check_stream_state( lttng_live, lttng_live_stream); } @@ -405,29 +449,29 @@ end: * per-stream notifications. */ static -enum bt_ctf_lttng_live_iterator_status lttng_live_get_session( +enum bt_lttng_live_iterator_status lttng_live_get_session( struct lttng_live_component *lttng_live, struct lttng_live_session *session) { - enum bt_ctf_lttng_live_iterator_status status; + enum bt_lttng_live_iterator_status status; struct lttng_live_trace *trace, *t; if (lttng_live_attach_session(session)) { - if (bt_graph_is_canceled(lttng_live->graph)) { - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + if (lttng_live_is_canceled(lttng_live)) { + return BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; } } status = lttng_live_get_new_streams(session); - if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && - status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) { + if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK && + status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) { return status; } bt_list_for_each_entry_safe(trace, t, &session->traces, node) { status = lttng_live_metadata_update(trace); - if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && - status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) { + if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK && + status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) { return status; } } @@ -460,11 +504,11 @@ void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttn } static -enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata( +enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata( struct lttng_live_component *lttng_live) { - enum bt_ctf_lttng_live_iterator_status ret = - BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_lttng_live_iterator_status ret = + BT_LTTNG_LIVE_ITERATOR_STATUS_OK; unsigned int nr_sessions_opened = 0; struct lttng_live_session *session, *s; @@ -483,16 +527,16 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_strea * currently ongoing. */ if (bt_list_empty(<tng_live->sessions)) { - ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END; goto end; } bt_list_for_each_entry(session, <tng_live->sessions, node) { ret = lttng_live_get_session(lttng_live, session); switch (ret) { - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK: + case BT_LTTNG_LIVE_ITERATOR_STATUS_OK: break; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END: - ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + case BT_LTTNG_LIVE_ITERATOR_STATUS_END: + ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK; break; default: goto end; @@ -502,24 +546,24 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_strea } } end: - if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) { - ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) { + ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END; } return ret; } static -enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification( +enum bt_lttng_live_iterator_status emit_inactivity_notification( struct lttng_live_component *lttng_live, struct lttng_live_stream_iterator *lttng_live_stream, struct bt_notification **notification, uint64_t timestamp) { - enum bt_ctf_lttng_live_iterator_status ret = - BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_lttng_live_iterator_status ret = + BT_LTTNG_LIVE_ITERATOR_STATUS_OK; struct lttng_live_trace *trace; - struct bt_ctf_clock_class *clock_class = NULL; - struct bt_ctf_clock_value *clock_value = NULL; + const struct bt_clock_class *clock_class = NULL; + struct bt_clock_value *clock_value = NULL; struct bt_notification *notif = NULL; int retval; @@ -531,7 +575,7 @@ enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification( if (!clock_class) { goto error; } - clock_value = bt_ctf_clock_value_create(clock_class, timestamp); + clock_value = bt_clock_value_create(clock_class, timestamp); if (!clock_value) { goto error; } @@ -545,35 +589,35 @@ enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification( } *notification = notif; end: - bt_put(clock_value); - bt_put(clock_class); + bt_object_put_ref(clock_value); + bt_object_put_ref(clock_class); return ret; error: - ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; - bt_put(notif); + ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + bt_object_put_ref(notif); goto end; } static -enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream( +enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream( struct lttng_live_component *lttng_live, struct lttng_live_stream_iterator *lttng_live_stream, struct bt_notification **notification) { - enum bt_ctf_lttng_live_iterator_status ret = - BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; - struct bt_ctf_clock_class *clock_class = NULL; - struct bt_ctf_clock_value *clock_value = NULL; + enum bt_lttng_live_iterator_status ret = + BT_LTTNG_LIVE_ITERATOR_STATUS_OK; + const struct bt_clock_class *clock_class = NULL; + struct bt_clock_value *clock_value = NULL; if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) { - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; } if (lttng_live_stream->current_inactivity_timestamp == lttng_live_stream->last_returned_inactivity_timestamp) { lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA; - ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } @@ -583,48 +627,48 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quies lttng_live_stream->last_returned_inactivity_timestamp = lttng_live_stream->current_inactivity_timestamp; end: - bt_put(clock_value); - bt_put(clock_class); + bt_object_put_ref(clock_value); + bt_object_put_ref(clock_class); return ret; } static -enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream( +enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream( struct lttng_live_component *lttng_live, struct lttng_live_stream_iterator *lttng_live_stream, struct bt_notification **notification) { - enum bt_ctf_lttng_live_iterator_status ret = - BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; - enum bt_ctf_notif_iter_status status; + enum bt_lttng_live_iterator_status ret = + BT_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_notif_iter_status status; struct lttng_live_session *session; bt_list_for_each_entry(session, <tng_live->sessions, node) { struct lttng_live_trace *trace; if (session->new_streams_needed) { - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; } bt_list_for_each_entry(trace, &session->traces, node) { if (trace->new_metadata_needed) { - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; } } } if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) { - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; } if (lttng_live_stream->packet_end_notif_queue) { *notification = lttng_live_stream->packet_end_notif_queue; lttng_live_stream->packet_end_notif_queue = NULL; - status = BT_CTF_NOTIF_ITER_STATUS_OK; + status = BT_NOTIF_ITER_STATUS_OK; } else { - status = bt_ctf_notif_iter_get_next_notification( + status = bt_notif_iter_get_next_notification( lttng_live_stream->notif_iter, lttng_live_stream->trace->cc_prio_map, notification); - if (status == BT_CTF_NOTIF_ITER_STATUS_OK) { + if (status == BT_NOTIF_ITER_STATUS_OK) { /* * Consider empty packets as inactivity. */ @@ -638,25 +682,25 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ } } switch (status) { - case BT_CTF_NOTIF_ITER_STATUS_EOF: - ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + case BT_NOTIF_ITER_STATUS_EOF: + ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END; break; - case BT_CTF_NOTIF_ITER_STATUS_OK: - ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + case BT_NOTIF_ITER_STATUS_OK: + ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK; break; - case BT_CTF_NOTIF_ITER_STATUS_AGAIN: + case BT_NOTIF_ITER_STATUS_AGAIN: /* * Continue immediately (end of packet). The next * get_index may return AGAIN to delay the following * attempt. */ - ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; + ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; break; - case BT_CTF_NOTIF_ITER_STATUS_INVAL: + case BT_NOTIF_ITER_STATUS_INVAL: /* No argument provided by the user, so don't return INVAL. */ - case BT_CTF_NOTIF_ITER_STATUS_ERROR: + case BT_NOTIF_ITER_STATUS_ERROR: default: - ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; break; } return ret; @@ -711,12 +755,12 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ * When disconnected from relayd: try to re-connect endlessly. */ static -struct bt_notification_iterator_next_return lttng_live_iterator_next_stream( - struct bt_private_notification_iterator *iterator, +struct bt_notification_iterator_next_method_return lttng_live_iterator_next_stream( + struct bt_self_notification_iterator *iterator, struct lttng_live_stream_iterator *stream_iter) { - enum bt_ctf_lttng_live_iterator_status status; - struct bt_notification_iterator_next_return next_return; + enum bt_lttng_live_iterator_status status; + struct bt_notification_iterator_next_method_return next_return; struct lttng_live_component *lttng_live; lttng_live = stream_iter->trace->session->lttng_live; @@ -724,18 +768,18 @@ retry: print_stream_state(stream_iter); next_return.notification = NULL; status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live); - if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { + if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { goto end; } status = lttng_live_iterator_next_handle_one_no_data_stream( lttng_live, stream_iter); - if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { + if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { goto end; } status = lttng_live_iterator_next_handle_one_quiescent_stream( lttng_live, stream_iter, &next_return.notification); - if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { - assert(next_return.notification == NULL); + if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { + BT_ASSERT(next_return.notification == NULL); goto end; } if (next_return.notification) { @@ -743,37 +787,37 @@ retry: } status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live, stream_iter, &next_return.notification); - if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { - assert(next_return.notification == NULL); + if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { + BT_ASSERT(next_return.notification == NULL); } end: switch (status) { - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: + case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: print_dbg("continue"); goto retry; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN: + case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN: next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; print_dbg("again"); break; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END: + case BT_LTTNG_LIVE_ITERATOR_STATUS_END: next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END; print_dbg("end"); break; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK: + case BT_LTTNG_LIVE_ITERATOR_STATUS_OK: next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK; print_dbg("ok"); break; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL: + case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL: next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID; break; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM: + case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM: next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; break; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: + case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED; break; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR: + case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR: default: /* fall-through */ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; break; @@ -782,49 +826,47 @@ end: } static -struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream( - struct bt_private_notification_iterator *iterator, +struct bt_notification_iterator_next_method_return lttng_live_iterator_next_no_stream( + struct bt_self_notification_iterator *iterator, struct lttng_live_no_stream_iterator *no_stream_iter) { - enum bt_ctf_lttng_live_iterator_status status; - struct bt_notification_iterator_next_return next_return; + enum bt_lttng_live_iterator_status status; + struct bt_notification_iterator_next_method_return next_return; struct lttng_live_component *lttng_live; lttng_live = no_stream_iter->lttng_live; retry: lttng_live_force_new_streams_and_metadata(lttng_live); + next_return.notification = NULL; status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live); - if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { + if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { goto end; } if (no_stream_iter->port) { - status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { - status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + status = BT_LTTNG_LIVE_ITERATOR_STATUS_END; } end: switch (status) { - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: + case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: goto retry; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN: + case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN: next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; break; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END: + case BT_LTTNG_LIVE_ITERATOR_STATUS_END: next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END; break; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK: - next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK; - break; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL: + case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL: next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID; break; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM: + case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM: next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; break; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: + case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED; break; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR: + case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR: default: /* fall-through */ next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; break; @@ -833,12 +875,12 @@ end: } BT_HIDDEN -struct bt_notification_iterator_next_return lttng_live_iterator_next( - struct bt_private_notification_iterator *iterator) +struct bt_notification_iterator_next_method_return lttng_live_iterator_next( + struct bt_self_notification_iterator *iterator) { struct lttng_live_stream_iterator_generic *s = - bt_private_notification_iterator_get_user_data(iterator); - struct bt_notification_iterator_next_return next_return; + bt_self_notification_iterator_get_user_data(iterator); + struct bt_notification_iterator_next_method_return next_return; switch (s->type) { case LIVE_STREAM_TYPE_NO_STREAM: @@ -858,23 +900,23 @@ struct bt_notification_iterator_next_return lttng_live_iterator_next( BT_HIDDEN enum bt_notification_iterator_status lttng_live_iterator_init( - struct bt_private_notification_iterator *it, + struct bt_self_notification_iterator *it, struct bt_private_port *port) { enum bt_notification_iterator_status ret = BT_NOTIFICATION_ITERATOR_STATUS_OK; struct lttng_live_stream_iterator_generic *s; - assert(it); + BT_ASSERT(it); s = bt_private_port_get_user_data(port); - assert(s); + BT_ASSERT(s); switch (s->type) { case LIVE_STREAM_TYPE_NO_STREAM: { struct lttng_live_no_stream_iterator *no_stream_iter = container_of(s, struct lttng_live_no_stream_iterator, p); - ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter); + ret = bt_self_notification_iterator_set_user_data(it, no_stream_iter); if (ret) { goto error; } @@ -884,7 +926,7 @@ enum bt_notification_iterator_status lttng_live_iterator_init( { struct lttng_live_stream_iterator *stream_iter = container_of(s, struct lttng_live_stream_iterator, p); - ret = bt_private_notification_iterator_set_user_data(it, stream_iter); + ret = bt_self_notification_iterator_set_user_data(it, stream_iter); if (ret) { goto error; } @@ -898,7 +940,7 @@ enum bt_notification_iterator_status lttng_live_iterator_init( end: return ret; error: - if (bt_private_notification_iterator_set_user_data(it, NULL) + if (bt_self_notification_iterator_set_user_data(it, NULL) != BT_NOTIFICATION_ITERATOR_STATUS_OK) { BT_LOGE("Error setting private data to NULL"); } @@ -906,22 +948,30 @@ error: } static -struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class, +struct bt_component_class_query_method_return lttng_live_query_list_sessions( + struct bt_component_class *comp_class, + struct bt_query_executor *query_exec, struct bt_value *params) { + struct bt_component_class_query_method_return query_ret = { + .result = NULL, + .status = BT_QUERY_STATUS_OK, + }; + struct bt_value *url_value = NULL; - struct bt_value *results = NULL; const char *url; struct bt_live_viewer_connection *viewer_connection = NULL; url_value = bt_value_map_get(params, "url"); if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) { BT_LOGW("Mandatory \"url\" parameter missing"); + query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS; goto error; } if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) { BT_LOGW("\"url\" parameter is required to be a string value"); + query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS; goto error; } @@ -930,28 +980,47 @@ struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_ goto error; } - results = bt_live_viewer_connection_list_sessions(viewer_connection); + query_ret.result = + bt_live_viewer_connection_list_sessions(viewer_connection); + if (!query_ret.result) { + goto error; + } + goto end; + error: - BT_PUT(results); + BT_OBJECT_PUT_REF_AND_RESET(query_ret.result); + + if (query_ret.status >= 0) { + query_ret.status = BT_QUERY_STATUS_ERROR; + } + end: if (viewer_connection) { bt_live_viewer_connection_destroy(viewer_connection); } - BT_PUT(url_value); - return results; + BT_OBJECT_PUT_REF_AND_RESET(url_value); + return query_ret; } BT_HIDDEN -struct bt_value *lttng_live_query(struct bt_component_class *comp_class, +struct bt_component_class_query_method_return lttng_live_query( + struct bt_component_class *comp_class, + struct bt_query_executor *query_exec, const char *object, struct bt_value *params) { + struct bt_component_class_query_method_return ret = { + .result = NULL, + .status = BT_QUERY_STATUS_OK, + }; + if (strcmp(object, "sessions") == 0) { return lttng_live_query_list_sessions(comp_class, - params); + query_exec, params); } BT_LOGW("Unknown query object `%s`", object); - return NULL; + ret.status = BT_QUERY_STATUS_INVALID_OBJECT; + return ret; } static @@ -963,14 +1032,15 @@ void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live) bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) { lttng_live_destroy_session(session); } - BT_PUT(lttng_live->viewer_connection); + BT_OBJECT_PUT_REF_AND_RESET(lttng_live->viewer_connection); if (lttng_live->url) { g_string_free(lttng_live->url, TRUE); } if (lttng_live->no_stream_port) { + bt_object_get_ref(lttng_live->no_stream_port); ret = bt_private_port_remove_from_component(lttng_live->no_stream_port); - assert(!ret); - BT_PUT(lttng_live->no_stream_port); + bt_object_put_ref(lttng_live->no_stream_port); + BT_ASSERT(!ret); } if (lttng_live->no_stream_iter) { g_free(lttng_live->no_stream_iter); @@ -979,9 +1049,9 @@ void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live) } BT_HIDDEN -void lttng_live_component_finalize(struct bt_private_component *component) +void lttng_live_component_finalize(struct bt_self_component *component) { - void *data = bt_private_component_get_user_data(component); + void *data = bt_self_component_get_user_data(component); if (!data) { return; @@ -991,8 +1061,7 @@ void lttng_live_component_finalize(struct bt_private_component *component) static struct lttng_live_component *lttng_live_component_create(struct bt_value *params, - struct bt_private_component *private_component, - struct bt_graph *graph) + struct bt_self_component *private_component) { struct lttng_live_component *lttng_live; struct bt_value *value = NULL; @@ -1011,15 +1080,12 @@ struct lttng_live_component *lttng_live_component_create(struct bt_value *params BT_LOGW("Mandatory \"url\" parameter missing"); goto error; } - ret = bt_value_string_get(value, &url); - if (ret != BT_VALUE_STATUS_OK) { - BT_LOGW("\"url\" parameter is required to be a string value"); - goto error; - } + url = bt_value_string_get(value); lttng_live->url = g_string_new(url); if (!lttng_live->url) { goto error; } + BT_OBJECT_PUT_REF_AND_RESET(value); lttng_live->viewer_connection = bt_live_viewer_connection_create(lttng_live->url->str, lttng_live); if (!lttng_live->viewer_connection) { @@ -1029,7 +1095,6 @@ struct lttng_live_component *lttng_live_component_create(struct bt_value *params goto error; } lttng_live->private_component = private_component; - lttng_live->graph = graph; goto end; @@ -1042,45 +1107,38 @@ end: BT_HIDDEN enum bt_component_status lttng_live_component_init( - struct bt_private_component *private_component, + struct bt_self_component *private_component, struct bt_value *params, void *init_method_data) { struct lttng_live_component *lttng_live; enum bt_component_status ret = BT_COMPONENT_STATUS_OK; - struct bt_component *component; - struct bt_graph *graph; - - component = bt_component_from_private_component(private_component); - graph = bt_component_get_graph(component); - bt_put(graph); /* weak */ - bt_put(component); /* Passes ownership of iter ref to lttng_live_component_create. */ - lttng_live = lttng_live_component_create(params, private_component, - graph); + lttng_live = lttng_live_component_create(params, private_component); if (!lttng_live) { - if (bt_graph_is_canceled(graph)) { - ret = BT_COMPONENT_STATUS_AGAIN; - } else { - ret = BT_COMPONENT_STATUS_NOMEM; - } + //TODO : we need access to the application cancel state + //because we are not part of a graph yet. + ret = BT_COMPONENT_STATUS_NOMEM; goto end; } lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1); lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM; lttng_live->no_stream_iter->lttng_live = lttng_live; - ret = bt_private_component_source_add_output_private_port( + if (lttng_live_is_canceled(lttng_live)) { + goto end; + } + ret = bt_self_component_source_add_output_port( lttng_live->private_component, "no-stream", lttng_live->no_stream_iter, <tng_live->no_stream_port); if (ret != BT_COMPONENT_STATUS_OK) { goto end; } - + bt_object_put_ref(lttng_live->no_stream_port); /* weak */ lttng_live->no_stream_iter->port = lttng_live->no_stream_port; - ret = bt_private_component_set_user_data(private_component, lttng_live); + ret = bt_self_component_set_user_data(private_component, lttng_live); if (ret != BT_COMPONENT_STATUS_OK) { goto error; } @@ -1088,25 +1146,25 @@ enum bt_component_status lttng_live_component_init( end: return ret; error: - (void) bt_private_component_set_user_data(private_component, NULL); + (void) bt_self_component_set_user_data(private_component, NULL); lttng_live_component_destroy_data(lttng_live); return ret; } BT_HIDDEN enum bt_component_status lttng_live_accept_port_connection( - struct bt_private_component *private_component, + struct bt_self_component *private_component, struct bt_private_port *self_private_port, struct bt_port *other_port) { struct lttng_live_component *lttng_live = - bt_private_component_get_user_data(private_component); + bt_self_component_get_user_data(private_component); struct bt_component *other_component; enum bt_component_status status = BT_COMPONENT_STATUS_OK; - struct bt_port *self_port = bt_port_from_private_port(self_private_port); + struct bt_port *self_port = bt_port_from_private(self_private_port); other_component = bt_port_get_component(other_port); - bt_put(other_component); /* weak */ + bt_object_put_ref(other_component); /* weak */ if (!lttng_live->downstream_component) { lttng_live->downstream_component = other_component; @@ -1126,6 +1184,6 @@ enum bt_component_status lttng_live_accept_port_connection( goto end; } end: - bt_put(self_port); + bt_object_put_ref(self_port); return status; }