#define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp)
#define BT_LOG_OUTPUT_LEVEL (viewer_connection->log_level)
#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/VIEWER"
-#include "plugins/comp-logging.h"
+#include "logging/comp-logging.h"
#include <stdio.h>
#include <stdint.h>
}
if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
if (!viewer_connection->in_query &&
- lttng_live_graph_is_canceled(lttng_live_msg_iter->lttng_live_comp)) {
+ lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
break;
} else {
continue;
ret = bt_socket_send_nosigpipe(sock, buf, len);
if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
if (!viewer_connection->in_query &&
- lttng_live_graph_is_canceled(lttng_live_msg_iter->lttng_live_comp)) {
+ lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
break;
} else {
continue;
goto end;
}
- lttng_live_url_parts = bt_common_parse_lttng_live_url(path,
- error_buf, sizeof(error_buf));
+ lttng_live_url_parts = bt_common_parse_lttng_live_url(path, error_buf,
+ sizeof(error_buf));
if (!lttng_live_url_parts.proto) {
BT_COMP_LOGW("Invalid LTTng live URL format: %s", error_buf);
goto end;
}
- viewer_connection->relay_hostname =
- lttng_live_url_parts.hostname;
+ viewer_connection->relay_hostname = lttng_live_url_parts.hostname;
lttng_live_url_parts.hostname = NULL;
if (lttng_live_url_parts.port >= 0) {
viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
}
- viewer_connection->target_hostname =
- lttng_live_url_parts.target_hostname;
+ viewer_connection->target_hostname = lttng_live_url_parts.target_hostname;
lttng_live_url_parts.target_hostname = NULL;
if (lttng_live_url_parts.session_name) {
- viewer_connection->session_name =
- lttng_live_url_parts.session_name;
+ viewer_connection->session_name = lttng_live_url_parts.session_name;
lttng_live_url_parts.session_name = NULL;
}
BT_COMP_LOGI("Connecting to hostname : %s, port : %d, "
- "target hostname : %s, session name : %s, "
- "proto : %s",
- viewer_connection->relay_hostname->str,
- viewer_connection->port,
- viewer_connection->target_hostname == NULL ?
- "<none>" : viewer_connection->target_hostname->str,
- viewer_connection->session_name == NULL ?
- "<none>" : viewer_connection->session_name->str,
- lttng_live_url_parts.proto->str);
+ "target hostname : %s, session name : %s, proto : %s",
+ viewer_connection->relay_hostname->str,
+ viewer_connection->port,
+ !viewer_connection->target_hostname ?
+ "<none>" : viewer_connection->target_hostname->str,
+ !viewer_connection->session_name ?
+ "<none>" : viewer_connection->session_name->str,
+ lttng_live_url_parts.proto->str);
ret = 0;
end:
bool *_found, struct live_viewer_connection *viewer_connection)
{
int ret = 0;
+ uint64_t i, len;
bt_value *map = NULL;
bt_value *hostname = NULL;
bt_value *session_name = NULL;
bt_value *btval = NULL;
- int i, len;
bool found = false;
- len = bt_value_array_get_size(results);
- if (len < 0) {
- BT_COMP_LOGE_STR("Error getting size of array.");
- ret = -1;
- goto end;
- }
+ len = bt_value_array_get_length(results);
for (i = 0; i < len; i++) {
const char *hostname_str = NULL;
const char *session_name_str = NULL;
- map = bt_value_array_borrow_element_by_index(results, (size_t) i);
+ map = bt_value_array_borrow_element_by_index(results, i);
if (!map) {
BT_COMP_LOGE_STR("Error borrowing map.");
ret = -1;
ret = -1;
goto end;
}
- val = bt_value_integer_signed_get(btval);
+ val = bt_value_integer_unsigned_get(btval);
/* sum */
val += streams;
- bt_value_integer_signed_set(btval, val);
+ bt_value_integer_unsigned_set(btval, val);
btval = bt_value_map_borrow_entry_value(map, "client-count");
if (!btval) {
ret = -1;
goto end;
}
- val = bt_value_integer_signed_get(btval);
+ val = bt_value_integer_unsigned_get(btval);
/* max */
val = bt_max_t(int64_t, clients, val);
- bt_value_integer_signed_set(btval, val);
+ bt_value_integer_unsigned_set(btval, val);
}
if (found) {
{
uint32_t live_timer = be32toh(session->live_timer);
- insert_status = bt_value_map_insert_signed_integer_entry(
+ insert_status = bt_value_map_insert_unsigned_integer_entry(
map, "timer-us", live_timer);
if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
BT_COMP_LOGE_STR("Error inserting \"timer-us\" entry.");
{
uint32_t streams = be32toh(session->streams);
- insert_status = bt_value_map_insert_signed_integer_entry(map,
+ insert_status = bt_value_map_insert_unsigned_integer_entry(map,
"stream-count", streams);
if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
BT_COMP_LOGE_STR("Error inserting \"stream-count\" entry.");
{
uint32_t clients = be32toh(session->clients);
- insert_status = bt_value_map_insert_signed_integer_entry(map,
+ insert_status = bt_value_map_insert_unsigned_integer_entry(map,
"client-count", clients);
if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
BT_COMP_LOGE_STR("Error inserting \"client-count\" entry.");
uint32_t i, sessions_count;
ssize_t ret_len;
- if (lttng_live_handshake(viewer_connection)) {
- goto error;
- }
-
result = bt_value_array_create();
if (!result) {
BT_COMP_LOGE("Error creating array");
ssize_t ret_len;
uint32_t i;
struct lttng_live_msg_iter *lttng_live_msg_iter =
- session->lttng_live_msg_iter;
+ session->lttng_live_msg_iter;
struct live_viewer_connection *viewer_connection =
- lttng_live_msg_iter->viewer_connection;
+ lttng_live_msg_iter->viewer_connection;
BT_COMP_LOGI("Getting %" PRIu32 " new streams:", stream_count);
for (i = 0; i < stream_count; i++) {
if (stream.metadata_flag) {
BT_COMP_LOGI(" metadata stream %" PRIu64 " : %s/%s",
- stream_id, stream.path_name,
- stream.channel_name);
+ stream_id, stream.path_name, stream.channel_name);
if (lttng_live_metadata_create_stream(session,
ctf_trace_id, stream_id,
stream.path_name)) {
session->lazy_stream_msg_init = true;
} else {
BT_COMP_LOGI(" stream %" PRIu64 " : %s/%s",
- stream_id, stream.path_name,
- stream.channel_name);
+ stream_id, stream.path_name, stream.channel_name);
live_stream = lttng_live_stream_iterator_create(session,
ctf_trace_id, stream_id);
if (!live_stream) {
struct lttng_viewer_cmd cmd;
struct lttng_viewer_attach_session_request rq;
struct lttng_viewer_attach_session_response rp;
- ssize_t ret_len;
- struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ session->lttng_live_msg_iter;
struct live_viewer_connection *viewer_connection =
- lttng_live_msg_iter->viewer_connection;
+ lttng_live_msg_iter->viewer_connection;
uint64_t session_id = session->id;
uint32_t streams_count;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
+ ssize_t ret_len;
cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
struct lttng_viewer_detach_session_request rq;
struct lttng_viewer_detach_session_response rp;
ssize_t ret_len;
- struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ session->lttng_live_msg_iter;
struct live_viewer_connection *viewer_connection =
- lttng_live_msg_iter->viewer_connection;
+ lttng_live_msg_iter->viewer_connection;
uint64_t session_id = session->id;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
char *data = NULL;
ssize_t ret_len;
struct lttng_live_session *session = trace->session;
- struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ session->lttng_live_msg_iter;
struct lttng_live_metadata *metadata = trace->metadata;
struct live_viewer_connection *viewer_connection =
- lttng_live_msg_iter->viewer_connection;
+ lttng_live_msg_iter->viewer_connection;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_get_next_index rq;
- ssize_t ret_len;
struct lttng_viewer_index rp;
- uint32_t flags, status;
- enum lttng_live_iterator_status retstatus =
- LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum lttng_live_iterator_status retstatus = LTTNG_LIVE_ITERATOR_STATUS_OK;
struct live_viewer_connection *viewer_connection =
lttng_live_msg_iter->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
- struct lttng_live_component *lttng_live =
- lttng_live_msg_iter->lttng_live_comp;
+ uint32_t flags, status;
+ ssize_t ret_len;
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
index->offset = EOF;
retstatus = LTTNG_LIVE_ITERATOR_STATUS_END;
stream->state = LTTNG_LIVE_STREAM_EOF;
+ stream->has_stream_hung_up = true;
break;
case LTTNG_VIEWER_INDEX_ERR:
BT_COMP_LOGE("get_next_index: error");
return retstatus;
error:
- if (lttng_live_graph_is_canceled(lttng_live)) {
+ if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
uint64_t offset, uint64_t req_len, uint64_t *recv_len)
{
enum bt_msg_iter_medium_status retstatus = BT_MSG_ITER_MEDIUM_STATUS_OK;
+ struct lttng_viewer_trace_packet rp;
struct lttng_viewer_cmd cmd;
struct lttng_viewer_get_packet rq;
- struct lttng_viewer_trace_packet rp;
- ssize_t ret_len;
- uint32_t flags, status;
struct live_viewer_connection *viewer_connection =
lttng_live_msg_iter->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
- struct lttng_live_component *lttng_live =
- lttng_live_msg_iter->lttng_live_comp;
+ uint32_t flags, status;
+ ssize_t ret_len;
BT_COMP_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
offset, req_len);
return retstatus;
error:
- if (lttng_live_graph_is_canceled(lttng_live)) {
+ if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
} else {
retstatus = BT_MSG_ITER_MEDIUM_STATUS_ERROR;
struct lttng_viewer_cmd cmd;
struct lttng_viewer_new_streams_request rq;
struct lttng_viewer_new_streams_response rp;
- ssize_t ret_len;
struct lttng_live_msg_iter *lttng_live_msg_iter =
session->lttng_live_msg_iter;
struct live_viewer_connection *viewer_connection =
lttng_live_msg_iter->viewer_connection;
- struct lttng_live_component *lttng_live =
- lttng_live_msg_iter->lttng_live_comp;
uint32_t streams_count;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
+ ssize_t ret_len;
if (!session->new_streams_needed) {
return LTTNG_LIVE_ITERATOR_STATUS_OK;
return status;
error:
- if (lttng_live_graph_is_canceled(lttng_live)) {
+ if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
BT_HIDDEN
struct live_viewer_connection *live_viewer_connection_create(
const char *url, bool in_query,
- struct lttng_live_msg_iter *lttng_live_msg_iter)
+ struct lttng_live_msg_iter *lttng_live_msg_iter,
+ bt_logging_level log_level)
{
struct live_viewer_connection *viewer_connection;
viewer_connection = g_new0(struct live_viewer_connection, 1);
- if (bt_socket_init(lttng_live_msg_iter->log_level) != 0) {
+ if (bt_socket_init(log_level) != 0) {
goto error;
}
- viewer_connection->log_level = lttng_live_msg_iter->log_level;
- viewer_connection->self_comp = lttng_live_msg_iter->self_comp;
+ viewer_connection->log_level = log_level;
+
+ if (lttng_live_msg_iter) {
+ viewer_connection->self_comp = lttng_live_msg_iter->self_comp;
+ }
+
bt_object_init_shared(&viewer_connection->obj, connection_release);
viewer_connection->control_sock = BT_INVALID_SOCKET;
viewer_connection->port = -1;
error_report:
BT_COMP_LOGW("Failure to establish connection to url \"%s\"", url);
error:
- g_free(viewer_connection);
+ if (viewer_connection) {
+ live_viewer_connection_destroy(viewer_connection);
+ }
+
return NULL;
}
{
BT_COMP_LOGI("Closing connection to url \"%s\"", viewer_connection->url->str);
lttng_live_disconnect_viewer(viewer_connection);
- g_string_free(viewer_connection->url, true);
+ if (viewer_connection->url) {
+ g_string_free(viewer_connection->url, true);
+ }
if (viewer_connection->relay_hostname) {
g_string_free(viewer_connection->relay_hostname, true);
}