Add query executor
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
index 0e64f7d969ab89405cded2a0be1a0cea8261f23e..c3e7b641c4a52311644e5d09738d91226607c9d4 100644 (file)
@@ -27,6 +27,9 @@
  * SOFTWARE.
  */
 
+#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
+#include "logging.h"
+
 #include <babeltrace/ctf-ir/packet.h>
 #include <babeltrace/graph/component-source.h>
 #include <babeltrace/graph/private-port.h>
 #include <babeltrace/graph/notification-inactivity.h>
 #include <babeltrace/graph/graph.h>
 #include <babeltrace/compiler-internal.h>
+#include <babeltrace/types.h>
 #include <inttypes.h>
 #include <glib.h>
 #include <assert.h>
 #include <unistd.h>
 #include <plugins-common.h>
 
-#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE"
-#define BT_LOGLEVEL_NAME "BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_LOG_LEVEL"
-
 #include "data-stream.h"
 #include "metadata.h"
 #include "lttng-live-internal.h"
@@ -77,14 +78,38 @@ static const char *print_state(struct lttng_live_stream_iterator *s)
        }
 }
 
-#define print_stream_state(stream)     \
-       print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
-                       bt_port_get_name(bt_port_from_private_port(stream->port)),      \
-                       print_state(stream), stream->last_returned_inactivity_timestamp,        \
-                       stream->current_inactivity_timestamp)
+static
+void print_stream_state(struct lttng_live_stream_iterator *stream)
+{
+       struct bt_port *port;
+
+       port = bt_port_from_private_port(stream->port);
+       print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
+               bt_port_get_name(port),
+               print_state(stream),
+               stream->last_returned_inactivity_timestamp,
+               stream->current_inactivity_timestamp);
+       bt_put(port);
+}
 
 BT_HIDDEN
