* SOFTWARE.
*/
+#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
+#include "logging.h"
+
#include <babeltrace/ctf-ir/packet.h>
#include <babeltrace/graph/component-source.h>
#include <babeltrace/graph/private-port.h>
#include <babeltrace/graph/notification-inactivity.h>
#include <babeltrace/graph/graph.h>
#include <babeltrace/compiler-internal.h>
+#include <babeltrace/types.h>
#include <inttypes.h>
#include <glib.h>
#include <assert.h>
#include <unistd.h>
#include <plugins-common.h>
-#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE"
-#define BT_LOGLEVEL_NAME "BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_LOG_LEVEL"
-
#include "data-stream.h"
#include "metadata.h"
#include "lttng-live-internal.h"
}
}
-#define print_stream_state(stream) \
- print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
- bt_port_get_name(bt_port_from_private_port(stream->port)), \
- print_state(stream), stream->last_returned_inactivity_timestamp, \
- stream->current_inactivity_timestamp)
+static
+void print_stream_state(struct lttng_live_stream_iterator *stream)
+{
+ struct bt_port *port;
+
+ port = bt_port_from_private_port(stream->port);
+ print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
+ bt_port_get_name(port),
+ print_state(stream),
+ stream->last_returned_inactivity_timestamp,
+ stream->current_inactivity_timestamp);
+ bt_put(port);
+}
BT_HIDDEN
-int bt_lttng_live_log_level = BT_LOG_NONE;
+bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
+{
+ struct bt_component *component;
+ struct bt_graph *graph;
+ bt_bool ret;
+
+ if (!lttng_live) {
+ return BT_FALSE;
+ }
+
+ component = bt_component_from_private_component(lttng_live->private_component);
+ graph = bt_component_get_graph(component);
+ ret = bt_graph_is_canceled(graph);
+ bt_put(graph);
+ bt_put(component);
+ return ret;
+}
BT_HIDDEN
int lttng_live_add_port(struct lttng_live_component *lttng_live,
int ret;
struct bt_private_port *private_port;
char name[STREAM_NAME_MAX_LEN];
+ enum bt_component_status status;
ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
assert(ret > 0);
strcpy(stream_iter->name, name);
- private_port = bt_private_component_source_add_output_private_port(
- lttng_live->private_component, name, stream_iter);
- if (!private_port) {
+ if (lttng_live_is_canceled(lttng_live)) {
+ return 0;
+ }
+ status = bt_private_component_source_add_output_private_port(
+ lttng_live->private_component, name, stream_iter,
+ &private_port);
+ switch (status) {
+ case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
+ return 0;
+ case BT_COMPONENT_STATUS_OK:
+ break;
+ default:
return -1;
}
+ bt_put(private_port); /* weak */
BT_LOGI("Added port %s", name);
if (lttng_live->no_stream_port) {
+ bt_get(lttng_live->no_stream_port);
ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+ bt_put(lttng_live->no_stream_port);
if (ret) {
return -1;
}
- BT_PUT(lttng_live->no_stream_port);
+ lttng_live->no_stream_port = NULL;
lttng_live->no_stream_iter->port = NULL;
}
stream_iter->port = private_port;
}
BT_PUT(component);
if (nr_ports == 1) {
+ enum bt_component_status status;
+
assert(!lttng_live->no_stream_port);
- lttng_live->no_stream_port =
- bt_private_component_source_add_output_private_port(lttng_live->private_component,
- "no-stream", lttng_live->no_stream_iter);
- if (!lttng_live->no_stream_port) {
+
+ if (lttng_live_is_canceled(lttng_live)) {
+ return 0;
+ }
+ status = bt_private_component_source_add_output_private_port(lttng_live->private_component,
+ "no-stream", lttng_live->no_stream_iter,
+ <tng_live->no_stream_port);
+ switch (status) {
+ case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
+ return 0;
+ case BT_COMPONENT_STATUS_OK:
+ break;
+ default:
return -1;
}
+ bt_put(lttng_live->no_stream_port); /* weak */
lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
}
+ bt_get(port);
ret = bt_private_port_remove_from_component(port);
+ bt_put(port);
if (ret) {
return -1;
}
void lttng_live_destroy_trace(struct bt_object *obj)
{
struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
- int retval;
BT_LOGI("Destroy trace");
assert(bt_list_empty(&trace->streams));
bt_list_del(&trace->node);
- retval = bt_ctf_trace_set_is_static(trace->trace);
- assert(!retval);
+ if (trace->trace) {
+ int retval;
+ retval = bt_ctf_trace_set_is_static(trace->trace);
+ assert(!retval);
+ BT_PUT(trace->trace);
+ }
lttng_live_metadata_fini(trace);
BT_PUT(trace->cc_prio_map);
g_free(trace);
}
BT_HIDDEN
-int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id)
+int lttng_live_add_session(struct lttng_live_component *lttng_live,
+ uint64_t session_id, const char *hostname,
+ const char *session_name)
{
int ret = 0;
struct lttng_live_session *s;
BT_INIT_LIST_HEAD(&s->traces);
s->lttng_live = lttng_live;
s->new_streams_needed = true;
+ s->hostname = g_string_new(hostname);
+ s->session_name = g_string_new(session_name);
- BT_LOGI("Reading from session %" PRIu64, s->id);
+ BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
+ s->id, hostname, session_name);
bt_list_add(&s->node, <tng_live->sessions);
goto end;
error:
BT_LOGI("Destroy session");
if (session->id != -1ULL) {
if (lttng_live_detach_session(session)) {
- if (!bt_graph_is_canceled(session->lttng_live->graph)) {
+ if (!lttng_live_is_canceled(session->lttng_live)) {
/* Old relayd cannot detach sessions. */
BT_LOGD("Unable to detach session %" PRIu64,
session->id);
lttng_live_close_trace_streams(trace);
}
bt_list_del(&session->node);
+ if (session->hostname) {
+ g_string_free(session->hostname, TRUE);
+ }
+ if (session->session_name) {
+ g_string_free(session->session_name, TRUE);
+ }
g_free(session);
}
struct lttng_live_trace *trace, *t;
if (lttng_live_attach_session(session)) {
- if (bt_graph_is_canceled(lttng_live->graph)) {
+ if (lttng_live_is_canceled(lttng_live)) {
return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
lttng_live = no_stream_iter->lttng_live;
retry:
lttng_live_force_new_streams_and_metadata(lttng_live);
+ next_return.notification = NULL;
status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
- break;
case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
break;
g_string_free(lttng_live->url, TRUE);
}
if (lttng_live->no_stream_port) {
+ bt_get(lttng_live->no_stream_port);
ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+ bt_put(lttng_live->no_stream_port);
assert(!ret);
- BT_PUT(lttng_live->no_stream_port);
}
if (lttng_live->no_stream_iter) {
g_free(lttng_live->no_stream_iter);
static
struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
- struct bt_private_component *private_component,
- struct bt_graph *graph)
+ struct bt_private_component *private_component)
{
struct lttng_live_component *lttng_live;
struct bt_value *value = NULL;
if (!lttng_live->url) {
goto error;
}
+ BT_PUT(value);
lttng_live->viewer_connection =
bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
if (!lttng_live->viewer_connection) {
goto error;
}
lttng_live->private_component = private_component;
- lttng_live->graph = graph;
goto end;
{
struct lttng_live_component *lttng_live;
enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
- struct bt_component *component;
- struct bt_graph *graph;
-
- component = bt_component_from_private_component(private_component);
- graph = bt_component_get_graph(component);
- bt_put(graph); /* weak */
- bt_put(component);
/* Passes ownership of iter ref to lttng_live_component_create. */
- lttng_live = lttng_live_component_create(params, private_component,
- graph);
+ lttng_live = lttng_live_component_create(params, private_component);
if (!lttng_live) {
- if (bt_graph_is_canceled(graph)) {
- ret = BT_COMPONENT_STATUS_AGAIN;
- } else {
- ret = BT_COMPONENT_STATUS_NOMEM;
- }
+ //TODO : we need access to the application cancel state
+ //because we are not part of a graph yet.
+ ret = BT_COMPONENT_STATUS_NOMEM;
goto end;
}
lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
lttng_live->no_stream_iter->lttng_live = lttng_live;
-
- lttng_live->no_stream_port =
- bt_private_component_source_add_output_private_port(
+ if (lttng_live_is_canceled(lttng_live)) {
+ goto end;
+ }
+ ret = bt_private_component_source_add_output_private_port(
lttng_live->private_component, "no-stream",
- lttng_live->no_stream_iter);
+ lttng_live->no_stream_iter,
+ <tng_live->no_stream_port);
+ if (ret != BT_COMPONENT_STATUS_OK) {
+ goto end;
+ }
+ bt_put(lttng_live->no_stream_port); /* weak */
lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
ret = bt_private_component_set_user_data(private_component, lttng_live);
bt_put(self_port);
return status;
}
-
-static
-void __attribute__((constructor)) bt_lttng_live_logging_ctor(void)
-{
- enum bt_logging_level log_level = BT_LOG_NONE;
- const char *log_level_env = getenv(BT_LOGLEVEL_NAME);
-
- if (!log_level_env) {
- return;
- }
-
- if (strcmp(log_level_env, "VERBOSE") == 0) {
- log_level = BT_LOGGING_LEVEL_VERBOSE;
- } else if (strcmp(log_level_env, "DEBUG") == 0) {
- log_level = BT_LOGGING_LEVEL_DEBUG;
- } else if (strcmp(log_level_env, "INFO") == 0) {
- log_level = BT_LOGGING_LEVEL_INFO;
- } else if (strcmp(log_level_env, "WARN") == 0) {
- log_level = BT_LOGGING_LEVEL_WARN;
- } else if (strcmp(log_level_env, "ERROR") == 0) {
- log_level = BT_LOGGING_LEVEL_ERROR;
- } else if (strcmp(log_level_env, "FATAL") == 0) {
- log_level = BT_LOGGING_LEVEL_FATAL;
- } else {
- bt_lttng_live_log_level = BT_LOGGING_LEVEL_FATAL;
- BT_LOGF("Incorrect log level specified in %s",
- BT_LOGLEVEL_NAME);
- abort();
- }
-
- bt_lttng_live_log_level = log_level;
-}