struct ctf_trace_class *ctf_tc;
struct lttng_live_stream_iterator *stream_iter =
g_ptr_array_index(trace->stream_iterators,
- stream_iter_idx);
+ stream_iter_idx);
if (stream_iter->msg_iter) {
continue;
}
ctf_tc = ctf_metadata_decoder_borrow_ctf_trace_class(
- trace->metadata->decoder);
+ trace->metadata->decoder);
stream_iter->msg_iter = bt_msg_iter_create(ctf_tc,
- lttng_live->max_query_size, medops,
- stream_iter, log_level, self_comp);
+ lttng_live->max_query_size, medops, stream_iter,
+ log_level, self_comp);
if (!stream_iter->msg_iter) {
goto error;
}
if (trace->trace) {
struct ctf_trace_class *ctf_tc =
ctf_metadata_decoder_borrow_ctf_trace_class(
- trace->metadata->decoder);
+ trace->metadata->decoder);
BT_ASSERT(!stream_iter->msg_iter);
stream_iter->msg_iter = bt_msg_iter_create(ctf_tc,
- lttng_live->max_query_size, medops,
- stream_iter, log_level, self_comp);
+ lttng_live->max_query_size, medops, stream_iter,
+ log_level, self_comp);
if (!stream_iter->msg_iter) {
goto error;
}
}
g_string_printf(stream_iter->name, STREAM_NAME_PREFIX "%" PRIu64,
- stream_iter->viewer_stream_id);
+ stream_iter->viewer_stream_id);
g_ptr_array_add(trace->stream_iterators, stream_iter);
/* Track the number of active stream iterator. */
trace->trace_class = NULL;
trace->trace = NULL;
trace->stream_iterators = g_ptr_array_new_with_free_func(
- (GDestroyNotify) lttng_live_stream_iterator_destroy);
+ (GDestroyNotify) lttng_live_stream_iterator_destroy);
BT_ASSERT(trace->stream_iterators);
trace->new_metadata_needed = true;
g_ptr_array_add(session->traces, trace);
session->self_comp = lttng_live_msg_iter->self_comp;
session->id = session_id;
session->traces = g_ptr_array_new_with_free_func(
- (GDestroyNotify) lttng_live_destroy_trace);
+ (GDestroyNotify) lttng_live_destroy_trace);
BT_ASSERT(session->traces);
session->lttng_live_msg_iter = lttng_live_msg_iter;
session->new_streams_needed = true;
{
bt_logging_level log_level = lttng_live_msg_iter->log_level;
bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
- enum lttng_live_iterator_status ret =
- LTTNG_LIVE_ITERATOR_STATUS_OK;
- struct packet_index index;
+ enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
enum lttng_live_stream_state orig_state = lttng_live_stream->state;
+ struct packet_index index;
if (lttng_live_stream->trace->new_metadata_needed) {
ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
lttng_live_iterator_handle_new_streams_and_metadata(
struct lttng_live_msg_iter *lttng_live_msg_iter)
{
- enum lttng_live_iterator_status ret =
- LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
uint64_t session_idx = 0, nr_sessions_opened = 0;
struct lttng_live_session *session;
enum session_not_found_action sess_not_found_act =
struct lttng_live_stream_iterator *stream_iter,
bt_message **message, uint64_t timestamp)
{
- enum lttng_live_iterator_status ret =
- LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
bt_message *msg = NULL;
BT_ASSERT(stream_iter->trace->clock_class);
msg = bt_message_message_iterator_inactivity_create(
- lttng_live_msg_iter->self_msg_iter,
- stream_iter->trace->clock_class,
- timestamp);
+ lttng_live_msg_iter->self_msg_iter,
+ stream_iter->trace->clock_class, timestamp);
if (!msg) {
goto error;
}
struct lttng_live_stream_iterator *lttng_live_stream,
bt_message **message)
{
- enum lttng_live_iterator_status ret =
- LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream,
- message, lttng_live_stream->current_inactivity_ts);
+ message, lttng_live_stream->current_inactivity_ts);
lttng_live_stream->last_inactivity_ts =
- lttng_live_stream->current_inactivity_ts;
+ lttng_live_stream->current_inactivity_ts;
end:
return ret;
}
switch (bt_message_get_type(msg)) {
case BT_MESSAGE_TYPE_EVENT:
- clock_class =
- bt_message_event_borrow_stream_class_default_clock_class_const(
+ clock_class = bt_message_event_borrow_stream_class_default_clock_class_const(
msg);
BT_ASSERT(clock_class);
msg);
break;
case BT_MESSAGE_TYPE_PACKET_BEGINNING:
- clock_class =
- bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
+ clock_class = bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
msg);
BT_ASSERT(clock_class);
msg);
break;
case BT_MESSAGE_TYPE_PACKET_END:
- clock_class =
- bt_message_packet_end_borrow_stream_class_default_clock_class_const(
+ clock_class = bt_message_packet_end_borrow_stream_class_default_clock_class_const(
msg);
BT_ASSERT(clock_class);
msg);
break;
case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
- clock_class =
- bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
+ clock_class = bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
msg);
BT_ASSERT(clock_class);
msg);
break;
case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
- clock_class =
- bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
+ clock_class = bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
msg);
BT_ASSERT(clock_class);
msg);
break;
case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
- clock_snapshot =
- bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
- msg);
+ clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
+ msg);
break;
default:
/* All the other messages have a higher priority */
bt_message **message)
{
enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
- enum bt_msg_iter_status status;
- uint64_t session_idx, trace_idx;
bt_logging_level log_level = lttng_live_msg_iter->log_level;
bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
+ enum bt_msg_iter_status status;
+ uint64_t session_idx, trace_idx;
for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
session_idx++) {
* `bt_msg_iter` should simply realize that it needs to close the
* stream properly by emitting the necessary stream end message.
*/
- enum bt_msg_iter_status status =
- bt_msg_iter_get_next_message(stream_iter->msg_iter,
- lttng_live_msg_iter->self_msg_iter, curr_msg);
+ enum bt_msg_iter_status status = bt_msg_iter_get_next_message(
+ stream_iter->msg_iter, lttng_live_msg_iter->self_msg_iter,
+ curr_msg);
if (status == BT_MSG_ITER_STATUS_ERROR) {
live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
retry:
print_stream_state(stream_iter);
live_status = lttng_live_iterator_handle_new_streams_and_metadata(
- lttng_live_msg_iter);
+ lttng_live_msg_iter);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
goto end;
}
live_status = lttng_live_iterator_next_handle_one_no_data_stream(
- lttng_live_msg_iter, stream_iter);
+ lttng_live_msg_iter, stream_iter);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
goto end;
}
live_status = lttng_live_iterator_next_handle_one_quiescent_stream(
- lttng_live_msg_iter, stream_iter, curr_msg);
+ lttng_live_msg_iter, stream_iter, curr_msg);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
BT_ASSERT(!*curr_msg);
goto end;
goto end;
}
live_status = lttng_live_iterator_next_handle_one_active_data_stream(
- lttng_live_msg_iter, stream_iter, curr_msg);
+ lttng_live_msg_iter, stream_iter, curr_msg);
if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
BT_ASSERT(!*curr_msg);
}
struct lttng_live_stream_iterator **youngest_trace_stream_iter)
{
struct lttng_live_stream_iterator *youngest_candidate_stream_iter = NULL;
+ bt_logging_level log_level = lttng_live_msg_iter->log_level;
+ bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
enum lttng_live_iterator_status stream_iter_status;;
int64_t youngest_candidate_msg_ts = INT64_MAX;
uint64_t stream_iter_idx;
- bt_logging_level log_level = lttng_live_msg_iter->log_level;
- bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
BT_ASSERT(live_trace);
BT_ASSERT(live_trace->stream_iterators);
bool stream_iter_is_ended = false;
struct lttng_live_stream_iterator *stream_iter =
g_ptr_array_index(live_trace->stream_iterators,
- stream_iter_idx);
+ stream_iter_idx);
/*
* Since we may remove elements from the GPtrArray as we
bt_message *msg = NULL;
int64_t curr_msg_ts_ns = INT64_MAX;
stream_iter_status = lttng_live_iterator_next_msg_on_stream(
- lttng_live_msg_iter, stream_iter, &msg);
+ lttng_live_msg_iter, stream_iter, &msg);
BT_COMP_LOGD("live stream iterator returned status :%s",
- print_live_iterator_status(stream_iter_status));
+ print_live_iterator_status(stream_iter_status));
if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
stream_iter_is_ended = true;
break;
void lttng_live_component_finalize(bt_self_component_source *component)
{
void *data = bt_self_component_get_data(
- bt_self_component_source_as_self_component(component));
+ bt_self_component_source_as_self_component(component));
if (!data) {
return;
goto end;
}
- lttng_live_url_parts = bt_common_parse_lttng_live_url(path,
- error_buf, sizeof(error_buf));
+ lttng_live_url_parts = bt_common_parse_lttng_live_url(path, error_buf,
+ sizeof(error_buf));
if (!lttng_live_url_parts.proto) {
BT_COMP_LOGW("Invalid LTTng live URL format: %s", error_buf);
goto end;
}
- viewer_connection->relay_hostname =
- lttng_live_url_parts.hostname;
+ viewer_connection->relay_hostname = lttng_live_url_parts.hostname;
lttng_live_url_parts.hostname = NULL;
if (lttng_live_url_parts.port >= 0) {
viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
}
- viewer_connection->target_hostname =
- lttng_live_url_parts.target_hostname;
+ viewer_connection->target_hostname = lttng_live_url_parts.target_hostname;
lttng_live_url_parts.target_hostname = NULL;
if (lttng_live_url_parts.session_name) {
- viewer_connection->session_name =
- lttng_live_url_parts.session_name;
+ viewer_connection->session_name = lttng_live_url_parts.session_name;
lttng_live_url_parts.session_name = NULL;
}
BT_COMP_LOGI("Connecting to hostname : %s, port : %d, "
- "target hostname : %s, session name : %s, "
- "proto : %s",
- viewer_connection->relay_hostname->str,
- viewer_connection->port,
- !viewer_connection->target_hostname ?
- "<none>" : viewer_connection->target_hostname->str,
- !viewer_connection->session_name ?
- "<none>" : viewer_connection->session_name->str,
- lttng_live_url_parts.proto->str);
+ "target hostname : %s, session name : %s, proto : %s",
+ viewer_connection->relay_hostname->str,
+ viewer_connection->port,
+ !viewer_connection->target_hostname ?
+ "<none>" : viewer_connection->target_hostname->str,
+ !viewer_connection->session_name ?
+ "<none>" : viewer_connection->session_name->str,
+ lttng_live_url_parts.proto->str);
ret = 0;
end:
const struct lttng_viewer_session *session,
bool *_found, struct live_viewer_connection *viewer_connection)
{
- int ret = 0;
+ int i, len, ret = 0;
bt_value *map = NULL;
bt_value *hostname = NULL;
bt_value *session_name = NULL;
bt_value *btval = NULL;
- int i, len;
bool found = false;
len = bt_value_array_get_length(results);
ssize_t ret_len;
uint32_t i;
struct lttng_live_msg_iter *lttng_live_msg_iter =
- session->lttng_live_msg_iter;
+ session->lttng_live_msg_iter;
struct live_viewer_connection *viewer_connection =
- lttng_live_msg_iter->viewer_connection;
+ lttng_live_msg_iter->viewer_connection;
BT_COMP_LOGI("Getting %" PRIu32 " new streams:", stream_count);
for (i = 0; i < stream_count; i++) {
if (stream.metadata_flag) {
BT_COMP_LOGI(" metadata stream %" PRIu64 " : %s/%s",
- stream_id, stream.path_name,
- stream.channel_name);
+ stream_id, stream.path_name, stream.channel_name);
if (lttng_live_metadata_create_stream(session,
ctf_trace_id, stream_id,
stream.path_name)) {
session->lazy_stream_msg_init = true;
} else {
BT_COMP_LOGI(" stream %" PRIu64 " : %s/%s",
- stream_id, stream.path_name,
- stream.channel_name);
+ stream_id, stream.path_name, stream.channel_name);
live_stream = lttng_live_stream_iterator_create(session,
ctf_trace_id, stream_id);
if (!live_stream) {
struct lttng_viewer_cmd cmd;
struct lttng_viewer_attach_session_request rq;
struct lttng_viewer_attach_session_response rp;
- ssize_t ret_len;
- struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ session->lttng_live_msg_iter;
struct live_viewer_connection *viewer_connection =
- lttng_live_msg_iter->viewer_connection;
+ lttng_live_msg_iter->viewer_connection;
uint64_t session_id = session->id;
uint32_t streams_count;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
+ ssize_t ret_len;
cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
struct lttng_viewer_detach_session_request rq;
struct lttng_viewer_detach_session_response rp;
ssize_t ret_len;
- struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ session->lttng_live_msg_iter;
struct live_viewer_connection *viewer_connection =
- lttng_live_msg_iter->viewer_connection;
+ lttng_live_msg_iter->viewer_connection;
uint64_t session_id = session->id;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
char *data = NULL;
ssize_t ret_len;
struct lttng_live_session *session = trace->session;
- struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ session->lttng_live_msg_iter;
struct lttng_live_metadata *metadata = trace->metadata;
struct live_viewer_connection *viewer_connection =
- lttng_live_msg_iter->viewer_connection;
+ lttng_live_msg_iter->viewer_connection;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_get_next_index rq;
- ssize_t ret_len;
struct lttng_viewer_index rp;
- uint32_t flags, status;
- enum lttng_live_iterator_status retstatus =
- LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum lttng_live_iterator_status retstatus = LTTNG_LIVE_ITERATOR_STATUS_OK;
struct live_viewer_connection *viewer_connection =
lttng_live_msg_iter->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
+ uint32_t flags, status;
+ ssize_t ret_len;
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
uint64_t offset, uint64_t req_len, uint64_t *recv_len)
{
enum bt_msg_iter_medium_status retstatus = BT_MSG_ITER_MEDIUM_STATUS_OK;
+ struct lttng_viewer_trace_packet rp;
struct lttng_viewer_cmd cmd;
struct lttng_viewer_get_packet rq;
- struct lttng_viewer_trace_packet rp;
- ssize_t ret_len;
- uint32_t flags, status;
struct live_viewer_connection *viewer_connection =
lttng_live_msg_iter->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
+ uint32_t flags, status;
+ ssize_t ret_len;
BT_COMP_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
offset, req_len);
struct lttng_viewer_cmd cmd;
struct lttng_viewer_new_streams_request rq;
struct lttng_viewer_new_streams_response rp;
- ssize_t ret_len;
struct lttng_live_msg_iter *lttng_live_msg_iter =
session->lttng_live_msg_iter;
struct live_viewer_connection *viewer_connection =
uint32_t streams_count;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
+ ssize_t ret_len;
if (!session->new_streams_needed) {
return LTTNG_LIVE_ITERATOR_STATUS_OK;