* SOFTWARE.
*/
-#include <babeltrace/ctf-ir/packet.h>
-#include <babeltrace/graph/component-source.h>
-#include <babeltrace/graph/private-port.h>
-#include <babeltrace/graph/port.h>
-#include <babeltrace/graph/private-component.h>
-#include <babeltrace/graph/private-component-source.h>
-#include <babeltrace/graph/private-notification-iterator.h>
-#include <babeltrace/graph/notification-stream.h>
-#include <babeltrace/graph/notification-packet.h>
-#include <babeltrace/graph/notification-event.h>
-#include <babeltrace/graph/notification-heap.h>
-#include <babeltrace/graph/notification-iterator.h>
-#include <babeltrace/graph/notification-inactivity.h>
-#include <babeltrace/graph/graph.h>
+#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
+#include "logging.h"
+
+#include <babeltrace/babeltrace.h>
#include <babeltrace/compiler-internal.h>
+#include <babeltrace/types.h>
#include <inttypes.h>
#include <glib.h>
-#include <assert.h>
+#include <babeltrace/assert-internal.h>
#include <unistd.h>
#include <plugins-common.h>
-#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE"
-#define BT_LOGLEVEL_NAME "BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_LOG_LEVEL"
-
#include "data-stream.h"
#include "metadata.h"
#include "lttng-live-internal.h"
}
}
-#define print_stream_state(stream) \
- print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
- bt_port_get_name(bt_port_from_private_port(stream->port)), \
- print_state(stream), stream->last_returned_inactivity_timestamp, \
- stream->current_inactivity_timestamp)
+static
+void print_stream_state(struct lttng_live_stream_iterator *stream)
+{
+ const bt_port *port;
+
+ port = bt_port_from_private(stream->port);
+ print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
+ bt_port_get_name(port),
+ print_state(stream),
+ stream->last_returned_inactivity_timestamp,
+ stream->current_inactivity_timestamp);
+ bt_port_put_ref(port);
+}
BT_HIDDEN
-int bt_lttng_live_log_level = BT_LOG_NONE;
+bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
+{
+ bt_component *component;
+ const bt_graph *graph;
+ bt_bool ret;
+
+ if (!lttng_live) {
+ return BT_FALSE;
+ }
+
+ component = bt_component_from_private(lttng_live->private_component);
+ graph = bt_component_get_graph(component);
+ ret = bt_graph_is_canceled(graph);
+ bt_graph_put_ref(graph);
+ bt_component_put_ref(component);
+ return ret;
+}
BT_HIDDEN
int lttng_live_add_port(struct lttng_live_component *lttng_live,
int ret;
struct bt_private_port *private_port;
char name[STREAM_NAME_MAX_LEN];
+ enum bt_component_status status;
ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
- assert(ret > 0);
+ BT_ASSERT(ret > 0);
strcpy(stream_iter->name, name);
- private_port = bt_private_component_source_add_output_private_port(
- lttng_live->private_component, name, stream_iter);
- if (!private_port) {
+ if (lttng_live_is_canceled(lttng_live)) {
+ return 0;
+ }
+ status = bt_self_component_source_add_output_port(
+ lttng_live->private_component, name, stream_iter,
+ &private_port);
+ switch (status) {
+ case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
+ return 0;
+ case BT_COMPONENT_STATUS_OK:
+ break;
+ default:
return -1;
}
+ bt_object_put_ref(private_port); /* weak */
BT_LOGI("Added port %s", name);
if (lttng_live->no_stream_port) {
+ bt_object_get_ref(lttng_live->no_stream_port);
ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
+ bt_object_put_ref(lttng_live->no_stream_port);
if (ret) {
return -1;
}
- BT_PUT(lttng_live->no_stream_port);
+ lttng_live->no_stream_port = NULL;
lttng_live->no_stream_iter->port = NULL;
}
stream_iter->port = private_port;
int lttng_live_remove_port(struct lttng_live_component *lttng_live,
struct bt_private_port *port)
{
- struct bt_component *component;
+ bt_component *component;
int64_t nr_ports;
int ret;
- component = bt_component_from_private_component(lttng_live->private_component);
+ component = bt_component_from_private(lttng_live->private_component);
nr_ports = bt_component_source_get_output_port_count(component);
if (nr_ports < 0) {
return -1;
}
- BT_PUT(component);
+ BT_COMPONENT_PUT_REF_AND_RESET(component);
if (nr_ports == 1) {
- assert(!lttng_live->no_stream_port);
- lttng_live->no_stream_port =
- bt_private_component_source_add_output_private_port(lttng_live->private_component,
- "no-stream", lttng_live->no_stream_iter);
- if (!lttng_live->no_stream_port) {
+ enum bt_component_status status;
+
+ BT_ASSERT(!lttng_live->no_stream_port);
+
+ if (lttng_live_is_canceled(lttng_live)) {
+ return 0;
+ }
+ status = bt_self_component_source_add_output_port(lttng_live->private_component,
+ "no-stream", lttng_live->no_stream_iter,
+ <tng_live->no_stream_port);
+ switch (status) {
+ case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
+ return 0;
+ case BT_COMPONENT_STATUS_OK:
+ break;
+ default:
return -1;
}
+ bt_object_put_ref(lttng_live->no_stream_port); /* weak */
lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
}
+ bt_object_get_ref(port);
ret = bt_private_port_remove_from_component(port);
+ bt_object_put_ref(port);
if (ret) {
return -1;
}
}
static
-void lttng_live_destroy_trace(struct bt_object *obj)
+void lttng_live_destroy_trace(bt_object *obj)
{
struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
- int retval;
BT_LOGI("Destroy trace");
- assert(bt_list_empty(&trace->streams));
+ BT_ASSERT(bt_list_empty(&trace->streams));
bt_list_del(&trace->node);
- retval = bt_ctf_trace_set_is_static(trace->trace);
- assert(!retval);
+ if (trace->trace) {
+ int retval;
+ retval = bt_trace_set_is_static(trace->trace);
+ BT_ASSERT(!retval);
+ BT_TRACE_PUT_REF_AND_RESET(trace->trace);
+ }
lttng_live_metadata_fini(trace);
- BT_PUT(trace->cc_prio_map);
+ BT_OBJECT_PUT_REF_AND_RESET(trace->cc_prio_map);
g_free(trace);
}
trace = lttng_live_find_trace(session, trace_id);
if (trace) {
- bt_get(trace);
+ bt_object_get_ref(trace);
return trace;
}
return lttng_live_create_trace(session, trace_id);
BT_HIDDEN
void lttng_live_unref_trace(struct lttng_live_trace *trace)
{
- bt_put(trace);
+ bt_object_put_ref(trace);
}
static
}
BT_HIDDEN
-int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id)
+int lttng_live_add_session(struct lttng_live_component *lttng_live,
+ uint64_t session_id, const char *hostname,
+ const char *session_name)
{
int ret = 0;
struct lttng_live_session *s;
BT_INIT_LIST_HEAD(&s->traces);
s->lttng_live = lttng_live;
s->new_streams_needed = true;
+ s->hostname = g_string_new(hostname);
+ s->session_name = g_string_new(session_name);
- BT_LOGI("Reading from session %" PRIu64, s->id);
+ BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
+ s->id, hostname, session_name);
bt_list_add(&s->node, <tng_live->sessions);
goto end;
error:
BT_LOGI("Destroy session");
if (session->id != -1ULL) {
if (lttng_live_detach_session(session)) {
- if (!bt_graph_is_canceled(session->lttng_live->graph)) {
+ if (!lttng_live_is_canceled(session->lttng_live)) {
/* Old relayd cannot detach sessions. */
BT_LOGD("Unable to detach session %" PRIu64,
session->id);
lttng_live_close_trace_streams(trace);
}
bt_list_del(&session->node);
+ if (session->hostname) {
+ g_string_free(session->hostname, TRUE);
+ }
+ if (session->session_name) {
+ g_string_free(session->session_name, TRUE);
+ }
g_free(session);
}
BT_HIDDEN
-void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
+void lttng_live_iterator_finalize(bt_self_message_iterator *it)
{
struct lttng_live_stream_iterator_generic *s =
- bt_private_notification_iterator_get_user_data(it);
+ bt_self_message_iterator_get_user_data(it);
switch (s->type) {
case LIVE_STREAM_TYPE_NO_STREAM:
}
static
-enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
+enum bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
struct lttng_live_component *lttng_live,
struct lttng_live_stream_iterator *lttng_live_stream)
{
case LTTNG_LIVE_STREAM_EOF:
break;
}
- return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
}
/*
* return EOF.
*/
static
-enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
+enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
struct lttng_live_component *lttng_live,
struct lttng_live_stream_iterator *lttng_live_stream)
{
- enum bt_ctf_lttng_live_iterator_status ret =
- BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum bt_lttng_live_iterator_status ret =
+ BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
struct packet_index index;
enum lttng_live_stream_state orig_state = lttng_live_stream->state;
if (lttng_live_stream->trace->new_metadata_needed) {
- ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
if (lttng_live_stream->trace->session->new_streams_needed) {
- ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
goto end;
}
ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
- if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ if (ret != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
}
- assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
+ BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
&& lttng_live_stream->last_returned_inactivity_timestamp ==
lttng_live_stream->current_inactivity_timestamp) {
- ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
print_stream_state(lttng_live_stream);
} else {
- ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
}
goto end;
}
lttng_live_stream->offset = index.offset;
lttng_live_stream->len = index.packet_size / CHAR_BIT;
end:
- if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
ret = lttng_live_iterator_next_check_stream_state(
lttng_live, lttng_live_stream);
}
}
/*
- * Creation of the notification requires the ctf trace to be created
+ * Creation of the message requires the ctf trace to be created
* beforehand, but the live protocol gives us all streams (including
* metadata) at once. So we split it in three steps: getting streams,
* getting metadata (which creates the ctf trace), and then creating the
- * per-stream notifications.
+ * per-stream messages.
*/
static
-enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
+enum bt_lttng_live_iterator_status lttng_live_get_session(
struct lttng_live_component *lttng_live,
struct lttng_live_session *session)
{
- enum bt_ctf_lttng_live_iterator_status status;
+ enum bt_lttng_live_iterator_status status;
struct lttng_live_trace *trace, *t;
if (lttng_live_attach_session(session)) {
- if (bt_graph_is_canceled(lttng_live->graph)) {
- return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ if (lttng_live_is_canceled(lttng_live)) {
+ return BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
- return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
}
status = lttng_live_get_new_streams(session);
- if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
- status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
+ if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK &&
+ status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) {
return status;
}
bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
status = lttng_live_metadata_update(trace);
- if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
- status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
+ if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK &&
+ status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) {
return status;
}
}
- return lttng_live_lazy_notif_init(session);
+ return lttng_live_lazy_msg_init(session);
}
BT_HIDDEN
}
static
-enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
+enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
struct lttng_live_component *lttng_live)
{
- enum bt_ctf_lttng_live_iterator_status ret =
- BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum bt_lttng_live_iterator_status ret =
+ BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
unsigned int nr_sessions_opened = 0;
struct lttng_live_session *session, *s;
* currently ongoing.
*/
if (bt_list_empty(<tng_live->sessions)) {
- ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
goto end;
}
bt_list_for_each_entry(session, <tng_live->sessions, node) {
ret = lttng_live_get_session(lttng_live, session);
switch (ret) {
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
- ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
break;
default:
goto end;
}
}
end:
- if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
- ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
}
return ret;
}
static
-enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
+enum bt_lttng_live_iterator_status emit_inactivity_message(
struct lttng_live_component *lttng_live,
struct lttng_live_stream_iterator *lttng_live_stream,
- struct bt_notification **notification,
+ const bt_message **message,
uint64_t timestamp)
{
- enum bt_ctf_lttng_live_iterator_status ret =
- BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum bt_lttng_live_iterator_status ret =
+ BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
struct lttng_live_trace *trace;
- struct bt_ctf_clock_class *clock_class = NULL;
- struct bt_ctf_clock_value *clock_value = NULL;
- struct bt_notification *notif = NULL;
+ const bt_clock_class *clock_class = NULL;
+ bt_clock_value *clock_value = NULL;
+ const bt_message *msg = NULL;
int retval;
trace = lttng_live_stream->trace;
if (!clock_class) {
goto error;
}
- clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
+ clock_value = bt_clock_value_create(clock_class, timestamp);
if (!clock_value) {
goto error;
}
- notif = bt_notification_inactivity_create(trace->cc_prio_map);
- if (!notif) {
+ msg = bt_message_inactivity_create(trace->cc_prio_map);
+ if (!msg) {
goto error;
}
- retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
+ retval = bt_message_inactivity_set_clock_value(msg, clock_value);
if (retval) {
goto error;
}
- *notification = notif;
+ *message = msg;
end:
- bt_put(clock_value);
- bt_put(clock_class);
+ bt_object_put_ref(clock_value);
+ bt_clock_class_put_ref(clock_class);
return ret;
error:
- ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
- bt_put(notif);
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ bt_message_put_ref(msg);
goto end;
}
static
-enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
+enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
struct lttng_live_component *lttng_live,
struct lttng_live_stream_iterator *lttng_live_stream,
- struct bt_notification **notification)
+ const bt_message **message)
{
- enum bt_ctf_lttng_live_iterator_status ret =
- BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
- struct bt_ctf_clock_class *clock_class = NULL;
- struct bt_ctf_clock_value *clock_value = NULL;
+ enum bt_lttng_live_iterator_status ret =
+ BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ const bt_clock_class *clock_class = NULL;
+ bt_clock_value *clock_value = NULL;
if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
- return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
}
if (lttng_live_stream->current_inactivity_timestamp ==
lttng_live_stream->last_returned_inactivity_timestamp) {
lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
- ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
goto end;
}
- ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
+ ret = emit_inactivity_message(lttng_live, lttng_live_stream, message,
(uint64_t) lttng_live_stream->current_inactivity_timestamp);
lttng_live_stream->last_returned_inactivity_timestamp =
lttng_live_stream->current_inactivity_timestamp;
end:
- bt_put(clock_value);
- bt_put(clock_class);
+ bt_object_put_ref(clock_value);
+ bt_clock_class_put_ref(clock_class);
return ret;
}
static
-enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
+enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
struct lttng_live_component *lttng_live,
struct lttng_live_stream_iterator *lttng_live_stream,
- struct bt_notification **notification)
+ const bt_message **message)
{
- enum bt_ctf_lttng_live_iterator_status ret =
- BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
- enum bt_ctf_notif_iter_status status;
+ enum bt_lttng_live_iterator_status ret =
+ BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum bt_msg_iter_status status;
struct lttng_live_session *session;
bt_list_for_each_entry(session, <tng_live->sessions, node) {
struct lttng_live_trace *trace;
if (session->new_streams_needed) {
- return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
}
bt_list_for_each_entry(trace, &session->traces, node) {
if (trace->new_metadata_needed) {
- return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
}
}
}
if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
- return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
- if (lttng_live_stream->packet_end_notif_queue) {
- *notification = lttng_live_stream->packet_end_notif_queue;
- lttng_live_stream->packet_end_notif_queue = NULL;
- status = BT_CTF_NOTIF_ITER_STATUS_OK;
+ if (lttng_live_stream->packet_end_msg_queue) {
+ *message = lttng_live_stream->packet_end_msg_queue;
+ lttng_live_stream->packet_end_msg_queue = NULL;
+ status = BT_MSG_ITER_STATUS_OK;
} else {
- status = bt_ctf_notif_iter_get_next_notification(
- lttng_live_stream->notif_iter,
+ status = bt_msg_iter_get_next_message(
+ lttng_live_stream->msg_iter,
lttng_live_stream->trace->cc_prio_map,
- notification);
- if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
+ message);
+ if (status == BT_MSG_ITER_STATUS_OK) {
/*
* Consider empty packets as inactivity.
*/
- if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
- lttng_live_stream->packet_end_notif_queue = *notification;
- *notification = NULL;
- return emit_inactivity_notification(lttng_live,
- lttng_live_stream, notification,
+ if (bt_message_get_type(*message) == BT_MESSAGE_TYPE_PACKET_END) {
+ lttng_live_stream->packet_end_msg_queue = *message;
+ *message = NULL;
+ return emit_inactivity_message(lttng_live,
+ lttng_live_stream, message,
lttng_live_stream->current_packet_end_timestamp);
}
}
}
switch (status) {
- case BT_CTF_NOTIF_ITER_STATUS_EOF:
- ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ case BT_MSG_ITER_STATUS_EOF:
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
break;
- case BT_CTF_NOTIF_ITER_STATUS_OK:
- ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ case BT_MSG_ITER_STATUS_OK:
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
break;
- case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
+ case BT_MSG_ITER_STATUS_AGAIN:
/*
* Continue immediately (end of packet). The next
* get_index may return AGAIN to delay the following
* attempt.
*/
- ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
break;
- case BT_CTF_NOTIF_ITER_STATUS_INVAL:
+ case BT_MSG_ITER_STATUS_INVAL:
/* No argument provided by the user, so don't return INVAL. */
- case BT_CTF_NOTIF_ITER_STATUS_ERROR:
+ case BT_MSG_ITER_STATUS_ERROR:
default:
- ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
break;
}
return ret;
* handle_active_data_streams()
* - if at least one stream is ACTIVE_DATA:
* - get stream event with lowest timestamp from heap
- * - make that stream event the current notification.
+ * - make that stream event the current message.
* - move this stream heap position to its next event
* - if we need to fetch data from relayd, move
* stream to ACTIVE_NO_DATA.
* When disconnected from relayd: try to re-connect endlessly.
*/
static
-struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
- struct bt_private_notification_iterator *iterator,
+bt_message_iterator_next_method_return lttng_live_iterator_next_stream(
+ bt_self_message_iterator *iterator,
struct lttng_live_stream_iterator *stream_iter)
{
- enum bt_ctf_lttng_live_iterator_status status;
- struct bt_notification_iterator_next_return next_return;
+ enum bt_lttng_live_iterator_status status;
+ bt_message_iterator_next_method_return next_return;
struct lttng_live_component *lttng_live;
lttng_live = stream_iter->trace->session->lttng_live;
retry:
print_stream_state(stream_iter);
- next_return.notification = NULL;
+ next_return.message = NULL;
status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
- if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
}
status = lttng_live_iterator_next_handle_one_no_data_stream(
lttng_live, stream_iter);
- if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
}
status = lttng_live_iterator_next_handle_one_quiescent_stream(
- lttng_live, stream_iter, &next_return.notification);
- if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
- assert(next_return.notification == NULL);
+ lttng_live, stream_iter, &next_return.message);
+ if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ BT_ASSERT(next_return.message == NULL);
goto end;
}
- if (next_return.notification) {
+ if (next_return.message) {
goto end;
}
status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
- stream_iter, &next_return.notification);
- if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
- assert(next_return.notification == NULL);
+ stream_iter, &next_return.message);
+ if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ BT_ASSERT(next_return.message == NULL);
}
end:
switch (status) {
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
print_dbg("continue");
goto retry;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+ next_return.status = BT_MESSAGE_ITERATOR_STATUS_AGAIN;
print_dbg("again");
break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
+ next_return.status = BT_MESSAGE_ITERATOR_STATUS_END;
print_dbg("end");
break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
+ next_return.status = BT_MESSAGE_ITERATOR_STATUS_OK;
print_dbg("ok");
break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+ next_return.status = BT_MESSAGE_ITERATOR_STATUS_INVALID;
break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+ next_return.status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+ next_return.status = BT_MESSAGE_ITERATOR_STATUS_UNSUPPORTED;
break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
default: /* fall-through */
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
break;
}
return next_return;
}
static
-struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
- struct bt_private_notification_iterator *iterator,
+bt_message_iterator_next_method_return lttng_live_iterator_next_no_stream(
+ bt_self_message_iterator *iterator,
struct lttng_live_no_stream_iterator *no_stream_iter)
{
- enum bt_ctf_lttng_live_iterator_status status;
- struct bt_notification_iterator_next_return next_return;
+ enum bt_lttng_live_iterator_status status;
+ bt_message_iterator_next_method_return next_return;
struct lttng_live_component *lttng_live;
lttng_live = no_stream_iter->lttng_live;
retry:
lttng_live_force_new_streams_and_metadata(lttng_live);
+ next_return.message = NULL;
status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
- if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
+ if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
}
if (no_stream_iter->port) {
- status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
- status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
+ status = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
}
end:
switch (status) {
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
goto retry;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+ next_return.status = BT_MESSAGE_ITERATOR_STATUS_AGAIN;
break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
+ next_return.status = BT_MESSAGE_ITERATOR_STATUS_END;
break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
+ next_return.status = BT_MESSAGE_ITERATOR_STATUS_INVALID;
break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
+ next_return.status = BT_MESSAGE_ITERATOR_STATUS_NOMEM;
break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
+ next_return.status = BT_MESSAGE_ITERATOR_STATUS_UNSUPPORTED;
break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
- break;
- case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
+ case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
default: /* fall-through */
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
break;
}
return next_return;
}
BT_HIDDEN
-struct bt_notification_iterator_next_return lttng_live_iterator_next(
- struct bt_private_notification_iterator *iterator)
+bt_message_iterator_next_method_return lttng_live_iterator_next(
+ bt_self_message_iterator *iterator)
{
struct lttng_live_stream_iterator_generic *s =
- bt_private_notification_iterator_get_user_data(iterator);
- struct bt_notification_iterator_next_return next_return;
+ bt_self_message_iterator_get_user_data(iterator);
+ bt_message_iterator_next_method_return next_return;
switch (s->type) {
case LIVE_STREAM_TYPE_NO_STREAM:
container_of(s, struct lttng_live_stream_iterator, p));
break;
default:
- next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ next_return.status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
break;
}
return next_return;
}
BT_HIDDEN
-enum bt_notification_iterator_status lttng_live_iterator_init(
- struct bt_private_notification_iterator *it,
+enum bt_message_iterator_status lttng_live_iterator_init(
+ bt_self_message_iterator *it,
struct bt_private_port *port)
{
- enum bt_notification_iterator_status ret =
- BT_NOTIFICATION_ITERATOR_STATUS_OK;
+ enum bt_message_iterator_status ret =
+ BT_MESSAGE_ITERATOR_STATUS_OK;
struct lttng_live_stream_iterator_generic *s;
- assert(it);
+ BT_ASSERT(it);
s = bt_private_port_get_user_data(port);
- assert(s);
+ BT_ASSERT(s);
switch (s->type) {
case LIVE_STREAM_TYPE_NO_STREAM:
{
struct lttng_live_no_stream_iterator *no_stream_iter =
container_of(s, struct lttng_live_no_stream_iterator, p);
- ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
+ ret = bt_self_message_iterator_set_user_data(it, no_stream_iter);
if (ret) {
goto error;
}
{
struct lttng_live_stream_iterator *stream_iter =
container_of(s, struct lttng_live_stream_iterator, p);
- ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
+ ret = bt_self_message_iterator_set_user_data(it, stream_iter);
if (ret) {
goto error;
}
break;
}
default:
- ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+ ret = BT_MESSAGE_ITERATOR_STATUS_ERROR;
goto end;
}
end:
return ret;
error:
- if (bt_private_notification_iterator_set_user_data(it, NULL)
- != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+ if (bt_self_message_iterator_set_user_data(it, NULL)
+ != BT_MESSAGE_ITERATOR_STATUS_OK) {
BT_LOGE("Error setting private data to NULL");
}
goto end;
}
static
-struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
- struct bt_value *params)
+bt_component_class_query_method_return lttng_live_query_list_sessions(
+ const bt_component_class *comp_class,
+ const bt_query_executor *query_exec,
+ bt_value *params)
{
- struct bt_value *url_value = NULL;
- struct bt_value *results = NULL;
+ bt_component_class_query_method_return query_ret = {
+ .result = NULL,
+ .status = BT_QUERY_STATUS_OK,
+ };
+
+ bt_value *url_value = NULL;
const char *url;
struct bt_live_viewer_connection *viewer_connection = NULL;
url_value = bt_value_map_get(params, "url");
if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
BT_LOGW("Mandatory \"url\" parameter missing");
+ query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
goto error;
}
if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
BT_LOGW("\"url\" parameter is required to be a string value");
+ query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
goto error;
}
goto error;
}
- results = bt_live_viewer_connection_list_sessions(viewer_connection);
+ query_ret.result =
+ bt_live_viewer_connection_list_sessions(viewer_connection);
+ if (!query_ret.result) {
+ goto error;
+ }
+
goto end;
+
error:
- BT_PUT(results);
+ BT_OBJECT_PUT_REF_AND_RESET(query_ret.result);
+
+ if (query_ret.status >= 0) {
+ query_ret.status = BT_QUERY_STATUS_ERROR;
+ }
+
end:
if (viewer_connection) {
bt_live_viewer_connection_destroy(viewer_connection);
}
- BT_PUT(url_value);
- return results;
+ BT_VALUE_PUT_REF_AND_RESET(url_value);
+ return query_ret;
}
BT_HIDDEN
-struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
- const char *object, struct bt_value *params)
+bt_component_class_query_method_return lttng_live_query(
+ const bt_component_class *comp_class,
+ const bt_query_executor *query_exec,
+ const char *object, bt_value *params)
{
+ bt_component_class_query_method_return ret = {
+ .result = NULL,
+ .status = BT_QUERY_STATUS_OK,
+ };
+
if (strcmp(object, "sessions") == 0) {
return lttng_live_query_list_sessions(comp_class,
- params);
+ query_exec, params);
}
BT_LOGW("Unknown query object `%s`", object);
- return NULL;
+ ret.status = BT_QUERY_STATUS_INVALID_OBJECT;
+ return ret;
}
static
bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) {
lttng_live_destroy_session(session);
}
- BT_PUT(lttng_live->viewer_connection);
+ BT_OBJECT_PUT_REF_AND_RESET(lttng_live->viewer_connection);
if (lttng_live->url) {
g_string_free(lttng_live->url, TRUE);
}
if (lttng_live->no_stream_port) {
+ bt_object_get_ref(lttng_live->no_stream_port);
ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
- assert(!ret);
- BT_PUT(lttng_live->no_stream_port);
+ bt_object_put_ref(lttng_live->no_stream_port);
+ BT_ASSERT(!ret);
}
if (lttng_live->no_stream_iter) {
g_free(lttng_live->no_stream_iter);
}
BT_HIDDEN
-void lttng_live_component_finalize(struct bt_private_component *component)
+void lttng_live_component_finalize(bt_self_component *component)
{
- void *data = bt_private_component_get_user_data(component);
+ void *data = bt_self_component_get_user_data(component);
if (!data) {
return;
}
static
-struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
- struct bt_private_component *private_component,
- struct bt_graph *graph)
+struct lttng_live_component *lttng_live_component_create(bt_value *params,
+ bt_self_component *private_component)
{
struct lttng_live_component *lttng_live;
- struct bt_value *value = NULL;
+ bt_value *value = NULL;
const char *url;
enum bt_value_status ret;
BT_LOGW("Mandatory \"url\" parameter missing");
goto error;
}
- ret = bt_value_string_get(value, &url);
- if (ret != BT_VALUE_STATUS_OK) {
- BT_LOGW("\"url\" parameter is required to be a string value");
- goto error;
- }
+ url = bt_value_string_get(value);
lttng_live->url = g_string_new(url);
if (!lttng_live->url) {
goto error;
}
+ BT_VALUE_PUT_REF_AND_RESET(value);
lttng_live->viewer_connection =
bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
if (!lttng_live->viewer_connection) {
goto error;
}
lttng_live->private_component = private_component;
- lttng_live->graph = graph;
goto end;
BT_HIDDEN
enum bt_component_status lttng_live_component_init(
- struct bt_private_component *private_component,
- struct bt_value *params, void *init_method_data)
+ bt_self_component *private_component,
+ bt_value *params, void *init_method_data)
{
struct lttng_live_component *lttng_live;
enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
- struct bt_component *component;
- struct bt_graph *graph;
-
- component = bt_component_from_private_component(private_component);
- graph = bt_component_get_graph(component);
- bt_put(graph); /* weak */
- bt_put(component);
/* Passes ownership of iter ref to lttng_live_component_create. */
- lttng_live = lttng_live_component_create(params, private_component,
- graph);
+ lttng_live = lttng_live_component_create(params, private_component);
if (!lttng_live) {
- if (bt_graph_is_canceled(graph)) {
- ret = BT_COMPONENT_STATUS_AGAIN;
- } else {
- ret = BT_COMPONENT_STATUS_NOMEM;
- }
+ //TODO : we need access to the application cancel state
+ //because we are not part of a graph yet.
+ ret = BT_COMPONENT_STATUS_NOMEM;
goto end;
}
lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
lttng_live->no_stream_iter->lttng_live = lttng_live;
-
- lttng_live->no_stream_port =
- bt_private_component_source_add_output_private_port(
+ if (lttng_live_is_canceled(lttng_live)) {
+ goto end;
+ }
+ ret = bt_self_component_source_add_output_port(
lttng_live->private_component, "no-stream",
- lttng_live->no_stream_iter);
+ lttng_live->no_stream_iter,
+ <tng_live->no_stream_port);
+ if (ret != BT_COMPONENT_STATUS_OK) {
+ goto end;
+ }
+ bt_object_put_ref(lttng_live->no_stream_port); /* weak */
lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
- ret = bt_private_component_set_user_data(private_component, lttng_live);
+ ret = bt_self_component_set_user_data(private_component, lttng_live);
if (ret != BT_COMPONENT_STATUS_OK) {
goto error;
}
end:
return ret;
error:
- (void) bt_private_component_set_user_data(private_component, NULL);
+ (void) bt_self_component_set_user_data(private_component, NULL);
lttng_live_component_destroy_data(lttng_live);
return ret;
}
BT_HIDDEN
enum bt_component_status lttng_live_accept_port_connection(
- struct bt_private_component *private_component,
+ bt_self_component *private_component,
struct bt_private_port *self_private_port,
- struct bt_port *other_port)
+ const bt_port *other_port)
{
struct lttng_live_component *lttng_live =
- bt_private_component_get_user_data(private_component);
- struct bt_component *other_component;
+ bt_self_component_get_user_data(private_component);
+ bt_component *other_component;
enum bt_component_status status = BT_COMPONENT_STATUS_OK;
- struct bt_port *self_port = bt_port_from_private_port(self_private_port);
+ const bt_port *self_port = bt_port_from_private(self_private_port);
other_component = bt_port_get_component(other_port);
- bt_put(other_component); /* weak */
+ bt_component_put_ref(other_component); /* weak */
if (!lttng_live->downstream_component) {
lttng_live->downstream_component = other_component;
goto end;
}
end:
- bt_put(self_port);
+ bt_port_put_ref(self_port);
return status;
}
-
-static
-void __attribute__((constructor)) bt_lttng_live_logging_ctor(void)
-{
- enum bt_logging_level log_level = BT_LOG_NONE;
- const char *log_level_env = getenv(BT_LOGLEVEL_NAME);
-
- if (!log_level_env) {
- return;
- }
-
- if (strcmp(log_level_env, "VERBOSE") == 0) {
- log_level = BT_LOGGING_LEVEL_VERBOSE;
- } else if (strcmp(log_level_env, "DEBUG") == 0) {
- log_level = BT_LOGGING_LEVEL_DEBUG;
- } else if (strcmp(log_level_env, "INFO") == 0) {
- log_level = BT_LOGGING_LEVEL_INFO;
- } else if (strcmp(log_level_env, "WARN") == 0) {
- log_level = BT_LOGGING_LEVEL_WARN;
- } else if (strcmp(log_level_env, "ERROR") == 0) {
- log_level = BT_LOGGING_LEVEL_ERROR;
- } else if (strcmp(log_level_env, "FATAL") == 0) {
- log_level = BT_LOGGING_LEVEL_FATAL;
- } else {
- bt_lttng_live_log_level = BT_LOGGING_LEVEL_FATAL;
- BT_LOGF("Incorrect log level specified in %s",
- BT_LOGLEVEL_NAME);
- abort();
- }
-
- bt_lttng_live_log_level = log_level;
-}