-int bt_lttng_live_log_level = BT_LOG_NONE;
+bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
+{
+       struct bt_component *component;
+       struct bt_graph *graph;
+       bt_bool ret;
+
+       if (!lttng_live) {
+               return BT_FALSE;
+       }
+
+       component = bt_component_from_private_component(lttng_live->private_component);
+       graph = bt_component_get_graph(component);
+       ret = bt_graph_is_canceled(graph);
+       bt_put(graph);
+       bt_put(component);
+       return ret;
+}
 
 BT_HIDDEN
 int lttng_live_add_port(struct lttng_live_component *lttng_live,
@@ -93,23 +118,36 @@ int lttng_live_add_port(struct lttng_live_component *lttng_live,
        int ret;
        struct bt_private_port *private_port;
        char name[STREAM_NAME_MAX_LEN];
+       enum bt_component_status status;
 
        ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
        assert(ret > 0);
        strcpy(stream_iter->name, name);
-       private_port = bt_private_component_source_add_output_private_port(
-                       lttng_live->private_component, name, stream_iter);
-       if (!private_port) {
+       if (lttng_live_is_canceled(lttng_live)) {
+               return 0;
+       }
+       status = bt_private_component_source_add_output_private_port(
+                       lttng_live->private_component, name, stream_iter,
+                       &private_port);
+       switch (status) {
+       case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
+               return 0;
+       case BT_COMPONENT_STATUS_OK:
+               break;
+       default:
                return -1;
        }
+       bt_put(private_port);   /* weak */
        BT_LOGI("Added port %s", name);
 
        if (lttng_live->no_stream_port) {
+               bt_get(lttng_live->no_stream_port);
                ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+               bt_put(lttng_live->no_stream_port);
                if (ret) {
                        return -1;
                }
-               BT_PUT(lttng_live->no_stream_port);
+               lttng_live->no_stream_port = NULL;
                lttng_live->no_stream_iter->port = NULL;
        }
        stream_iter->port = private_port;
@@ -131,16 +169,30 @@ int lttng_live_remove_port(struct lttng_live_component *lttng_live,
        }
        BT_PUT(component);
        if (nr_ports == 1) {
+               enum bt_component_status status;
+
                assert(!lttng_live->no_stream_port);
-               lttng_live->no_stream_port =
-                       bt_private_component_source_add_output_private_port(lttng_live->private_component,
-                               "no-stream", lttng_live->no_stream_iter);
-               if (!lttng_live->no_stream_port) {
+
+               if (lttng_live_is_canceled(lttng_live)) {
+                       return 0;
+               }
+               status = bt_private_component_source_add_output_private_port(lttng_live->private_component,
+                               "no-stream", lttng_live->no_stream_iter,
+                               &lttng_live->no_stream_port);
+               switch (status) {
+               case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
+                       return 0;
+               case BT_COMPONENT_STATUS_OK:
+                       break;
+               default:
                        return -1;
                }
+               bt_put(lttng_live->no_stream_port);     /* weak */
                lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
        }
+       bt_get(port);
        ret = bt_private_port_remove_from_component(port);
+       bt_put(port);
        if (ret) {
                return -1;
        }
@@ -165,15 +217,18 @@ static
 void lttng_live_destroy_trace(struct bt_object *obj)
 {
        struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
-       int retval;
 
        BT_LOGI("Destroy trace");
        assert(bt_list_empty(&trace->streams));
        bt_list_del(&trace->node);
 
-       retval = bt_ctf_trace_set_is_static(trace->trace);
-       assert(!retval);
+       if (trace->trace) {
+               int retval;
 
+               retval = bt_ctf_trace_set_is_static(trace->trace);
+               assert(!retval);
+               BT_PUT(trace->trace);
+       }
        lttng_live_metadata_fini(trace);
        BT_PUT(trace->cc_prio_map);
        g_free(trace);
@@ -236,7 +291,9 @@ void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
 }
 
 BT_HIDDEN
-int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id)
+int lttng_live_add_session(struct lttng_live_component *lttng_live,
+               uint64_t session_id, const char *hostname,
+               const char *session_name)
 {
        int ret = 0;
        struct lttng_live_session *s;
@@ -250,8 +307,11 @@ int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t ses
        BT_INIT_LIST_HEAD(&s->traces);
        s->lttng_live = lttng_live;
        s->new_streams_needed = true;
+       s->hostname = g_string_new(hostname);
+       s->session_name = g_string_new(session_name);
 
-       BT_LOGI("Reading from session %" PRIu64, s->id);
+       BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
+               s->id, hostname, session_name);
        bt_list_add(&s->node, &lttng_live->sessions);
        goto end;
 error:
@@ -270,7 +330,7 @@ void lttng_live_destroy_session(struct lttng_live_session *session)
        BT_LOGI("Destroy session");
        if (session->id != -1ULL) {
                if (lttng_live_detach_session(session)) {
-                       if (!bt_graph_is_canceled(session->lttng_live->graph)) {
+                       if (!lttng_live_is_canceled(session->lttng_live)) {
                                /* Old relayd cannot detach sessions. */
                                BT_LOGD("Unable to detach session %" PRIu64,
                                        session->id);
@@ -282,6 +342,12 @@ void lttng_live_destroy_session(struct lttng_live_session *session)
                lttng_live_close_trace_streams(trace);
        }
        bt_list_del(&session->node);
+       if (session->hostname) {
+               g_string_free(session->hostname, TRUE);
+       }
+       if (session->session_name) {
+               g_string_free(session->session_name, TRUE);
+       }
        g_free(session);
 }
 
@@ -404,7 +470,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
        struct lttng_live_trace *trace, *t;
 
        if (lttng_live_attach_session(session)) {
-               if (bt_graph_is_canceled(lttng_live->graph)) {
+               if (lttng_live_is_canceled(lttng_live)) {
                        return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                } else {
                        return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
@@ -451,7 +517,7 @@ void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttn
 }
 
 static
-enum bt_notification_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
                struct lttng_live_component *lttng_live)
 {
        enum bt_ctf_lttng_live_iterator_status ret =
@@ -784,6 +850,7 @@ struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
        lttng_live = no_stream_iter->lttng_live;
 retry:
        lttng_live_force_new_streams_and_metadata(lttng_live);
+       next_return.notification = NULL;
        status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
        if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
                goto end;
@@ -803,9 +870,6 @@ end:
        case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
                next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
                break;
-       case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
-               break;
        case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
                next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
                break;
@@ -897,55 +961,79 @@ error:
 }
 
 static
-struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
+struct bt_component_class_query_return lttng_live_query_list_sessions(
+               struct bt_component_class *comp_class,
+               struct bt_query_executor *query_exec,
                struct bt_value *params)
 {
+       struct bt_component_class_query_return query_ret = {
+               .result = NULL,
+               .status = BT_QUERY_STATUS_OK,
+       };
+
        struct bt_value *url_value = NULL;
-       struct bt_value *results = NULL;
        const char *url;
        struct bt_live_viewer_connection *viewer_connection = NULL;
-       enum bt_value_status ret;
 
        url_value = bt_value_map_get(params, "url");
        if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
                BT_LOGW("Mandatory \"url\" parameter missing");
+               query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
                goto error;
        }
 
-       ret = bt_value_string_get(url_value, &url);
-       if (ret != BT_VALUE_STATUS_OK) {
+       if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
                BT_LOGW("\"url\" parameter is required to be a string value");
+               query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
                goto error;
        }
 
        viewer_connection = bt_live_viewer_connection_create(url, NULL);
        if (!viewer_connection) {
-               ret = BT_COMPONENT_STATUS_NOMEM;
                goto error;
        }
 
-       results = bt_live_viewer_connection_list_sessions(viewer_connection);
+       query_ret.result =
+               bt_live_viewer_connection_list_sessions(viewer_connection);
+       if (!query_ret.result) {
+               goto error;
+       }
+
        goto end;
+
 error:
-       BT_PUT(results);
+       BT_PUT(query_ret.result);
+
+       if (query_ret.status >= 0) {
+               query_ret.status = BT_QUERY_STATUS_ERROR;
+       }
+
 end:
        if (viewer_connection) {
                bt_live_viewer_connection_destroy(viewer_connection);
        }
        BT_PUT(url_value);
-       return results;
+       return query_ret;
 }
 
 BT_HIDDEN
-struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
+struct bt_component_class_query_return lttng_live_query(
+               struct bt_component_class *comp_class,
+               struct bt_query_executor *query_exec,
                const char *object, struct bt_value *params)
 {
+       struct bt_component_class_query_return ret = {
+               .result = NULL,
+               .status = BT_QUERY_STATUS_OK,
+       };
+
        if (strcmp(object, "sessions") == 0) {
                return lttng_live_query_list_sessions(comp_class,
-                       params);
+                       query_exec, params);
        }
        BT_LOGW("Unknown query object `%s`", object);
-       return NULL;
+       ret.status = BT_QUERY_STATUS_INVALID_OBJECT;
+       return ret;
 }
 
 static
@@ -962,9 +1050,10 @@ void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
                g_string_free(lttng_live->url, TRUE);
        }
        if (lttng_live->no_stream_port) {
+               bt_get(lttng_live->no_stream_port);
                ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+               bt_put(lttng_live->no_stream_port);
                assert(!ret);
-               BT_PUT(lttng_live->no_stream_port);
        }
        if (lttng_live->no_stream_iter) {
                g_free(lttng_live->no_stream_iter);
@@ -991,7 +1080,6 @@ struct lttng_live_component *lttng_live_component_create(struct bt_value *params
        struct bt_value *value = NULL;
        const char *url;
        enum bt_value_status ret;
-       struct bt_component *component;
 
        lttng_live = g_new0(struct lttng_live_component, 1);
        if (!lttng_live) {
@@ -1014,31 +1102,17 @@ struct lttng_live_component *lttng_live_component_create(struct bt_value *params
        if (!lttng_live->url) {
                goto error;
        }
+       BT_PUT(value);
        lttng_live->viewer_connection =
                bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
        if (!lttng_live->viewer_connection) {
-               if (bt_graph_is_canceled(lttng_live->graph)) {
-                       ret = BT_COMPONENT_STATUS_AGAIN;
-               } else {
-                       ret = BT_COMPONENT_STATUS_NOMEM;
-               }
                goto error;
        }
        if (lttng_live_create_viewer_session(lttng_live)) {
-               if (bt_graph_is_canceled(lttng_live->graph)) {
-                       ret = BT_COMPONENT_STATUS_AGAIN;
-               } else {
-                       ret = BT_COMPONENT_STATUS_NOMEM;
-               }
                goto error;
        }
        lttng_live->private_component = private_component;
 
-       component = bt_component_from_private_component(private_component);
-       lttng_live->graph = bt_component_get_graph(component);
-       bt_put(lttng_live->graph);      /* weak */
-       bt_put(component);
-
        goto end;
 
 error:
@@ -1049,15 +1123,18 @@ end:
 }
 
 BT_HIDDEN
-enum bt_component_status lttng_live_component_init(struct bt_private_component *component,
+enum bt_component_status lttng_live_component_init(
+               struct bt_private_component *private_component,
                struct bt_value *params, void *init_method_data)
 {
        struct lttng_live_component *lttng_live;
        enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
 
        /* Passes ownership of iter ref to lttng_live_component_create. */
-       lttng_live = lttng_live_component_create(params, component);
+       lttng_live = lttng_live_component_create(params, private_component);
        if (!lttng_live) {
+               //TODO : we need access to the application cancel state
+               //because we are not part of a graph yet.
                ret = BT_COMPONENT_STATUS_NOMEM;
                goto end;
        }
@@ -1065,14 +1142,20 @@ enum bt_component_status lttng_live_component_init(struct bt_private_component *
        lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
        lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
        lttng_live->no_stream_iter->lttng_live = lttng_live;
-
-       lttng_live->no_stream_port =
-               bt_private_component_source_add_output_private_port(
+       if (lttng_live_is_canceled(lttng_live)) {
+               goto end;
+       }
+       ret = bt_private_component_source_add_output_private_port(
                                lttng_live->private_component, "no-stream",
-                               lttng_live->no_stream_iter);
+                               lttng_live->no_stream_iter,
+                               &lttng_live->no_stream_port);
+       if (ret != BT_COMPONENT_STATUS_OK) {
+               goto end;
+       }
+       bt_put(lttng_live->no_stream_port);     /* weak */
        lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
 
-       ret = bt_private_component_set_user_data(component, lttng_live);
+       ret = bt_private_component_set_user_data(private_component, lttng_live);
        if (ret != BT_COMPONENT_STATUS_OK) {
                goto error;
        }
@@ -1080,7 +1163,7 @@ enum bt_component_status lttng_live_component_init(struct bt_private_component *
 end:
        return ret;
 error:
-       (void) bt_private_component_set_user_data(component, NULL);
+       (void) bt_private_component_set_user_data(private_component, NULL);
        lttng_live_component_destroy_data(lttng_live);
        return ret;
 }
@@ -1121,35 +1204,3 @@ end:
        bt_put(self_port);
        return status;
 }
-
-static
-void __attribute__((constructor)) bt_lttng_live_logging_ctor(void)
-{
-       enum bt_logging_level log_level = BT_LOG_NONE;
-       const char *log_level_env = getenv(BT_LOGLEVEL_NAME);
-
-       if (!log_level_env) {
-               return;
-       }
-
-       if (strcmp(log_level_env, "VERBOSE") == 0) {
-               log_level = BT_LOGGING_LEVEL_VERBOSE;
-       } else if (strcmp(log_level_env, "DEBUG") == 0) {
-               log_level = BT_LOGGING_LEVEL_DEBUG;
-       } else if (strcmp(log_level_env, "INFO") == 0) {
-               log_level = BT_LOGGING_LEVEL_INFO;
-       } else if (strcmp(log_level_env, "WARN") == 0) {
-               log_level = BT_LOGGING_LEVEL_WARN;
-       } else if (strcmp(log_level_env, "ERROR") == 0) {
-               log_level = BT_LOGGING_LEVEL_ERROR;
-       } else if (strcmp(log_level_env, "FATAL") == 0) {
-               log_level = BT_LOGGING_LEVEL_FATAL;
-       } else {
-               bt_lttng_live_log_level = BT_LOGGING_LEVEL_FATAL;
-               BT_LOGF("Incorrect log level specified in %s",
-                               BT_LOGLEVEL_NAME);
-               abort();
-       }
-
-        bt_lttng_live_log_level = log_level;
-}
This page took 0.028163 seconds and 4 git commands to generate.