X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=plugins%2Fctf%2Flttng-live%2Fviewer-connection.c;h=b37db12ec46e2f9a0f464658b71e7e63a0440692;hp=d7a4a0a106be293c5731bed49b59262770d33cfe;hb=bb18709be19ebc5b1bd9264cdbd3dd20939bdd05;hpb=1e3400b8572abdb55c6f0416e7c47d13fb715037 diff --git a/plugins/ctf/lttng-live/viewer-connection.c b/plugins/ctf/lttng-live/viewer-connection.c index d7a4a0a1..b37db12e 100644 --- a/plugins/ctf/lttng-live/viewer-connection.c +++ b/plugins/ctf/lttng-live/viewer-connection.c @@ -1,4 +1,5 @@ /* + * Copyright 2019 - Francis Deslauriers * Copyright 2016 - Mathieu Desnoyers * * Permission is hereby granted, free of charge, to any person obtaining a copy @@ -39,19 +40,20 @@ #include #include -#include "lttng-live-internal.h" +#include "lttng-live.h" #include "viewer-connection.h" #include "lttng-viewer-abi.h" #include "data-stream.h" #include "metadata.h" -static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connection, +static +ssize_t lttng_live_recv(struct live_viewer_connection *viewer_connection, void *buf, size_t len) { ssize_t ret; size_t copied = 0, to_copy = len; - struct lttng_live_component *lttng_live = - viewer_connection->lttng_live; + struct lttng_live_msg_iter *lttng_live_msg_iter = + viewer_connection->lttng_live_msg_iter; BT_SOCKET sock = viewer_connection->control_sock; do { @@ -62,7 +64,8 @@ static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connecti to_copy -= ret; } if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { - if (lttng_live_is_canceled(lttng_live)) { + if (!viewer_connection->in_query && + lttng_live_is_canceled(lttng_live_msg_iter->lttng_live_comp)) { break; } else { continue; @@ -75,18 +78,20 @@ static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connecti return ret; } -static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connection, +static +ssize_t lttng_live_send(struct live_viewer_connection *viewer_connection, const void *buf, size_t len) { - struct lttng_live_component *lttng_live = - viewer_connection->lttng_live; + struct lttng_live_msg_iter *lttng_live_msg_iter = + viewer_connection->lttng_live_msg_iter; BT_SOCKET sock = viewer_connection->control_sock; ssize_t ret; for (;;) { ret = bt_socket_send_nosigpipe(sock, buf, len); if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { - if (lttng_live_is_canceled(lttng_live)) { + if (!viewer_connection->in_query && + lttng_live_is_canceled(lttng_live_msg_iter->lttng_live_comp)) { break; } else { continue; @@ -98,7 +103,8 @@ static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connecti return ret; } -static int parse_url(struct bt_live_viewer_connection *viewer_connection) +static +int parse_url(struct live_viewer_connection *viewer_connection) { char error_buf[256] = { 0 }; struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 }; @@ -153,7 +159,8 @@ end: return ret; } -static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connection) +static +int lttng_live_handshake(struct live_viewer_connection *viewer_connection) { struct lttng_viewer_cmd cmd; struct lttng_viewer_connect connect; @@ -221,7 +228,8 @@ error: return -1; } -static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_connection) +static +int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection) { struct hostent *host; struct sockaddr_in server_addr; @@ -271,7 +279,9 @@ error: return -1; } -static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection) +static +void lttng_live_disconnect_viewer( + struct live_viewer_connection *viewer_connection) { if (viewer_connection->control_sock == BT_INVALID_SOCKET) { return; @@ -282,20 +292,21 @@ static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewe } } -static void connection_release(bt_object *obj) +static +void connection_release(bt_object *obj) { - struct bt_live_viewer_connection *conn = - container_of(obj, struct bt_live_viewer_connection, obj); + struct live_viewer_connection *conn = + container_of(obj, struct live_viewer_connection, obj); - bt_live_viewer_connection_destroy(conn); + live_viewer_connection_destroy(conn); } static -bt_value_status list_update_session(bt_value *results, +int list_update_session(bt_value *results, const struct lttng_viewer_session *session, bool *_found) { - bt_value_status ret = BT_VALUE_STATUS_OK; + int ret = 0; bt_value *map = NULL; bt_value *hostname = NULL; bt_value *session_name = NULL; @@ -305,26 +316,30 @@ bt_value_status list_update_session(bt_value *results, len = bt_value_array_get_size(results); if (len < 0) { - ret = BT_VALUE_STATUS_ERROR; + BT_LOGE_STR("Error getting size of array."); + ret = -1; goto end; } for (i = 0; i < len; i++) { const char *hostname_str = NULL; const char *session_name_str = NULL; - map = bt_value_array_get(results, (size_t) i); + map = bt_value_array_borrow_element_by_index(results, (size_t) i); if (!map) { - ret = BT_VALUE_STATUS_ERROR; + BT_LOGE_STR("Error borrowing map."); + ret = -1; goto end; } - hostname = bt_value_map_get(map, "target-hostname"); + hostname = bt_value_map_borrow_entry_value(map, "target-hostname"); if (!hostname) { - ret = BT_VALUE_STATUS_ERROR; + BT_LOGE_STR("Error borrowing \"target-hostname\" entry."); + ret = -1; goto end; } - session_name = bt_value_map_get(map, "session-name"); + session_name = bt_value_map_borrow_entry_value(map, "session-name"); if (!session_name) { - ret = BT_VALUE_STATUS_ERROR; + BT_LOGE_STR("Error borrowing \"session-name\" entry."); + ret = -1; goto end; } hostname_str = bt_value_string_get(hostname); @@ -339,58 +354,45 @@ bt_value_status list_update_session(bt_value *results, found = true; - btval = bt_value_map_get(map, "stream-count"); + btval = bt_value_map_borrow_entry_value(map, "stream-count"); if (!btval) { - ret = BT_VALUE_STATUS_ERROR; + BT_LOGE_STR("Error borrowing \"stream-count\" entry."); + ret = -1; goto end; } val = bt_value_integer_get(btval); /* sum */ val += streams; - ret = bt_private_integer_bool_set(btval, val); - if (ret != BT_VALUE_STATUS_OK) { - goto end; - } - BT_VALUE_PUT_REF_AND_RESET(btval); + bt_value_integer_set(btval, val); - btval = bt_value_map_get(map, "client-count"); + btval = bt_value_map_borrow_entry_value(map, "client-count"); if (!btval) { - ret = BT_VALUE_STATUS_ERROR; + BT_LOGE_STR("Error borrowing \"client-count\" entry."); + ret = -1; goto end; } val = bt_value_integer_get(btval); /* max */ val = max_t(int64_t, clients, val); - ret = bt_private_integer_bool_set(btval, val); - if (ret != BT_VALUE_STATUS_OK) { - goto end; - } - BT_VALUE_PUT_REF_AND_RESET(btval); + bt_value_integer_set(btval, val); } - BT_VALUE_PUT_REF_AND_RESET(hostname); - BT_VALUE_PUT_REF_AND_RESET(session_name); - BT_VALUE_PUT_REF_AND_RESET(map); - if (found) { break; } } end: - BT_VALUE_PUT_REF_AND_RESET(btval); - BT_VALUE_PUT_REF_AND_RESET(hostname); - BT_VALUE_PUT_REF_AND_RESET(session_name); - BT_VALUE_PUT_REF_AND_RESET(map); *_found = found; return ret; } static -bt_value_status list_append_session(bt_value *results, +int list_append_session(bt_value *results, GString *base_url, const struct lttng_viewer_session *session) { - bt_value_status ret = BT_VALUE_STATUS_OK; + int ret = 0; + bt_value_status ret_status; bt_value *map = NULL; GString *url = NULL; bool found = false; @@ -400,18 +402,20 @@ bt_value_status list_append_session(bt_value *results, * and do max of client counts. */ ret = list_update_session(results, session, &found); - if (ret != BT_VALUE_STATUS_OK || found) { + if (ret || found) { goto end; } - map = bt_private_value_map_create(); + map = bt_value_map_create(); if (!map) { - ret = BT_VALUE_STATUS_ERROR; + BT_LOGE_STR("Error creating map value."); + ret = -1; goto end; } if (base_url->len < 1) { - ret = BT_VALUE_STATUS_ERROR; + BT_LOGE_STR("Error: base_url length smaller than 1."); + ret = -1; goto end; } /* @@ -424,8 +428,10 @@ bt_value_status list_append_session(bt_value *results, g_string_append_c(url, '/'); g_string_append(url, session->session_name); - ret = bt_private_value_map_insert_string_entry(map, "url", url->str); - if (ret != BT_VALUE_STATUS_OK) { + ret_status = bt_value_map_insert_string_entry(map, "url", url->str); + if (ret_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error inserting \"url\" entry."); + ret = -1; goto end; } @@ -433,9 +439,11 @@ bt_value_status list_append_session(bt_value *results, * key = "target-hostname", * value = , */ - ret = bt_private_value_map_insert_string_entry(map, "target-hostname", + ret_status = bt_value_map_insert_string_entry(map, "target-hostname", session->hostname); - if (ret != BT_VALUE_STATUS_OK) { + if (ret_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error inserting \"target-hostname\" entry."); + ret = -1; goto end; } @@ -443,9 +451,11 @@ bt_value_status list_append_session(bt_value *results, * key = "session-name", * value = , */ - ret = bt_private_value_map_insert_string_entry(map, "session-name", + ret_status = bt_value_map_insert_string_entry(map, "session-name", session->session_name); - if (ret != BT_VALUE_STATUS_OK) { + if (ret_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error inserting \"session-name\" entry."); + ret = -1; goto end; } @@ -456,9 +466,11 @@ bt_value_status list_append_session(bt_value *results, { uint32_t live_timer = be32toh(session->live_timer); - ret = bt_private_value_map_insert_integer_entry(map, "timer-us", + ret_status = bt_value_map_insert_integer_entry(map, "timer-us", live_timer); - if (ret != BT_VALUE_STATUS_OK) { + if (ret_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error inserting \"timer-us\" entry."); + ret = -1; goto end; } } @@ -470,14 +482,15 @@ bt_value_status list_append_session(bt_value *results, { uint32_t streams = be32toh(session->streams); - ret = bt_private_value_map_insert_integer_entry(map, "stream-count", + ret_status = bt_value_map_insert_integer_entry(map, "stream-count", streams); - if (ret != BT_VALUE_STATUS_OK) { + if (ret_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error inserting \"stream-count\" entry."); + ret = -1; goto end; } } - /* * key = "client-count", * value = , @@ -485,17 +498,24 @@ bt_value_status list_append_session(bt_value *results, { uint32_t clients = be32toh(session->clients); - ret = bt_private_value_map_insert_integer_entry(map, "client-count", + ret_status = bt_value_map_insert_integer_entry(map, "client-count", clients); - if (ret != BT_VALUE_STATUS_OK) { + if (ret_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error inserting \"client-count\" entry."); + ret = -1; goto end; } } - ret = bt_private_value_array_append_element(results, map); + ret_status = bt_value_array_append_element(results, map); + if (ret_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Error appending map to results."); + ret = -1; + } + end: if (url) { - g_string_free(url, TRUE); + g_string_free(url, true); } BT_VALUE_PUT_REF_AND_RESET(map); return ret; @@ -538,9 +558,12 @@ end: */ BT_HIDDEN -bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection) +bt_query_status live_viewer_connection_list_sessions( + struct live_viewer_connection *viewer_connection, + const bt_value **user_result) { - bt_value *results = NULL; + bt_query_status status = BT_QUERY_STATUS_OK; + bt_value *result = NULL; struct lttng_viewer_cmd cmd; struct lttng_viewer_list_sessions list; uint32_t i, sessions_count; @@ -550,9 +573,10 @@ bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connecti goto error; } - results = bt_private_value_array_create(); - if (!results) { + result = bt_value_array_create(); + if (!result) { BT_LOGE("Error creating array"); + status = BT_QUERY_STATUS_NOMEM; goto error; } @@ -563,6 +587,7 @@ bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connecti ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); + status = BT_QUERY_STATUS_ERROR; goto error; } BT_ASSERT(ret_len == sizeof(cmd)); @@ -570,10 +595,12 @@ bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connecti ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); + status = BT_QUERY_STATUS_ERROR; goto error; } if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error receiving session list: %s", bt_socket_errormsg()); + status = BT_QUERY_STATUS_ERROR; goto error; } BT_ASSERT(ret_len == sizeof(list)); @@ -582,34 +609,38 @@ bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connecti for (i = 0; i < sessions_count; i++) { struct lttng_viewer_session lsession; - ret_len = lttng_live_recv(viewer_connection, - &lsession, sizeof(lsession)); + ret_len = lttng_live_recv(viewer_connection, &lsession, + sizeof(lsession)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); + status = BT_QUERY_STATUS_ERROR; goto error; } if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error receiving session: %s", bt_socket_errormsg()); + status = BT_QUERY_STATUS_ERROR; goto error; } BT_ASSERT(ret_len == sizeof(lsession)); lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; - if (list_append_session(results, - viewer_connection->url, &lsession) - != BT_VALUE_STATUS_OK) { + if (list_append_session(result, viewer_connection->url, + &lsession)) { + status = BT_QUERY_STATUS_ERROR; goto error; } } + + *user_result = result; goto end; error: - BT_VALUE_PUT_REF_AND_RESET(results); + BT_VALUE_PUT_REF_AND_RESET(result); end: - return results; + return status; } static -int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) +int lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter) { struct lttng_viewer_cmd cmd; struct lttng_viewer_list_sessions list; @@ -617,8 +648,8 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) uint32_t i, sessions_count; ssize_t ret_len; uint64_t session_id; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS); cmd.data_size = htobe64((uint64_t) 0); @@ -667,7 +698,7 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname, viewer_connection->target_hostname->str, LTTNG_VIEWER_HOST_NAME_MAX) == 0)) { - if (lttng_live_add_session(lttng_live, session_id, + if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname, lsession.session_name)) { goto error; @@ -683,13 +714,14 @@ error: } BT_HIDDEN -int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live) +int lttng_live_create_viewer_session( + struct lttng_live_msg_iter *lttng_live_msg_iter) { struct lttng_viewer_cmd cmd; struct lttng_viewer_create_session_response resp; ssize_t ret_len; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION); cmd.data_size = htobe64((uint64_t) 0); @@ -717,7 +749,7 @@ int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live) BT_LOGE("Error creating viewer session"); goto error; } - if (lttng_live_query_session_ids(lttng_live)) { + if (lttng_live_query_session_ids(lttng_live_msg_iter)) { goto error; } @@ -733,9 +765,10 @@ int receive_streams(struct lttng_live_session *session, { ssize_t ret_len; uint32_t i; - struct lttng_live_component *lttng_live = session->lttng_live; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct lttng_live_msg_iter *lttng_live_msg_iter = + session->lttng_live_msg_iter; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; BT_LOGD("Getting %" PRIu32 " new streams:", stream_count); for (i = 0; i < stream_count; i++) { @@ -796,18 +829,14 @@ int lttng_live_attach_session(struct lttng_live_session *session) struct lttng_viewer_attach_session_request rq; struct lttng_viewer_attach_session_response rp; ssize_t ret_len; - struct lttng_live_component *lttng_live = session->lttng_live; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; uint64_t session_id = session->id; uint32_t streams_count; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; - if (session->attached) { - return 0; - } - cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); @@ -885,9 +914,9 @@ int lttng_live_detach_session(struct lttng_live_session *session) struct lttng_viewer_detach_session_request rq; struct lttng_viewer_detach_session_response rp; ssize_t ret_len; - struct lttng_live_component *lttng_live = session->lttng_live; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; uint64_t session_id = session->id; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; @@ -962,10 +991,10 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, char *data = NULL; ssize_t ret_len; struct lttng_live_session *session = trace->session; - struct lttng_live_component *lttng_live = session->lttng_live; + struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; struct lttng_live_metadata *metadata = trace->metadata; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; @@ -1075,7 +1104,8 @@ void lttng_index_to_packet_index(struct lttng_viewer_index *lindex, } BT_HIDDEN -bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live, +enum lttng_live_iterator_status lttng_live_get_next_index( + struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream, struct packet_index *index) { @@ -1084,13 +1114,15 @@ bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_compon ssize_t ret_len; struct lttng_viewer_index rp; uint32_t flags, status; - bt_lttng_live_iterator_status retstatus = - BT_LTTNG_LIVE_ITERATOR_STATUS_OK; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + enum lttng_live_iterator_status retstatus = + LTTNG_LIVE_ITERATOR_STATUS_OK; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; struct lttng_live_trace *trace = stream->trace; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; + struct lttng_live_component *lttng_live = + lttng_live_msg_iter->lttng_live_comp; cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); cmd.data_size = htobe64((uint64_t) sizeof(rq)); @@ -1109,7 +1141,8 @@ bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_compon memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); if (ret_len == BT_SOCKET_ERROR) { - BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg()); + BT_LOGE("Error sending get_next_index request: %s", + bt_socket_errormsg()); goto error; } @@ -1120,7 +1153,8 @@ bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_compon goto error; } if (ret_len == BT_SOCKET_ERROR) { - BT_LOGE("Error receiving get_next_index response: %s", bt_socket_errormsg()); + BT_LOGE("Error receiving get_next_index response: %s", + bt_socket_errormsg()); goto error; } BT_ASSERT(ret_len == sizeof(rp)); @@ -1136,7 +1170,7 @@ bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_compon BT_LOGD("get_next_index: inactive"); memset(index, 0, sizeof(struct packet_index)); index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end); - stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end; + stream->current_inactivity_ts = index->ts_cycles.timestamp_end; ctf_stream_class_id = be64toh(rp.stream_id); if (stream->ctf_stream_class_id != -1ULL) { BT_ASSERT(stream->ctf_stream_class_id == @@ -1162,8 +1196,6 @@ bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_compon } stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA; - stream->current_packet_end_timestamp = - index->ts_cycles.timestamp_end; if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { BT_LOGD("get_next_index: new metadata needed"); @@ -1171,21 +1203,21 @@ bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_compon } if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { BT_LOGD("get_next_index: new streams needed"); - lttng_live_need_new_streams(lttng_live); + lttng_live_need_new_streams(lttng_live_msg_iter); } break; } case LTTNG_VIEWER_INDEX_RETRY: BT_LOGD("get_next_index: retry"); memset(index, 0, sizeof(struct packet_index)); - retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; goto end; case LTTNG_VIEWER_INDEX_HUP: BT_LOGD("get_next_index: stream hung up"); memset(index, 0, sizeof(struct packet_index)); index->offset = EOF; - retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_END; + retstatus = LTTNG_LIVE_ITERATOR_STATUS_END; stream->state = LTTNG_LIVE_STREAM_EOF; break; case LTTNG_VIEWER_INDEX_ERR: @@ -1204,17 +1236,18 @@ end: error: if (lttng_live_is_canceled(lttng_live)) { - retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { - retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR; } return retstatus; } BT_HIDDEN -enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live, - struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset, - uint64_t req_len, uint64_t *recv_len) +enum bt_msg_iter_medium_status lttng_live_get_stream_bytes( + struct lttng_live_msg_iter *lttng_live_msg_iter, + struct lttng_live_stream_iterator *stream, uint8_t *buf, + uint64_t offset, uint64_t req_len, uint64_t *recv_len) { enum bt_msg_iter_medium_status retstatus = BT_MSG_ITER_MEDIUM_STATUS_OK; struct lttng_viewer_cmd cmd; @@ -1222,11 +1255,13 @@ enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_com struct lttng_viewer_trace_packet rp; ssize_t ret_len; uint32_t flags, status; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; struct lttng_live_trace *trace = stream->trace; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; + struct lttng_live_component *lttng_live = + lttng_live_msg_iter->lttng_live_comp; BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64, offset, req_len); @@ -1289,7 +1324,7 @@ enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_com } if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { BT_LOGD("get_data_packet: new streams needed, try again later"); - lttng_live_need_new_streams(lttng_live); + lttng_live_need_new_streams(lttng_live_msg_iter); } if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) { @@ -1337,24 +1372,27 @@ error: * Request new streams for a session. */ BT_HIDDEN -bt_lttng_live_iterator_status lttng_live_get_new_streams( +enum lttng_live_iterator_status lttng_live_get_new_streams( struct lttng_live_session *session) { - bt_lttng_live_iterator_status status = - BT_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum lttng_live_iterator_status status = + LTTNG_LIVE_ITERATOR_STATUS_OK; struct lttng_viewer_cmd cmd; struct lttng_viewer_new_streams_request rq; struct lttng_viewer_new_streams_response rp; ssize_t ret_len; - struct lttng_live_component *lttng_live = session->lttng_live; - struct bt_live_viewer_connection *viewer_connection = - lttng_live->viewer_connection; + struct lttng_live_msg_iter *lttng_live_msg_iter = + session->lttng_live_msg_iter; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; + struct lttng_live_component *lttng_live = + lttng_live_msg_iter->lttng_live_comp; uint32_t streams_count; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; if (!session->new_streams_needed) { - return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; + return LTTNG_LIVE_ITERATOR_STATUS_OK; } cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); @@ -1373,7 +1411,8 @@ bt_lttng_live_iterator_status lttng_live_get_new_streams( memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); if (ret_len == BT_SOCKET_ERROR) { - BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg()); + BT_LOGE("Error sending get_new_streams request: %s", + bt_socket_errormsg()); goto error; } @@ -1401,7 +1440,7 @@ bt_lttng_live_iterator_status lttng_live_get_new_streams( case LTTNG_VIEWER_NEW_STREAMS_HUP: session->new_streams_needed = false; session->closed = true; - status = BT_LTTNG_LIVE_ITERATOR_STATUS_END; + status = LTTNG_LIVE_ITERATOR_STATUS_END; goto end; case LTTNG_VIEWER_NEW_STREAMS_ERR: BT_LOGE("get_new_streams error"); @@ -1419,30 +1458,31 @@ end: error: if (lttng_live_is_canceled(lttng_live)) { - status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { - status = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; } return status; } BT_HIDDEN -struct bt_live_viewer_connection * - bt_live_viewer_connection_create(const char *url, - struct lttng_live_component *lttng_live) +struct live_viewer_connection *live_viewer_connection_create( + const char *url, bool in_query, + struct lttng_live_msg_iter *lttng_live_msg_iter) { - struct bt_live_viewer_connection *viewer_connection; + struct live_viewer_connection *viewer_connection; - viewer_connection = g_new0(struct bt_live_viewer_connection, 1); + viewer_connection = g_new0(struct live_viewer_connection, 1); if (bt_socket_init() != 0) { goto error; } - bt_object_init(&viewer_connection->obj, connection_release); + bt_object_init_shared(&viewer_connection->obj, connection_release); viewer_connection->control_sock = BT_INVALID_SOCKET; viewer_connection->port = -1; - viewer_connection->lttng_live = lttng_live; + viewer_connection->in_query = in_query; + viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter; viewer_connection->url = g_string_new(url); if (!viewer_connection->url) { goto error; @@ -1463,19 +1503,20 @@ error: } BT_HIDDEN -void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_connection) +void live_viewer_connection_destroy( + struct live_viewer_connection *viewer_connection) { BT_LOGD("Closing connection to url \"%s\"", viewer_connection->url->str); lttng_live_disconnect_viewer(viewer_connection); - g_string_free(viewer_connection->url, TRUE); + g_string_free(viewer_connection->url, true); if (viewer_connection->relay_hostname) { - g_string_free(viewer_connection->relay_hostname, TRUE); + g_string_free(viewer_connection->relay_hostname, true); } if (viewer_connection->target_hostname) { - g_string_free(viewer_connection->target_hostname, TRUE); + g_string_free(viewer_connection->target_hostname, true); } if (viewer_connection->session_name) { - g_string_free(viewer_connection->session_name, TRUE); + g_string_free(viewer_connection->session_name, true); } g_free(viewer_connection);