/*
+ * Copyright 2019 - Francis Deslauriers <francis.deslauriers@efficios.com>
* Copyright 2016 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
#include <babeltrace/common-internal.h>
#include <babeltrace/babeltrace.h>
-#include "lttng-live-internal.h"
+#include "lttng-live.h"
#include "viewer-connection.h"
#include "lttng-viewer-abi.h"
#include "data-stream.h"
#include "metadata.h"
-static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connection,
+static
+ssize_t lttng_live_recv(struct live_viewer_connection *viewer_connection,
void *buf, size_t len)
{
ssize_t ret;
size_t copied = 0, to_copy = len;
- struct lttng_live_component *lttng_live =
- viewer_connection->lttng_live;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ viewer_connection->lttng_live_msg_iter;
BT_SOCKET sock = viewer_connection->control_sock;
do {
to_copy -= ret;
}
if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
- if (lttng_live_is_canceled(lttng_live)) {
+ if (!viewer_connection->in_query &&
+ lttng_live_is_canceled(lttng_live_msg_iter->lttng_live_comp)) {
break;
} else {
continue;
return ret;
}
-static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connection,
+static
+ssize_t lttng_live_send(struct live_viewer_connection *viewer_connection,
const void *buf, size_t len)
{
- struct lttng_live_component *lttng_live =
- viewer_connection->lttng_live;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ viewer_connection->lttng_live_msg_iter;
BT_SOCKET sock = viewer_connection->control_sock;
ssize_t ret;
for (;;) {
ret = bt_socket_send_nosigpipe(sock, buf, len);
if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
- if (lttng_live_is_canceled(lttng_live)) {
+ if (!viewer_connection->in_query &&
+ lttng_live_is_canceled(lttng_live_msg_iter->lttng_live_comp)) {
break;
} else {
continue;
return ret;
}
-static int parse_url(struct bt_live_viewer_connection *viewer_connection)
+static
+int parse_url(struct live_viewer_connection *viewer_connection)
{
char error_buf[256] = { 0 };
struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 };
return ret;
}
-static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connection)
+static
+int lttng_live_handshake(struct live_viewer_connection *viewer_connection)
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_connect connect;
return -1;
}
-static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_connection)
+static
+int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
{
struct hostent *host;
struct sockaddr_in server_addr;
return -1;
}
-static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection)
+static
+void lttng_live_disconnect_viewer(
+ struct live_viewer_connection *viewer_connection)
{
if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
return;
}
}
-static void connection_release(struct bt_object *obj)
+static
+void connection_release(bt_object *obj)
{
- struct bt_live_viewer_connection *conn =
- container_of(obj, struct bt_live_viewer_connection, obj);
+ struct live_viewer_connection *conn =
+ container_of(obj, struct live_viewer_connection, obj);
- bt_live_viewer_connection_destroy(conn);
+ live_viewer_connection_destroy(conn);
}
static
-enum bt_value_status list_update_session(struct bt_value *results,
+int list_update_session(bt_value *results,
const struct lttng_viewer_session *session,
bool *_found)
{
- enum bt_value_status ret = BT_VALUE_STATUS_OK;
- struct bt_value *map = NULL;
- struct bt_value *hostname = NULL;
- struct bt_value *session_name = NULL;
- struct bt_value *btval = NULL;
+ int ret = 0;
+ bt_value *map = NULL;
+ bt_value *hostname = NULL;
+ bt_value *session_name = NULL;
+ bt_value *btval = NULL;
int i, len;
bool found = false;
- len = bt_value_array_size(results);
+ len = bt_value_array_get_size(results);
if (len < 0) {
- ret = BT_VALUE_STATUS_ERROR;
+ BT_LOGE_STR("Error getting size of array.");
+ ret = -1;
goto end;
}
for (i = 0; i < len; i++) {
const char *hostname_str = NULL;
const char *session_name_str = NULL;
- map = bt_value_array_get(results, (size_t) i);
+ map = bt_value_array_borrow_element_by_index(results, (size_t) i);
if (!map) {
- ret = BT_VALUE_STATUS_ERROR;
+ BT_LOGE_STR("Error borrowing map.");
+ ret = -1;
goto end;
}
- hostname = bt_value_map_get(map, "target-hostname");
+ hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
if (!hostname) {
- ret = BT_VALUE_STATUS_ERROR;
+ BT_LOGE_STR("Error borrowing \"target-hostname\" entry.");
+ ret = -1;
goto end;
}
- session_name = bt_value_map_get(map, "session-name");
+ session_name = bt_value_map_borrow_entry_value(map, "session-name");
if (!session_name) {
- ret = BT_VALUE_STATUS_ERROR;
- goto end;
- }
- ret = bt_value_string_get(hostname, &hostname_str);
- if (ret != BT_VALUE_STATUS_OK) {
- goto end;
- }
- ret = bt_value_string_get(session_name, &session_name_str);
- if (ret != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error borrowing \"session-name\" entry.");
+ ret = -1;
goto end;
}
+ hostname_str = bt_value_string_get(hostname);
+ session_name_str = bt_value_string_get(session_name);
if (!strcmp(session->hostname, hostname_str)
&& !strcmp(session->session_name,
found = true;
- btval = bt_value_map_get(map, "stream-count");
+ btval = bt_value_map_borrow_entry_value(map, "stream-count");
if (!btval) {
- ret = BT_VALUE_STATUS_ERROR;
- goto end;
- }
- ret = bt_value_integer_get(btval, &val);
- if (ret != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error borrowing \"stream-count\" entry.");
+ ret = -1;
goto end;
}
+ val = bt_value_signed_integer_get(btval);
/* sum */
val += streams;
- ret = bt_value_integer_set(btval, val);
- if (ret != BT_VALUE_STATUS_OK) {
- goto end;
- }
- BT_PUT(btval);
+ bt_value_signed_integer_set(btval, val);
- btval = bt_value_map_get(map, "client-count");
+ btval = bt_value_map_borrow_entry_value(map, "client-count");
if (!btval) {
- ret = BT_VALUE_STATUS_ERROR;
- goto end;
- }
- ret = bt_value_integer_get(btval, &val);
- if (ret != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error borrowing \"client-count\" entry.");
+ ret = -1;
goto end;
}
+ val = bt_value_signed_integer_get(btval);
/* max */
val = max_t(int64_t, clients, val);
- ret = bt_value_integer_set(btval, val);
- if (ret != BT_VALUE_STATUS_OK) {
- goto end;
- }
- BT_PUT(btval);
+ bt_value_signed_integer_set(btval, val);
}
- BT_PUT(hostname);
- BT_PUT(session_name);
- BT_PUT(map);
-
if (found) {
break;
}
}
end:
- BT_PUT(btval);
- BT_PUT(hostname);
- BT_PUT(session_name);
- BT_PUT(map);
*_found = found;
return ret;
}
static
-enum bt_value_status list_append_session(struct bt_value *results,
+int list_append_session(bt_value *results,
GString *base_url,
const struct lttng_viewer_session *session)
{
- enum bt_value_status ret = BT_VALUE_STATUS_OK;
- struct bt_value *map = NULL;
+ int ret = 0;
+ bt_value_status ret_status;
+ bt_value *map = NULL;
GString *url = NULL;
bool found = false;
* and do max of client counts.
*/
ret = list_update_session(results, session, &found);
- if (ret != BT_VALUE_STATUS_OK || found) {
+ if (ret || found) {
goto end;
}
map = bt_value_map_create();
if (!map) {
- ret = BT_VALUE_STATUS_ERROR;
+ BT_LOGE_STR("Error creating map value.");
+ ret = -1;
goto end;
}
if (base_url->len < 1) {
- ret = BT_VALUE_STATUS_ERROR;
+ BT_LOGE_STR("Error: base_url length smaller than 1.");
+ ret = -1;
goto end;
}
/*
g_string_append_c(url, '/');
g_string_append(url, session->session_name);
- ret = bt_value_map_insert_string(map, "url", url->str);
- if (ret != BT_VALUE_STATUS_OK) {
+ ret_status = bt_value_map_insert_string_entry(map, "url", url->str);
+ if (ret_status != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error inserting \"url\" entry.");
+ ret = -1;
goto end;
}
* key = "target-hostname",
* value = <string>,
*/
- ret = bt_value_map_insert_string(map, "target-hostname",
+ ret_status = bt_value_map_insert_string_entry(map, "target-hostname",
session->hostname);
- if (ret != BT_VALUE_STATUS_OK) {
+ if (ret_status != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error inserting \"target-hostname\" entry.");
+ ret = -1;
goto end;
}
* key = "session-name",
* value = <string>,
*/
- ret = bt_value_map_insert_string(map, "session-name",
+ ret_status = bt_value_map_insert_string_entry(map, "session-name",
session->session_name);
- if (ret != BT_VALUE_STATUS_OK) {
+ if (ret_status != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error inserting \"session-name\" entry.");
+ ret = -1;
goto end;
}
{
uint32_t live_timer = be32toh(session->live_timer);
- ret = bt_value_map_insert_integer(map, "timer-us",
- live_timer);
- if (ret != BT_VALUE_STATUS_OK) {
+ ret_status = bt_value_map_insert_signed_integer_entry(
+ map, "timer-us", live_timer);
+ if (ret_status != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error inserting \"timer-us\" entry.");
+ ret = -1;
goto end;
}
}
{
uint32_t streams = be32toh(session->streams);
- ret = bt_value_map_insert_integer(map, "stream-count",
- streams);
- if (ret != BT_VALUE_STATUS_OK) {
+ ret_status = bt_value_map_insert_signed_integer_entry(map,
+ "stream-count", streams);
+ if (ret_status != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error inserting \"stream-count\" entry.");
+ ret = -1;
goto end;
}
}
-
/*
* key = "client-count",
* value = <integer>,
{
uint32_t clients = be32toh(session->clients);
- ret = bt_value_map_insert_integer(map, "client-count",
- clients);
- if (ret != BT_VALUE_STATUS_OK) {
+ ret_status = bt_value_map_insert_signed_integer_entry(map,
+ "client-count", clients);
+ if (ret_status != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error inserting \"client-count\" entry.");
+ ret = -1;
goto end;
}
}
- ret = bt_value_array_append(results, map);
+ ret_status = bt_value_array_append_element(results, map);
+ if (ret_status != BT_VALUE_STATUS_OK) {
+ BT_LOGE_STR("Error appending map to results.");
+ ret = -1;
+ }
+
end:
if (url) {
- g_string_free(url, TRUE);
+ g_string_free(url, true);
}
- BT_PUT(map);
+ BT_VALUE_PUT_REF_AND_RESET(map);
return ret;
}
*/
BT_HIDDEN
-struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection)
+bt_query_status live_viewer_connection_list_sessions(
+ struct live_viewer_connection *viewer_connection,
+ const bt_value **user_result)
{
- struct bt_value *results = NULL;
+ bt_query_status status = BT_QUERY_STATUS_OK;
+ bt_value *result = NULL;
struct lttng_viewer_cmd cmd;
struct lttng_viewer_list_sessions list;
uint32_t i, sessions_count;
goto error;
}
- results = bt_value_array_create();
- if (!results) {
+ result = bt_value_array_create();
+ if (!result) {
BT_LOGE("Error creating array");
+ status = BT_QUERY_STATUS_NOMEM;
goto error;
}
ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
+ status = BT_QUERY_STATUS_ERROR;
goto error;
}
BT_ASSERT(ret_len == sizeof(cmd));
ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
+ status = BT_QUERY_STATUS_ERROR;
goto error;
}
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error receiving session list: %s", bt_socket_errormsg());
+ status = BT_QUERY_STATUS_ERROR;
goto error;
}
BT_ASSERT(ret_len == sizeof(list));
for (i = 0; i < sessions_count; i++) {
struct lttng_viewer_session lsession;
- ret_len = lttng_live_recv(viewer_connection,
- &lsession, sizeof(lsession));
+ ret_len = lttng_live_recv(viewer_connection, &lsession,
+ sizeof(lsession));
if (ret_len == 0) {
BT_LOGI("Remote side has closed connection");
+ status = BT_QUERY_STATUS_ERROR;
goto error;
}
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error receiving session: %s", bt_socket_errormsg());
+ status = BT_QUERY_STATUS_ERROR;
goto error;
}
BT_ASSERT(ret_len == sizeof(lsession));
lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
- if (list_append_session(results,
- viewer_connection->url, &lsession)
- != BT_VALUE_STATUS_OK) {
+ if (list_append_session(result, viewer_connection->url,
+ &lsession)) {
+ status = BT_QUERY_STATUS_ERROR;
goto error;
}
}
+
+ *user_result = result;
goto end;
error:
- BT_PUT(results);
+ BT_VALUE_PUT_REF_AND_RESET(result);
end:
- return results;
+ return status;
}
static
-int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
+int lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_list_sessions list;
uint32_t i, sessions_count;
ssize_t ret_len;
uint64_t session_id;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
cmd.data_size = htobe64((uint64_t) 0);
LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname,
viewer_connection->target_hostname->str,
LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
- if (lttng_live_add_session(lttng_live, session_id,
+ if (lttng_live_add_session(lttng_live_msg_iter, session_id,
lsession.hostname,
lsession.session_name)) {
goto error;
}
BT_HIDDEN
-int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live)
+int lttng_live_create_viewer_session(
+ struct lttng_live_msg_iter *lttng_live_msg_iter)
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_create_session_response resp;
ssize_t ret_len;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
cmd.data_size = htobe64((uint64_t) 0);
BT_LOGE("Error creating viewer session");
goto error;
}
- if (lttng_live_query_session_ids(lttng_live)) {
+ if (lttng_live_query_session_ids(lttng_live_msg_iter)) {
goto error;
}
{
ssize_t ret_len;
uint32_t i;
- struct lttng_live_component *lttng_live = session->lttng_live;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ session->lttng_live_msg_iter;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
BT_LOGD("Getting %" PRIu32 " new streams:", stream_count);
for (i = 0; i < stream_count; i++) {
goto error;
}
- session->lazy_stream_notif_init = true;
+ session->lazy_stream_msg_init = true;
} else {
BT_LOGD(" stream %" PRIu64 " : %s/%s",
stream_id, stream.path_name,
struct lttng_viewer_attach_session_request rq;
struct lttng_viewer_attach_session_response rp;
ssize_t ret_len;
- struct lttng_live_component *lttng_live = session->lttng_live;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
uint64_t session_id = session->id;
uint32_t streams_count;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
- if (session->attached) {
- return 0;
- }
-
cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);
struct lttng_viewer_detach_session_request rq;
struct lttng_viewer_detach_session_response rp;
ssize_t ret_len;
- struct lttng_live_component *lttng_live = session->lttng_live;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
uint64_t session_id = session->id;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
char *data = NULL;
ssize_t ret_len;
struct lttng_live_session *session = trace->session;
- struct lttng_live_component *lttng_live = session->lttng_live;
+ struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
struct lttng_live_metadata *metadata = trace->metadata;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
}
BT_HIDDEN
-enum bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live,
+enum lttng_live_iterator_status lttng_live_get_next_index(
+ struct lttng_live_msg_iter *lttng_live_msg_iter,
struct lttng_live_stream_iterator *stream,
struct packet_index *index)
{
ssize_t ret_len;
struct lttng_viewer_index rp;
uint32_t flags, status;
- enum bt_lttng_live_iterator_status retstatus =
- BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ enum lttng_live_iterator_status retstatus =
+ LTTNG_LIVE_ITERATOR_STATUS_OK;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
+ struct lttng_live_component *lttng_live =
+ lttng_live_msg_iter->lttng_live_comp;
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (ret_len == BT_SOCKET_ERROR) {
- BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg());
+ BT_LOGE("Error sending get_next_index request: %s",
+ bt_socket_errormsg());
goto error;
}
goto error;
}
if (ret_len == BT_SOCKET_ERROR) {
- BT_LOGE("Error receiving get_next_index response: %s", bt_socket_errormsg());
+ BT_LOGE("Error receiving get_next_index response: %s",
+ bt_socket_errormsg());
goto error;
}
BT_ASSERT(ret_len == sizeof(rp));
BT_LOGD("get_next_index: inactive");
memset(index, 0, sizeof(struct packet_index));
index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
- stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end;
+ stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
ctf_stream_class_id = be64toh(rp.stream_id);
if (stream->ctf_stream_class_id != -1ULL) {
BT_ASSERT(stream->ctf_stream_class_id ==
}
stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA;
- stream->current_packet_end_timestamp =
- index->ts_cycles.timestamp_end;
if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
BT_LOGD("get_next_index: new metadata needed");
}
if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
BT_LOGD("get_next_index: new streams needed");
- lttng_live_need_new_streams(lttng_live);
+ lttng_live_need_new_streams(lttng_live_msg_iter);
}
break;
}
case LTTNG_VIEWER_INDEX_RETRY:
BT_LOGD("get_next_index: retry");
memset(index, 0, sizeof(struct packet_index));
- retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
goto end;
case LTTNG_VIEWER_INDEX_HUP:
BT_LOGD("get_next_index: stream hung up");
memset(index, 0, sizeof(struct packet_index));
index->offset = EOF;
- retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+ retstatus = LTTNG_LIVE_ITERATOR_STATUS_END;
stream->state = LTTNG_LIVE_STREAM_EOF;
break;
case LTTNG_VIEWER_INDEX_ERR:
error:
if (lttng_live_is_canceled(lttng_live)) {
- retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
- retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
return retstatus;
}
BT_HIDDEN
-enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live,
- struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset,
- uint64_t req_len, uint64_t *recv_len)
+enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(
+ struct lttng_live_msg_iter *lttng_live_msg_iter,
+ struct lttng_live_stream_iterator *stream, uint8_t *buf,
+ uint64_t offset, uint64_t req_len, uint64_t *recv_len)
{
- enum bt_notif_iter_medium_status retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_OK;
+ enum bt_msg_iter_medium_status retstatus = BT_MSG_ITER_MEDIUM_STATUS_OK;
struct lttng_viewer_cmd cmd;
struct lttng_viewer_get_packet rq;
struct lttng_viewer_trace_packet rp;
ssize_t ret_len;
uint32_t flags, status;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
+ struct lttng_live_component *lttng_live =
+ lttng_live_msg_iter->lttng_live_comp;
BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
offset, req_len);
case LTTNG_VIEWER_GET_PACKET_RETRY:
/* Unimplemented by relay daemon */
BT_LOGD("get_data_packet: retry");
- retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
goto end;
case LTTNG_VIEWER_GET_PACKET_ERR:
if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
}
if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
BT_LOGD("get_data_packet: new streams needed, try again later");
- lttng_live_need_new_streams(lttng_live);
+ lttng_live_need_new_streams(lttng_live_msg_iter);
}
if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
| LTTNG_VIEWER_FLAG_NEW_STREAM)) {
- retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
goto end;
}
BT_LOGE("get_data_packet: error");
goto error;
case LTTNG_VIEWER_GET_PACKET_EOF:
- retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_EOF;
+ retstatus = BT_MSG_ITER_MEDIUM_STATUS_EOF;
goto end;
default:
BT_LOGE("get_data_packet: unknown");
error:
if (lttng_live_is_canceled(lttng_live)) {
- retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+ retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
} else {
- retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+ retstatus = BT_MSG_ITER_MEDIUM_STATUS_ERROR;
}
return retstatus;
}
* Request new streams for a session.
*/
BT_HIDDEN
-enum bt_lttng_live_iterator_status lttng_live_get_new_streams(
+enum lttng_live_iterator_status lttng_live_get_new_streams(
struct lttng_live_session *session)
{
- enum bt_lttng_live_iterator_status status =
- BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ enum lttng_live_iterator_status status =
+ LTTNG_LIVE_ITERATOR_STATUS_OK;
struct lttng_viewer_cmd cmd;
struct lttng_viewer_new_streams_request rq;
struct lttng_viewer_new_streams_response rp;
ssize_t ret_len;
- struct lttng_live_component *lttng_live = session->lttng_live;
- struct bt_live_viewer_connection *viewer_connection =
- lttng_live->viewer_connection;
+ struct lttng_live_msg_iter *lttng_live_msg_iter =
+ session->lttng_live_msg_iter;
+ struct live_viewer_connection *viewer_connection =
+ lttng_live_msg_iter->viewer_connection;
+ struct lttng_live_component *lttng_live =
+ lttng_live_msg_iter->lttng_live_comp;
uint32_t streams_count;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
if (!session->new_streams_needed) {
- return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
+ return LTTNG_LIVE_ITERATOR_STATUS_OK;
}
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (ret_len == BT_SOCKET_ERROR) {
- BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg());
+ BT_LOGE("Error sending get_new_streams request: %s",
+ bt_socket_errormsg());
goto error;
}
case LTTNG_VIEWER_NEW_STREAMS_HUP:
session->new_streams_needed = false;
session->closed = true;
- status = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
+ status = LTTNG_LIVE_ITERATOR_STATUS_END;
goto end;
case LTTNG_VIEWER_NEW_STREAMS_ERR:
BT_LOGE("get_new_streams error");
error:
if (lttng_live_is_canceled(lttng_live)) {
- status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+ status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
} else {
- status = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+ status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
}
return status;
}
BT_HIDDEN
-struct bt_live_viewer_connection *
- bt_live_viewer_connection_create(const char *url,
- struct lttng_live_component *lttng_live)
+struct live_viewer_connection *live_viewer_connection_create(
+ const char *url, bool in_query,
+ struct lttng_live_msg_iter *lttng_live_msg_iter)
{
- struct bt_live_viewer_connection *viewer_connection;
+ struct live_viewer_connection *viewer_connection;
- viewer_connection = g_new0(struct bt_live_viewer_connection, 1);
+ viewer_connection = g_new0(struct live_viewer_connection, 1);
if (bt_socket_init() != 0) {
goto error;
}
- bt_object_init(&viewer_connection->obj, connection_release);
+ bt_object_init_shared(&viewer_connection->obj, connection_release);
viewer_connection->control_sock = BT_INVALID_SOCKET;
viewer_connection->port = -1;
- viewer_connection->lttng_live = lttng_live;
+ viewer_connection->in_query = in_query;
+ viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
viewer_connection->url = g_string_new(url);
if (!viewer_connection->url) {
goto error;
}
BT_HIDDEN
-void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_connection)
+void live_viewer_connection_destroy(
+ struct live_viewer_connection *viewer_connection)
{
BT_LOGD("Closing connection to url \"%s\"", viewer_connection->url->str);
lttng_live_disconnect_viewer(viewer_connection);
- g_string_free(viewer_connection->url, TRUE);
+ g_string_free(viewer_connection->url, true);
if (viewer_connection->relay_hostname) {
- g_string_free(viewer_connection->relay_hostname, TRUE);
+ g_string_free(viewer_connection->relay_hostname, true);
}
if (viewer_connection->target_hostname) {
- g_string_free(viewer_connection->target_hostname, TRUE);
+ g_string_free(viewer_connection->target_hostname, true);
}
if (viewer_connection->session_name) {
- g_string_free(viewer_connection->session_name, TRUE);
+ g_string_free(viewer_connection->session_name, true);
}
g_free(viewer_connection);