* SOFTWARE.
*/
+#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
+#include "logging.h"
+
#include <babeltrace/ctf-ir/packet.h>
#include <babeltrace/graph/component-source.h>
#include <babeltrace/graph/private-port.h>
#include <babeltrace/graph/notification-heap.h>
#include <babeltrace/graph/notification-iterator.h>
#include <babeltrace/graph/notification-inactivity.h>
+#include <babeltrace/graph/graph.h>
#include <babeltrace/compiler-internal.h>
+#include <babeltrace/types.h>
#include <inttypes.h>
#include <glib.h>
#include <assert.h>
#include <unistd.h>
#include <plugins-common.h>
-#include "lttng-live-internal.h"
#include "data-stream.h"
#include "metadata.h"
+#include "lttng-live-internal.h"
-#define PRINT_ERR_STREAM (lttng_live->error_fp)
-#define PRINT_PREFIX "lttng-live"
-#define PRINT_DBG_CHECK lttng_live_debug
#define MAX_QUERY_SIZE (256*1024)
-#include "../print.h"
-#ifdef LIVE_DEBUG
-#define print_dbg(fmt, args...) \
- fprintf(stderr, "%s() at " __FILE__ ":%d " fmt "\n", \
- __func__, __LINE__, ## args)
+#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
static const char *print_state(struct lttng_live_stream_iterator *s)
{
return "ERROR";
}
}
-#else
-#define print_dbg(fmt, args...)
-#endif
-#define print_stream_state(stream) \
- print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
- bt_port_get_name(bt_port_from_private_port(stream->port)), \
- print_state(stream), stream->last_returned_inactivity_timestamp, \
- stream->current_inactivity_timestamp)
+static
+void print_stream_state(struct lttng_live_stream_iterator *stream)
+{
+ struct bt_port *port;
+
+ port = bt_port_from_private_port(stream->port);
+ print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
+ bt_port_get_name(port),
+ print_state(stream),
+ stream->last_returned_inactivity_timestamp,
+ stream->current_inactivity_timestamp);
+ bt_put(port);
+}
BT_HIDDEN
-bool lttng_live_debug;
+bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
+{
+ struct bt_component *component;
+ struct bt_graph *graph;
+ bt_bool ret;
+
+ if (!lttng_live) {
+ return BT_FALSE;
+ }
+
+ component = bt_component_from_private_component(lttng_live->private_component);
+ graph = bt_component_get_graph(component);
+ ret = bt_graph_is_canceled(graph);
+ bt_put(graph);
+ bt_put(component);
+ return ret;
+}
BT_HIDDEN
int lttng_live_add_port(struct lttng_live_component *lttng_live,
int ret;
struct bt_private_port *private_port;
char name[STREAM_NAME_MAX_LEN];
+ enum bt_component_status status;
ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
assert(ret > 0);
strcpy(stream_iter->name, name);
- private_port = bt_private_component_source_add_output_private_port(
- lttng_live->private_component, name, stream_iter);
- if (!private_port) {
+ if (lttng_live_is_canceled(lttng_live)) {
+ return 0;
+ }
+ status = bt_private_component_source_add_output_private_port(
+ lttng_live->private_component, name, stream_iter,
+ &private_port);
+ switch (status) {
+ case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
+ return 0;
+ case BT_COMPONENT_STATUS_OK:
+ break;
+ default:
return -1;
}
- PDBG("Added port %s\n", name);
+ bt_put(private_port); /* weak */
+ BT_LOGI("Added port %s", name);
if (lttng_live->no_stream_port) {
+ bt_get(lttng_live->no_stream_port);
ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+ bt_put(lttng_live->no_stream_port);
if (ret) {
return -1;
}
- BT_PUT(lttng_live->no_stream_port);
+ lttng_live->no_stream_port = NULL;
lttng_live->no_stream_iter->port = NULL;
}
stream_iter->port = private_port;
}
BT_PUT(component);
if (nr_ports == 1) {
+ enum bt_component_status status;
+
assert(!lttng_live->no_stream_port);
- lttng_live->no_stream_port =
- bt_private_component_source_add_output_private_port(lttng_live->private_component,
- "no-stream", lttng_live->no_stream_iter);
- if (!lttng_live->no_stream_port) {
+
+ if (lttng_live_is_canceled(lttng_live)) {
+ return 0;
+ }
+ status = bt_private_component_source_add_output_private_port(lttng_live->private_component,
+ "no-stream", lttng_live->no_stream_iter,
+ <tng_live->no_stream_port);
+ switch (status) {
+ case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
+ return 0;
+ case BT_COMPONENT_STATUS_OK:
+ break;
+ default:
return -1;
}
+ bt_put(lttng_live->no_stream_port); /* weak */
lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
}
+ bt_get(port);
ret = bt_private_port_remove_from_component(port);
+ bt_put(port);
if (ret) {
return -1;
}
{
struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
- PDBG("Destroy trace\n");
+ BT_LOGI("Destroy trace");
assert(bt_list_empty(&trace->streams));
bt_list_del(&trace->node);
+
+ if (trace->trace) {
+ int retval;
+
+ retval = bt_ctf_trace_set_is_static(trace->trace);
+ assert(!retval);
+ BT_PUT(trace->trace);
+ }
lttng_live_metadata_fini(trace);
BT_PUT(trace->cc_prio_map);
g_free(trace);
trace->new_metadata_needed = true;
bt_list_add(&trace->node, &session->traces);
bt_object_init(&trace->obj, lttng_live_destroy_trace);
+ BT_LOGI("Create trace");
goto end;
error:
g_free(trace);
}
BT_HIDDEN
-int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id)
+int lttng_live_add_session(struct lttng_live_component *lttng_live,
+ uint64_t session_id, const char *hostname,
+ const char *session_name)
{
int ret = 0;
struct lttng_live_session *s;
BT_INIT_LIST_HEAD(&s->traces);
s->lttng_live = lttng_live;
s->new_streams_needed = true;
+ s->hostname = g_string_new(hostname);
+ s->session_name = g_string_new(session_name);
- PDBG("Reading from session %" PRIu64 "\n", s->id);
+ BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
+ s->id, hostname, session_name);
bt_list_add(&s->node, <tng_live->sessions);
goto end;
error:
- PERR("Error adding session\n");
+ BT_LOGE("Error adding session");
g_free(s);
ret = -1;
end:
{
struct lttng_live_trace *trace, *t;
- PDBG("Destroy session\n");
+ BT_LOGI("Destroy session");
if (session->id != -1ULL) {
if (lttng_live_detach_session(session)) {
- /* Old relayd cannot detach sessions. */
- PDBG("Unable to detach session %" PRIu64 "\n",
- session->id);
+ if (!lttng_live_is_canceled(session->lttng_live)) {
+ /* Old relayd cannot detach sessions. */
+ BT_LOGD("Unable to detach session %" PRIu64,
+ session->id);
+ }
}
session->id = -1ULL;
}
lttng_live_close_trace_streams(trace);
}
bt_list_del(&session->node);
+ if (session->hostname) {
+ g_string_free(session->hostname, TRUE);
+ }
+ if (session->session_name) {
+ g_string_free(session->session_name, TRUE);
+ }
g_free(session);
}
break;
case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
/* Invalid state. */
- PERR("Unexpected stream state \"ACTIVE_NO_DATA\"\n");
- return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
+ abort();
case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
/* Invalid state. */
- PERR("Unexpected stream state \"QUIESCENT_NO_DATA\"\n");
- return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
+ abort();
case LTTNG_LIVE_STREAM_EOF:
break;
}
struct lttng_live_trace *trace, *t;
if (lttng_live_attach_session(session)) {
- return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ if (lttng_live_is_canceled(lttng_live)) {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ } else {
+ return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ }
}
status = lttng_live_get_new_streams(session);
if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
}
bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
status = lttng_live_metadata_update(trace);
- if (status == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
- int retval;
-
- retval = bt_ctf_trace_set_is_static(trace->trace);
- assert(!retval);
- } else if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
+ status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
return status;
}
}
}
static
-enum bt_notification_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
+enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
struct lttng_live_component *lttng_live)
{
enum bt_ctf_lttng_live_iterator_status ret =
lttng_live = no_stream_iter->lttng_live;
retry:
lttng_live_force_new_streams_and_metadata(lttng_live);
+ next_return.notification = NULL;
status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
- break;
case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
break;
enum bt_notification_iterator_status ret =
BT_NOTIFICATION_ITERATOR_STATUS_OK;
struct lttng_live_stream_iterator_generic *s;
- struct lttng_live_component *lttng_live;
assert(it);
{
struct lttng_live_no_stream_iterator *no_stream_iter =
container_of(s, struct lttng_live_no_stream_iterator, p);
- lttng_live = no_stream_iter->lttng_live;
ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
if (ret) {
goto error;
{
struct lttng_live_stream_iterator *stream_iter =
container_of(s, struct lttng_live_stream_iterator, p);
- lttng_live = stream_iter->trace->session->lttng_live;
ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
if (ret) {
goto error;
error:
if (bt_private_notification_iterator_set_user_data(it, NULL)
!= BT_NOTIFICATION_ITERATOR_STATUS_OK) {
- PERR("Error setting private data to NULL\n");
+ BT_LOGE("Error setting private data to NULL");
}
goto end;
}
struct bt_value *results = NULL;
const char *url;
struct bt_live_viewer_connection *viewer_connection = NULL;
- enum bt_value_status ret;
url_value = bt_value_map_get(params, "url");
if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
- fprintf(stderr, "Mandatory \"url\" parameter missing\n");
+ BT_LOGW("Mandatory \"url\" parameter missing");
goto error;
}
- ret = bt_value_string_get(url_value, &url);
- if (ret != BT_VALUE_STATUS_OK) {
- fprintf(stderr, "\"url\" parameter is required to be a string value\n");
+ if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
+ BT_LOGW("\"url\" parameter is required to be a string value");
goto error;
}
- viewer_connection = bt_live_viewer_connection_create(url, stderr);
+ viewer_connection = bt_live_viewer_connection_create(url, NULL);
if (!viewer_connection) {
- ret = BT_COMPONENT_STATUS_NOMEM;
goto error;
}
return lttng_live_query_list_sessions(comp_class,
params);
}
- fprintf(stderr, "Unknown query object `%s`\n", object);
+ BT_LOGW("Unknown query object `%s`", object);
return NULL;
}
g_string_free(lttng_live->url, TRUE);
}
if (lttng_live->no_stream_port) {
+ bt_get(lttng_live->no_stream_port);
ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+ bt_put(lttng_live->no_stream_port);
assert(!ret);
- BT_PUT(lttng_live->no_stream_port);
}
if (lttng_live->no_stream_iter) {
g_free(lttng_live->no_stream_iter);
if (!lttng_live) {
goto end;
}
- lttng_live->error_fp = stderr;
/* TODO: make this an overridable parameter. */
lttng_live->max_query_size = MAX_QUERY_SIZE;
BT_INIT_LIST_HEAD(<tng_live->sessions);
value = bt_value_map_get(params, "url");
if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
- fprintf(stderr, "Mandatory \"url\" parameter missing\n");
+ BT_LOGW("Mandatory \"url\" parameter missing");
goto error;
}
ret = bt_value_string_get(value, &url);
if (ret != BT_VALUE_STATUS_OK) {
- fprintf(stderr, "\"url\" parameter is required to be a string value\n");
+ BT_LOGW("\"url\" parameter is required to be a string value");
goto error;
}
lttng_live->url = g_string_new(url);
if (!lttng_live->url) {
goto error;
}
+ BT_PUT(value);
lttng_live->viewer_connection =
- bt_live_viewer_connection_create(lttng_live->url->str,
- stderr);
+ bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
if (!lttng_live->viewer_connection) {
- ret = BT_COMPONENT_STATUS_NOMEM;
goto error;
}
if (lttng_live_create_viewer_session(lttng_live)) {
- ret = BT_COMPONENT_STATUS_ERROR;
goto error;
}
lttng_live->private_component = private_component;
}
BT_HIDDEN
-enum bt_component_status lttng_live_component_init(struct bt_private_component *component,
+enum bt_component_status lttng_live_component_init(
+ struct bt_private_component *private_component,
struct bt_value *params, void *init_method_data)
{
struct lttng_live_component *lttng_live;
enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
- lttng_live_debug = g_strcmp0(getenv("LTTNG_LIVE_DEBUG"), "1") == 0;
-
/* Passes ownership of iter ref to lttng_live_component_create. */
- lttng_live = lttng_live_component_create(params, component);
+ lttng_live = lttng_live_component_create(params, private_component);
if (!lttng_live) {
+ //TODO : we need access to the application cancel state
+ //because we are not part of a graph yet.
ret = BT_COMPONENT_STATUS_NOMEM;
goto end;
}
lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
lttng_live->no_stream_iter->lttng_live = lttng_live;
-
- lttng_live->no_stream_port =
- bt_private_component_source_add_output_private_port(
+ if (lttng_live_is_canceled(lttng_live)) {
+ goto end;
+ }
+ ret = bt_private_component_source_add_output_private_port(
lttng_live->private_component, "no-stream",
- lttng_live->no_stream_iter);
+ lttng_live->no_stream_iter,
+ <tng_live->no_stream_port);
+ if (ret != BT_COMPONENT_STATUS_OK) {
+ goto end;
+ }
+ bt_put(lttng_live->no_stream_port); /* weak */
lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
- ret = bt_private_component_set_user_data(component, lttng_live);
+ ret = bt_private_component_set_user_data(private_component, lttng_live);
if (ret != BT_COMPONENT_STATUS_OK) {
goto error;
}
end:
return ret;
error:
- (void) bt_private_component_set_user_data(component, NULL);
+ (void) bt_private_component_set_user_data(private_component, NULL);
lttng_live_component_destroy_data(lttng_live);
return ret;
}
+
+BT_HIDDEN
+enum bt_component_status lttng_live_accept_port_connection(
+ struct bt_private_component *private_component,
+ struct bt_private_port *self_private_port,
+ struct bt_port *other_port)
+{
+ struct lttng_live_component *lttng_live =
+ bt_private_component_get_user_data(private_component);
+ struct bt_component *other_component;
+ enum bt_component_status status = BT_COMPONENT_STATUS_OK;
+ struct bt_port *self_port = bt_port_from_private_port(self_private_port);
+
+ other_component = bt_port_get_component(other_port);
+ bt_put(other_component); /* weak */
+
+ if (!lttng_live->downstream_component) {
+ lttng_live->downstream_component = other_component;
+ goto end;
+ }
+
+ /*
+ * Compare prior component to ensure we are connected to the
+ * same downstream component as prior ports.
+ */
+ if (lttng_live->downstream_component != other_component) {
+ BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
+ bt_port_get_name(self_port),
+ bt_component_get_name(other_component),
+ bt_component_get_name(lttng_live->downstream_component));
+ status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
+ goto end;
+ }
+end:
+ bt_put(self_port);
+ return status;
+}