#include <babeltrace/graph/notification-heap.h>
#include <babeltrace/graph/notification-iterator.h>
#include <babeltrace/graph/notification-inactivity.h>
+#include <babeltrace/graph/graph.h>
#include <babeltrace/compiler-internal.h>
#include <inttypes.h>
#include <glib.h>
#include <unistd.h>
#include <plugins-common.h>
-#include "lttng-live-internal.h"
+#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE"
+#define BT_LOGLEVEL_NAME "BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_LOG_LEVEL"
+
#include "data-stream.h"
#include "metadata.h"
+#include "lttng-live-internal.h"
-#define PRINT_ERR_STREAM (lttng_live->error_fp)
-#define PRINT_PREFIX "lttng-live"
-#define PRINT_DBG_CHECK lttng_live_debug
#define MAX_QUERY_SIZE (256*1024)
-#include "../print.h"
-#ifdef LIVE_DEBUG
-#define print_dbg(fmt, args...) \
- fprintf(stderr, "%s() at " __FILE__ ":%d " fmt "\n", \
- __func__, __LINE__, ## args)
+#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
static const char *print_state(struct lttng_live_stream_iterator *s)
{
return "ERROR";
}
}
-#else
-#define print_dbg(fmt, args...)
-#endif
#define print_stream_state(stream) \
print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
stream->current_inactivity_timestamp)
BT_HIDDEN
-bool lttng_live_debug;
+int bt_lttng_live_log_level = BT_LOG_NONE;
BT_HIDDEN
int lttng_live_add_port(struct lttng_live_component *lttng_live,
if (!private_port) {
return -1;
}
- PDBG("Added port %s\n", name);
+ BT_LOGI("Added port %s", name);
if (lttng_live->no_stream_port) {
ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
void lttng_live_destroy_trace(struct bt_object *obj)
{
struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
+ int retval;
- PDBG("Destroy trace\n");
+ BT_LOGI("Destroy trace");
assert(bt_list_empty(&trace->streams));
bt_list_del(&trace->node);
+
+ retval = bt_ctf_trace_set_is_static(trace->trace);
+ assert(!retval);
+
lttng_live_metadata_fini(trace);
BT_PUT(trace->cc_prio_map);
g_free(trace);
trace->new_metadata_needed = true;
bt_list_add(&trace->node, &session->traces);
bt_object_init(&trace->obj, lttng_live_destroy_trace);
+ BT_LOGI("Create trace");
goto end;
error:
g_free(trace);
s->lttng_live = lttng_live;
s->new_streams_needed = true;
- PDBG("Reading from session %" PRIu64 "\n", s->id);
+ BT_LOGI("Reading from session %" PRIu64, s->id);
bt_list_add(&s->node, <tng_live->sessions);
goto end;
error:
- PERR("Error adding session\n");
+ BT_LOGE("Error adding session");
g_free(s);
ret = -1;
end:
{
struct lttng_live_trace *trace, *t;
- PDBG("Destroy session\n");
+ BT_LOGI("Destroy session");
if (session->id != -1ULL) {
if (lttng_live_detach_session(session)) {
- /* Old relayd cannot detach sessions. */
- PDBG("Unable to detach session %" PRIu64 "\n",
- session->id);
+ if (!bt_graph_is_canceled(session->lttng_live->graph)) {
+ /* Old relayd cannot detach sessions. */
+ BT_LOGD("Unable to detach session %" PRIu64,
+ session->id);
+ }
}
session->id = -1ULL;
}
break;
case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
/* Invalid state. */
- PERR("Unexpected stream state \"ACTIVE_NO_DATA\"\n");
- return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
+ abort();
case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
/* Invalid state. */
- PERR("Unexpected stream state \"QUIESCENT_NO_DATA\"\n");
- return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
+ abort();
case LTTNG_LIVE_STREAM_EOF:
break;
}
struct lttng_live_trace *trace, *t;
if (lttng_live_attach_session(session)) {
- return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ if (bt_graph_is_canceled(lttng_live->graph)) {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } else {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ }
}
status = lttng_live_get_new_streams(session);
if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
}
bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
status = lttng_live_metadata_update(trace);
- if (status == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
- int retval;
-
- retval = bt_ctf_trace_set_is_static(trace->trace);
- assert(!retval);
- } else if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
+ status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
return status;
}
}
}
static
-enum bt_notification_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
struct lttng_live_component *lttng_live)
{
enum bt_ctf_lttng_live_iterator_status ret =
enum bt_notification_iterator_status ret =
BT_NOTIFICATION_ITERATOR_STATUS_OK;
struct lttng_live_stream_iterator_generic *s;
- struct lttng_live_component *lttng_live;
assert(it);
{
struct lttng_live_no_stream_iterator *no_stream_iter =
container_of(s, struct lttng_live_no_stream_iterator, p);
- lttng_live = no_stream_iter->lttng_live;
ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
if (ret) {
goto error;
{
struct lttng_live_stream_iterator *stream_iter =
container_of(s, struct lttng_live_stream_iterator, p);
- lttng_live = stream_iter->trace->session->lttng_live;
ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
if (ret) {
goto error;
error:
if (bt_private_notification_iterator_set_user_data(it, NULL)
!= BT_NOTIFICATION_ITERATOR_STATUS_OK) {
- PERR("Error setting private data to NULL\n");
+ BT_LOGE("Error setting private data to NULL");
}
goto end;
}
struct bt_value *results = NULL;
const char *url;
struct bt_live_viewer_connection *viewer_connection = NULL;
- enum bt_value_status ret;
url_value = bt_value_map_get(params, "url");
if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
- fprintf(stderr, "Mandatory \"url\" parameter missing\n");
+ BT_LOGW("Mandatory \"url\" parameter missing");
goto error;
}
- ret = bt_value_string_get(url_value, &url);
- if (ret != BT_VALUE_STATUS_OK) {
- fprintf(stderr, "\"url\" parameter is required to be a string value\n");
+ if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
+ BT_LOGW("\"url\" parameter is required to be a string value");
goto error;
}
- viewer_connection = bt_live_viewer_connection_create(url, stderr);
+ viewer_connection = bt_live_viewer_connection_create(url, NULL);
if (!viewer_connection) {
- ret = BT_COMPONENT_STATUS_NOMEM;
goto error;
}
return lttng_live_query_list_sessions(comp_class,
params);
}
- fprintf(stderr, "Unknown query object `%s`\n", object);
+ BT_LOGW("Unknown query object `%s`", object);
return NULL;
}
static
struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
- struct bt_private_component *private_component)
+ struct bt_private_component *private_component,
+ struct bt_graph *graph)
{
struct lttng_live_component *lttng_live;
struct bt_value *value = NULL;
if (!lttng_live) {
goto end;
}
- lttng_live->error_fp = stderr;
/* TODO: make this an overridable parameter. */
lttng_live->max_query_size = MAX_QUERY_SIZE;
BT_INIT_LIST_HEAD(<tng_live->sessions);
value = bt_value_map_get(params, "url");
if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
- fprintf(stderr, "Mandatory \"url\" parameter missing\n");
+ BT_LOGW("Mandatory \"url\" parameter missing");
goto error;
}
ret = bt_value_string_get(value, &url);
if (ret != BT_VALUE_STATUS_OK) {
- fprintf(stderr, "\"url\" parameter is required to be a string value\n");
+ BT_LOGW("\"url\" parameter is required to be a string value");
goto error;
}
lttng_live->url = g_string_new(url);
goto error;
}
lttng_live->viewer_connection =
- bt_live_viewer_connection_create(lttng_live->url->str,
- stderr);
+ bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
if (!lttng_live->viewer_connection) {
- ret = BT_COMPONENT_STATUS_NOMEM;
goto error;
}
if (lttng_live_create_viewer_session(lttng_live)) {
- ret = BT_COMPONENT_STATUS_ERROR;
goto error;
}
lttng_live->private_component = private_component;
+ lttng_live->graph = graph;
goto end;
}
BT_HIDDEN
-enum bt_component_status lttng_live_component_init(struct bt_private_component *component,
+enum bt_component_status lttng_live_component_init(
+ struct bt_private_component *private_component,
struct bt_value *params, void *init_method_data)
{
struct lttng_live_component *lttng_live;
enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+ struct bt_component *component;
+ struct bt_graph *graph;
- lttng_live_debug = g_strcmp0(getenv("LTTNG_LIVE_DEBUG"), "1") == 0;
+ component = bt_component_from_private_component(private_component);
+ graph = bt_component_get_graph(component);
+ bt_put(graph); /* weak */
+ bt_put(component);
/* Passes ownership of iter ref to lttng_live_component_create. */
- lttng_live = lttng_live_component_create(params, component);
+ lttng_live = lttng_live_component_create(params, private_component,
+ graph);
if (!lttng_live) {
- ret = BT_COMPONENT_STATUS_NOMEM;
+ if (bt_graph_is_canceled(graph)) {
+ ret = BT_COMPONENT_STATUS_AGAIN;
+ } else {
+ ret = BT_COMPONENT_STATUS_NOMEM;
+ }
goto end;
}
lttng_live->no_stream_iter);
lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
- ret = bt_private_component_set_user_data(component, lttng_live);
+ ret = bt_private_component_set_user_data(private_component, lttng_live);
if (ret != BT_COMPONENT_STATUS_OK) {
goto error;
}
end:
return ret;
error:
- (void) bt_private_component_set_user_data(component, NULL);
+ (void) bt_private_component_set_user_data(private_component, NULL);
lttng_live_component_destroy_data(lttng_live);
return ret;
}
+
+BT_HIDDEN
+enum bt_component_status lttng_live_accept_port_connection(
+ struct bt_private_component *private_component,
+ struct bt_private_port *self_private_port,
+ struct bt_port *other_port)
+{
+ struct lttng_live_component *lttng_live =
+ bt_private_component_get_user_data(private_component);
+ struct bt_component *other_component;
+ enum bt_component_status status = BT_COMPONENT_STATUS_OK;
+ struct bt_port *self_port = bt_port_from_private_port(self_private_port);
+
+ other_component = bt_port_get_component(other_port);
+ bt_put(other_component); /* weak */
+
+ if (!lttng_live->downstream_component) {
+ lttng_live->downstream_component = other_component;
+ goto end;
+ }
+
+ /*
+ * Compare prior component to ensure we are connected to the
+ * same downstream component as prior ports.
+ */
+ if (lttng_live->downstream_component != other_component) {
+ BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
+ bt_port_get_name(self_port),
+ bt_component_get_name(other_component),
+ bt_component_get_name(lttng_live->downstream_component));
+ status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
+ goto end;
+ }
+end:
+ bt_put(self_port);
+ return status;
+}
+
+static
+void __attribute__((constructor)) bt_lttng_live_logging_ctor(void)
+{
+ bt_lttng_live_log_level = bt_log_get_level_from_env(BT_LOGLEVEL_NAME);
+}