X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Fctf%2Flttng-live%2Flttng-live.c;h=b40934b7afe64c70c91b8dbd83ae28aeb2ecd68d;hb=9d408fcae74602e3591f66623ceb85f482d948ed;hp=0e64f7d969ab89405cded2a0be1a0cea8261f23e;hpb=4c66436f61db5fcb293f79313f78b6affc83666a;p=babeltrace.git diff --git a/plugins/ctf/lttng-live/lttng-live.c b/plugins/ctf/lttng-live/lttng-live.c index 0e64f7d9..b40934b7 100644 --- a/plugins/ctf/lttng-live/lttng-live.c +++ b/plugins/ctf/lttng-live/lttng-live.c @@ -27,30 +27,18 @@ * SOFTWARE. */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC" +#include "logging.h" + +#include #include +#include #include #include #include #include #include -#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE" -#define BT_LOGLEVEL_NAME "BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_LOG_LEVEL" - #include "data-stream.h" #include "metadata.h" #include "lttng-live-internal.h" @@ -77,14 +65,38 @@ static const char *print_state(struct lttng_live_stream_iterator *s) } } -#define print_stream_state(stream) \ - print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \ - bt_port_get_name(bt_port_from_private_port(stream->port)), \ - print_state(stream), stream->last_returned_inactivity_timestamp, \ - stream->current_inactivity_timestamp) +static +void print_stream_state(struct lttng_live_stream_iterator *stream) +{ + struct bt_port *port; + + port = bt_port_from_private_port(stream->port); + print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, + bt_port_get_name(port), + print_state(stream), + stream->last_returned_inactivity_timestamp, + stream->current_inactivity_timestamp); + bt_put(port); +} BT_HIDDEN -int bt_lttng_live_log_level = BT_LOG_NONE; +bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live) +{ + struct bt_component *component; + struct bt_graph *graph; + bt_bool ret; + + if (!lttng_live) { + return BT_FALSE; + } + + component = bt_component_from_private_component(lttng_live->private_component); + graph = bt_component_get_graph(component); + ret = bt_graph_is_canceled(graph); + bt_put(graph); + bt_put(component); + return ret; +} BT_HIDDEN int lttng_live_add_port(struct lttng_live_component *lttng_live, @@ -93,23 +105,36 @@ int lttng_live_add_port(struct lttng_live_component *lttng_live, int ret; struct bt_private_port *private_port; char name[STREAM_NAME_MAX_LEN]; + enum bt_component_status status; ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id); assert(ret > 0); strcpy(stream_iter->name, name); - private_port = bt_private_component_source_add_output_private_port( - lttng_live->private_component, name, stream_iter); - if (!private_port) { + if (lttng_live_is_canceled(lttng_live)) { + return 0; + } + status = bt_private_component_source_add_output_private_port( + lttng_live->private_component, name, stream_iter, + &private_port); + switch (status) { + case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED: + return 0; + case BT_COMPONENT_STATUS_OK: + break; + default: return -1; } + bt_put(private_port); /* weak */ BT_LOGI("Added port %s", name); if (lttng_live->no_stream_port) { + bt_get(lttng_live->no_stream_port); ret = bt_private_port_remove_from_component(lttng_live->no_stream_port); + bt_put(lttng_live->no_stream_port); if (ret) { return -1; } - BT_PUT(lttng_live->no_stream_port); + lttng_live->no_stream_port = NULL; lttng_live->no_stream_iter->port = NULL; } stream_iter->port = private_port; @@ -131,16 +156,30 @@ int lttng_live_remove_port(struct lttng_live_component *lttng_live, } BT_PUT(component); if (nr_ports == 1) { + enum bt_component_status status; + assert(!lttng_live->no_stream_port); - lttng_live->no_stream_port = - bt_private_component_source_add_output_private_port(lttng_live->private_component, - "no-stream", lttng_live->no_stream_iter); - if (!lttng_live->no_stream_port) { + + if (lttng_live_is_canceled(lttng_live)) { + return 0; + } + status = bt_private_component_source_add_output_private_port(lttng_live->private_component, + "no-stream", lttng_live->no_stream_iter, + <tng_live->no_stream_port); + switch (status) { + case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED: + return 0; + case BT_COMPONENT_STATUS_OK: + break; + default: return -1; } + bt_put(lttng_live->no_stream_port); /* weak */ lttng_live->no_stream_iter->port = lttng_live->no_stream_port; } + bt_get(port); ret = bt_private_port_remove_from_component(port); + bt_put(port); if (ret) { return -1; } @@ -165,15 +204,18 @@ static void lttng_live_destroy_trace(struct bt_object *obj) { struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj); - int retval; BT_LOGI("Destroy trace"); assert(bt_list_empty(&trace->streams)); bt_list_del(&trace->node); - retval = bt_ctf_trace_set_is_static(trace->trace); - assert(!retval); + if (trace->trace) { + int retval; + retval = bt_ctf_trace_set_is_static(trace->trace); + assert(!retval); + BT_PUT(trace->trace); + } lttng_live_metadata_fini(trace); BT_PUT(trace->cc_prio_map); g_free(trace); @@ -236,7 +278,9 @@ void lttng_live_close_trace_streams(struct lttng_live_trace *trace) } BT_HIDDEN -int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id) +int lttng_live_add_session(struct lttng_live_component *lttng_live, + uint64_t session_id, const char *hostname, + const char *session_name) { int ret = 0; struct lttng_live_session *s; @@ -250,8 +294,11 @@ int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t ses BT_INIT_LIST_HEAD(&s->traces); s->lttng_live = lttng_live; s->new_streams_needed = true; + s->hostname = g_string_new(hostname); + s->session_name = g_string_new(session_name); - BT_LOGI("Reading from session %" PRIu64, s->id); + BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s", + s->id, hostname, session_name); bt_list_add(&s->node, <tng_live->sessions); goto end; error: @@ -270,7 +317,7 @@ void lttng_live_destroy_session(struct lttng_live_session *session) BT_LOGI("Destroy session"); if (session->id != -1ULL) { if (lttng_live_detach_session(session)) { - if (!bt_graph_is_canceled(session->lttng_live->graph)) { + if (!lttng_live_is_canceled(session->lttng_live)) { /* Old relayd cannot detach sessions. */ BT_LOGD("Unable to detach session %" PRIu64, session->id); @@ -282,14 +329,20 @@ void lttng_live_destroy_session(struct lttng_live_session *session) lttng_live_close_trace_streams(trace); } bt_list_del(&session->node); + if (session->hostname) { + g_string_free(session->hostname, TRUE); + } + if (session->session_name) { + g_string_free(session->session_name, TRUE); + } g_free(session); } BT_HIDDEN -void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it) +void lttng_live_iterator_finalize(struct bt_private_connection_private_notification_iterator *it) { struct lttng_live_stream_iterator_generic *s = - bt_private_notification_iterator_get_user_data(it); + bt_private_connection_private_notification_iterator_get_user_data(it); switch (s->type) { case LIVE_STREAM_TYPE_NO_STREAM: @@ -404,7 +457,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_session( struct lttng_live_trace *trace, *t; if (lttng_live_attach_session(session)) { - if (bt_graph_is_canceled(lttng_live->graph)) { + if (lttng_live_is_canceled(lttng_live)) { return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; @@ -451,7 +504,7 @@ void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttn } static -enum bt_notification_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata( +enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata( struct lttng_live_component *lttng_live) { enum bt_ctf_lttng_live_iterator_status ret = @@ -702,12 +755,12 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_activ * When disconnected from relayd: try to re-connect endlessly. */ static -struct bt_notification_iterator_next_return lttng_live_iterator_next_stream( - struct bt_private_notification_iterator *iterator, +struct bt_notification_iterator_next_method_return lttng_live_iterator_next_stream( + struct bt_private_connection_private_notification_iterator *iterator, struct lttng_live_stream_iterator *stream_iter) { enum bt_ctf_lttng_live_iterator_status status; - struct bt_notification_iterator_next_return next_return; + struct bt_notification_iterator_next_method_return next_return; struct lttng_live_component *lttng_live; lttng_live = stream_iter->trace->session->lttng_live; @@ -773,17 +826,18 @@ end: } static -struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream( - struct bt_private_notification_iterator *iterator, +struct bt_notification_iterator_next_method_return lttng_live_iterator_next_no_stream( + struct bt_private_connection_private_notification_iterator *iterator, struct lttng_live_no_stream_iterator *no_stream_iter) { enum bt_ctf_lttng_live_iterator_status status; - struct bt_notification_iterator_next_return next_return; + struct bt_notification_iterator_next_method_return next_return; struct lttng_live_component *lttng_live; lttng_live = no_stream_iter->lttng_live; retry: lttng_live_force_new_streams_and_metadata(lttng_live); + next_return.notification = NULL; status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live); if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) { goto end; @@ -803,9 +857,6 @@ end: case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END: next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END; break; - case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK: - next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK; - break; case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL: next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID; break; @@ -824,12 +875,12 @@ end: } BT_HIDDEN -struct bt_notification_iterator_next_return lttng_live_iterator_next( - struct bt_private_notification_iterator *iterator) +struct bt_notification_iterator_next_method_return lttng_live_iterator_next( + struct bt_private_connection_private_notification_iterator *iterator) { struct lttng_live_stream_iterator_generic *s = - bt_private_notification_iterator_get_user_data(iterator); - struct bt_notification_iterator_next_return next_return; + bt_private_connection_private_notification_iterator_get_user_data(iterator); + struct bt_notification_iterator_next_method_return next_return; switch (s->type) { case LIVE_STREAM_TYPE_NO_STREAM: @@ -849,7 +900,7 @@ struct bt_notification_iterator_next_return lttng_live_iterator_next( BT_HIDDEN enum bt_notification_iterator_status lttng_live_iterator_init( - struct bt_private_notification_iterator *it, + struct bt_private_connection_private_notification_iterator *it, struct bt_private_port *port) { enum bt_notification_iterator_status ret = @@ -865,7 +916,7 @@ enum bt_notification_iterator_status lttng_live_iterator_init( { struct lttng_live_no_stream_iterator *no_stream_iter = container_of(s, struct lttng_live_no_stream_iterator, p); - ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter); + ret = bt_private_connection_private_notification_iterator_set_user_data(it, no_stream_iter); if (ret) { goto error; } @@ -875,7 +926,7 @@ enum bt_notification_iterator_status lttng_live_iterator_init( { struct lttng_live_stream_iterator *stream_iter = container_of(s, struct lttng_live_stream_iterator, p); - ret = bt_private_notification_iterator_set_user_data(it, stream_iter); + ret = bt_private_connection_private_notification_iterator_set_user_data(it, stream_iter); if (ret) { goto error; } @@ -889,7 +940,7 @@ enum bt_notification_iterator_status lttng_live_iterator_init( end: return ret; error: - if (bt_private_notification_iterator_set_user_data(it, NULL) + if (bt_private_connection_private_notification_iterator_set_user_data(it, NULL) != BT_NOTIFICATION_ITERATOR_STATUS_OK) { BT_LOGE("Error setting private data to NULL"); } @@ -897,55 +948,79 @@ error: } static -struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class, +struct bt_component_class_query_method_return lttng_live_query_list_sessions( + struct bt_component_class *comp_class, + struct bt_query_executor *query_exec, struct bt_value *params) { + struct bt_component_class_query_method_return query_ret = { + .result = NULL, + .status = BT_QUERY_STATUS_OK, + }; + struct bt_value *url_value = NULL; - struct bt_value *results = NULL; const char *url; struct bt_live_viewer_connection *viewer_connection = NULL; - enum bt_value_status ret; url_value = bt_value_map_get(params, "url"); if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) { BT_LOGW("Mandatory \"url\" parameter missing"); + query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS; goto error; } - ret = bt_value_string_get(url_value, &url); - if (ret != BT_VALUE_STATUS_OK) { + if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) { BT_LOGW("\"url\" parameter is required to be a string value"); + query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS; goto error; } viewer_connection = bt_live_viewer_connection_create(url, NULL); if (!viewer_connection) { - ret = BT_COMPONENT_STATUS_NOMEM; goto error; } - results = bt_live_viewer_connection_list_sessions(viewer_connection); + query_ret.result = + bt_live_viewer_connection_list_sessions(viewer_connection); + if (!query_ret.result) { + goto error; + } + goto end; + error: - BT_PUT(results); + BT_PUT(query_ret.result); + + if (query_ret.status >= 0) { + query_ret.status = BT_QUERY_STATUS_ERROR; + } + end: if (viewer_connection) { bt_live_viewer_connection_destroy(viewer_connection); } BT_PUT(url_value); - return results; + return query_ret; } BT_HIDDEN -struct bt_value *lttng_live_query(struct bt_component_class *comp_class, +struct bt_component_class_query_method_return lttng_live_query( + struct bt_component_class *comp_class, + struct bt_query_executor *query_exec, const char *object, struct bt_value *params) { + struct bt_component_class_query_method_return ret = { + .result = NULL, + .status = BT_QUERY_STATUS_OK, + }; + if (strcmp(object, "sessions") == 0) { return lttng_live_query_list_sessions(comp_class, - params); + query_exec, params); } BT_LOGW("Unknown query object `%s`", object); - return NULL; + ret.status = BT_QUERY_STATUS_INVALID_OBJECT; + return ret; } static @@ -962,9 +1037,10 @@ void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live) g_string_free(lttng_live->url, TRUE); } if (lttng_live->no_stream_port) { + bt_get(lttng_live->no_stream_port); ret = bt_private_port_remove_from_component(lttng_live->no_stream_port); + bt_put(lttng_live->no_stream_port); assert(!ret); - BT_PUT(lttng_live->no_stream_port); } if (lttng_live->no_stream_iter) { g_free(lttng_live->no_stream_iter); @@ -991,7 +1067,6 @@ struct lttng_live_component *lttng_live_component_create(struct bt_value *params struct bt_value *value = NULL; const char *url; enum bt_value_status ret; - struct bt_component *component; lttng_live = g_new0(struct lttng_live_component, 1); if (!lttng_live) { @@ -1014,31 +1089,17 @@ struct lttng_live_component *lttng_live_component_create(struct bt_value *params if (!lttng_live->url) { goto error; } + BT_PUT(value); lttng_live->viewer_connection = bt_live_viewer_connection_create(lttng_live->url->str, lttng_live); if (!lttng_live->viewer_connection) { - if (bt_graph_is_canceled(lttng_live->graph)) { - ret = BT_COMPONENT_STATUS_AGAIN; - } else { - ret = BT_COMPONENT_STATUS_NOMEM; - } goto error; } if (lttng_live_create_viewer_session(lttng_live)) { - if (bt_graph_is_canceled(lttng_live->graph)) { - ret = BT_COMPONENT_STATUS_AGAIN; - } else { - ret = BT_COMPONENT_STATUS_NOMEM; - } goto error; } lttng_live->private_component = private_component; - component = bt_component_from_private_component(private_component); - lttng_live->graph = bt_component_get_graph(component); - bt_put(lttng_live->graph); /* weak */ - bt_put(component); - goto end; error: @@ -1049,15 +1110,18 @@ end: } BT_HIDDEN -enum bt_component_status lttng_live_component_init(struct bt_private_component *component, +enum bt_component_status lttng_live_component_init( + struct bt_private_component *private_component, struct bt_value *params, void *init_method_data) { struct lttng_live_component *lttng_live; enum bt_component_status ret = BT_COMPONENT_STATUS_OK; /* Passes ownership of iter ref to lttng_live_component_create. */ - lttng_live = lttng_live_component_create(params, component); + lttng_live = lttng_live_component_create(params, private_component); if (!lttng_live) { + //TODO : we need access to the application cancel state + //because we are not part of a graph yet. ret = BT_COMPONENT_STATUS_NOMEM; goto end; } @@ -1065,14 +1129,20 @@ enum bt_component_status lttng_live_component_init(struct bt_private_component * lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1); lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM; lttng_live->no_stream_iter->lttng_live = lttng_live; - - lttng_live->no_stream_port = - bt_private_component_source_add_output_private_port( + if (lttng_live_is_canceled(lttng_live)) { + goto end; + } + ret = bt_private_component_source_add_output_private_port( lttng_live->private_component, "no-stream", - lttng_live->no_stream_iter); + lttng_live->no_stream_iter, + <tng_live->no_stream_port); + if (ret != BT_COMPONENT_STATUS_OK) { + goto end; + } + bt_put(lttng_live->no_stream_port); /* weak */ lttng_live->no_stream_iter->port = lttng_live->no_stream_port; - ret = bt_private_component_set_user_data(component, lttng_live); + ret = bt_private_component_set_user_data(private_component, lttng_live); if (ret != BT_COMPONENT_STATUS_OK) { goto error; } @@ -1080,7 +1150,7 @@ enum bt_component_status lttng_live_component_init(struct bt_private_component * end: return ret; error: - (void) bt_private_component_set_user_data(component, NULL); + (void) bt_private_component_set_user_data(private_component, NULL); lttng_live_component_destroy_data(lttng_live); return ret; } @@ -1121,35 +1191,3 @@ end: bt_put(self_port); return status; } - -static -void __attribute__((constructor)) bt_lttng_live_logging_ctor(void) -{ - enum bt_logging_level log_level = BT_LOG_NONE; - const char *log_level_env = getenv(BT_LOGLEVEL_NAME); - - if (!log_level_env) { - return; - } - - if (strcmp(log_level_env, "VERBOSE") == 0) { - log_level = BT_LOGGING_LEVEL_VERBOSE; - } else if (strcmp(log_level_env, "DEBUG") == 0) { - log_level = BT_LOGGING_LEVEL_DEBUG; - } else if (strcmp(log_level_env, "INFO") == 0) { - log_level = BT_LOGGING_LEVEL_INFO; - } else if (strcmp(log_level_env, "WARN") == 0) { - log_level = BT_LOGGING_LEVEL_WARN; - } else if (strcmp(log_level_env, "ERROR") == 0) { - log_level = BT_LOGGING_LEVEL_ERROR; - } else if (strcmp(log_level_env, "FATAL") == 0) { - log_level = BT_LOGGING_LEVEL_FATAL; - } else { - bt_lttng_live_log_level = BT_LOGGING_LEVEL_FATAL; - BT_LOGF("Incorrect log level specified in %s", - BT_LOGLEVEL_NAME); - abort(); - } - - bt_lttng_live_log_level = log_level; -}