#include "health-relayd.h"
#include "testpoint.h"
#include "viewer-stream.h"
+#include "stream.h"
+#include "session.h"
+#include "ctf-trace.h"
static struct lttng_uri *live_uri;
cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
stream_n.node) {
+ struct ctf_trace *ctf_trace;
+
health_code_update();
/* Ignore if not the same session. */
continue;
}
+ ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
+ vstream->path_name);
+ assert(ctf_trace);
+
send_stream.id = htobe64(vstream->stream_handle);
- send_stream.ctf_trace_id = htobe64(vstream->ctf_trace->id);
+ send_stream.ctf_trace_id = htobe64(ctf_trace->id);
send_stream.metadata_flag = htobe32(vstream->metadata_flag);
strncpy(send_stream.path_name, vstream->path_name,
sizeof(send_stream.path_name));
uint32_t *nb_created)
{
int ret;
- struct relay_stream *stream;
struct lttng_ht_iter iter;
+ struct ctf_trace *ctf_trace;
assert(session);
* Create viewer streams for relay streams that are ready to be used for a
* the given session id only.
*/
- cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
- stream_n.node) {
- struct relay_viewer_stream *vstream;
+ rcu_read_lock();
+ cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
+ node.node) {
+ struct relay_stream *stream;
health_code_update();
- if (stream->session->id != session->id ||
- !stream->ctf_trace || !stream->viewer_ready) {
- /*
- * Ignore stream from a different session. Don't create streams
- * with no ctf_trace or not ready for the viewer.
- */
+ if (ctf_trace->invalid_flag) {
continue;
}
- vstream = viewer_stream_find_by_id(stream->stream_handle);
- if (!vstream) {
- vstream = viewer_stream_create(stream, seek_t);
+ cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) {
+ struct relay_viewer_stream *vstream;
+
+ if (!stream->viewer_ready) {
+ continue;
+ }
+
+ vstream = viewer_stream_find_by_id(stream->stream_handle);
if (!vstream) {
- ret = -1;
- goto error_unlock;
+ vstream = viewer_stream_create(stream, seek_t, ctf_trace);
+ if (!vstream) {
+ ret = -1;
+ goto error_unlock;
+ }
+ /* Acquire reference to ctf_trace. */
+ ctf_trace_get_ref(ctf_trace);
+
+ if (nb_created) {
+ /* Update number of created stream counter. */
+ (*nb_created)++;
+ }
+ } else if (!vstream->sent_flag && nb_unsent) {
+ /* Update number of unsent stream counter. */
+ (*nb_unsent)++;
}
- if (nb_created) {
- /* Update number of created stream counter. */
- (*nb_created)++;
+ /* Update number of total stream counter. */
+ if (nb_total) {
+ (*nb_total)++;
}
- } else if (!vstream->sent_flag && nb_unsent) {
- /* Update number of unsent stream counter. */
- (*nb_unsent)++;
- }
- /* Update number of total stream counter. */
- if (nb_total) {
- (*nb_total)++;
}
}
ret = 0;
error_unlock:
+ rcu_read_unlock();
pthread_mutex_unlock(&session->viewer_ready_lock);
return ret;
}
/* Stopping all threads */
DBG("Terminating all live threads");
- ret = notify_thread_pipe(thread_quit_pipe[1]);
+ ret = notify_thread_pipe(live_conn_pipe[1]);
if (ret < 0) {
ERR("write error on thread quit pipe");
}
}
/* Add quit pipe */
- ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
+ ret = lttng_poll_add(events, live_conn_pipe[0], LPOLLIN | LPOLLERR);
if (ret < 0) {
goto error;
}
* Return 1 if it was triggered else 0;
*/
static
-int check_thread_quit_pipe(int fd, uint32_t events)
+int check_live_conn_pipe(int fd, uint32_t events)
{
- if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
+ if (fd == live_conn_pipe[0] && (events & LPOLLIN)) {
return 1;
}
pollfd = LTTNG_POLL_GETFD(&events, i);
/* Thread quit pipe has been closed. Killing thread. */
- ret = check_thread_quit_pipe(pollfd, revents);
+ ret = check_live_conn_pipe(pollfd, revents);
if (ret) {
err = 0;
goto exit;
health_code_update();
-
ret = send_response(cmd->sock, &reply, sizeof(reply));
if (ret < 0) {
goto end;
sizeof(send_session.hostname));
send_session.id = htobe64(session->id);
send_session.live_timer = htobe32(session->live_timer);
- send_session.clients = htobe32(session->viewer_attached);
+ send_session.clients = htobe32(session->viewer_refcount);
send_session.streams = htobe32(session->stream_count);
health_code_update();
assert(cmd);
assert(sessions_ht);
- DBG("Attach session received");
-
health_code_update();
/* Receive the request from the connected client. */
response.status = htobe32(VIEWER_ATTACH_UNK);
goto send_reply;
}
+ session_viewer_attach(session);
+ DBG("Attach session ID %" PRIu64 " received", be64toh(request.session_id));
- if (cmd->session_id == session->id) {
- /* Same viewer already attached, just send the stream list. */
- send_streams = 1;
- response.status = htobe32(VIEWER_ATTACH_OK);
- } else if (session->viewer_attached != 0) {
+ if (uatomic_read(&session->viewer_refcount) > 1) {
DBG("Already a viewer attached");
response.status = htobe32(VIEWER_ATTACH_ALREADY);
+ session_viewer_detach(session);
goto send_reply;
} else if (session->live_timer == 0) {
DBG("Not live session");
response.status = htobe32(VIEWER_ATTACH_NOT_LIVE);
goto send_reply;
} else {
- session->viewer_attached++;
send_streams = 1;
response.status = htobe32(VIEWER_ATTACH_OK);
cmd->session_id = session->id;
struct ctf_packet_index packet_index;
struct relay_viewer_stream *vstream;
struct relay_stream *rstream;
+ struct ctf_trace *ctf_trace;
+ struct relay_session *session;
assert(cmd);
assert(sessions_ht);
health_code_update();
rcu_read_lock();
+ session = session_find_by_id(sessions_ht, cmd->session_id);
+ if (!session) {
+ ret = -1;
+ goto end_unlock;
+ }
+
vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id));
if (!vstream) {
ret = -1;
goto end_unlock;
}
+ ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, vstream->path_name);
+ assert(ctf_trace);
+
memset(&viewer_index, 0, sizeof(viewer_index));
/*
vstream->index_read_fd = ret;
}
- rstream = relay_stream_find_by_id(vstream->stream_handle);
- if (rstream) {
+ rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle);
+ assert(rstream);
+
+ if (!rstream->close_flag) {
if (vstream->abort_flag) {
/* Rotate on abort (overwrite). */
DBG("Viewer rotate because of overwrite");
} else if (ret == 1) {
viewer_index.status = htobe32(VIEWER_INDEX_HUP);
viewer_stream_delete(vstream);
- viewer_stream_destroy(vstream);
+ viewer_stream_destroy(ctf_trace, vstream);
goto send_reply;
}
+ /* ret == 0 means successful so we continue. */
}
+
pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
if (rstream->beacon_ts_end != -1ULL &&
viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end);
pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
goto send_reply;
- /*
- * Reader and writer are working in the same tracefile, so we care
- * about the number of index received and sent. Otherwise, we read
- * up to EOF.
- */
} else if (rstream->total_index_received <= vstream->last_sent_index
&& !vstream->close_write_flag) {
+ /*
+ * Reader and writer are working in the same tracefile, so we care
+ * about the number of index received and sent. Otherwise, we read
+ * up to EOF.
+ */
pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
/* No new index to send, retry later. */
viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
}
}
pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
- } else if (!rstream && vstream->close_write_flag &&
+ } else if (rstream->close_flag && vstream->close_write_flag &&
vstream->total_index_received == vstream->last_sent_index) {
/* Last index sent and current tracefile closed in write */
viewer_index.status = htobe32(VIEWER_INDEX_HUP);
viewer_stream_delete(vstream);
- viewer_stream_destroy(vstream);
+ viewer_stream_destroy(ctf_trace, vstream);
goto send_reply;
} else {
vstream->close_write_flag = 1;
}
- if (!vstream->ctf_trace->metadata_received ||
- vstream->ctf_trace->metadata_received >
- vstream->ctf_trace->metadata_sent) {
+ if (!ctf_trace->metadata_received ||
+ ctf_trace->metadata_received > ctf_trace->metadata_sent) {
viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
}
pthread_mutex_lock(&vstream->overwrite_lock);
if (vstream->abort_flag) {
/*
- * The file is being overwritten by the writer, we cannot
- * use it.
+ * The file is being overwritten by the writer, we cannot * use it.
*/
viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
pthread_mutex_unlock(&vstream->overwrite_lock);
} else if (ret == 1) {
viewer_index.status = htobe32(VIEWER_INDEX_HUP);
viewer_stream_delete(vstream);
- viewer_stream_destroy(vstream);
+ viewer_stream_destroy(ctf_trace, vstream);
goto send_reply;
}
goto send_reply;
} else if (ret == 1) {
viewer_index.status = htobe32(VIEWER_INDEX_HUP);
viewer_stream_delete(vstream);
- viewer_stream_destroy(vstream);
+ viewer_stream_destroy(ctf_trace, vstream);
goto send_reply;
}
} else {
struct lttng_viewer_get_packet get_packet_info;
struct lttng_viewer_trace_packet reply;
struct relay_viewer_stream *stream;
+ struct ctf_trace *ctf_trace;
assert(cmd);
if (!stream) {
goto error;
}
- assert(stream->ctf_trace);
+
+ ctf_trace = ctf_trace_find_by_path(cmd->session->ctf_traces_ht,
+ stream->path_name);
+ assert(ctf_trace);
/*
* First time we read this stream, we need open the tracefile, we should
stream->read_fd = ret;
}
- if (!stream->ctf_trace->metadata_received ||
- stream->ctf_trace->metadata_received >
- stream->ctf_trace->metadata_sent) {
+ if (!ctf_trace->metadata_received ||
+ ctf_trace->metadata_received > ctf_trace->metadata_sent) {
reply.status = htobe32(VIEWER_GET_PACKET_ERR);
reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
goto send_reply;
struct lttng_viewer_get_metadata request;
struct lttng_viewer_metadata_packet reply;
struct relay_viewer_stream *stream;
+ struct ctf_trace *ctf_trace;
assert(cmd);
ERR("Invalid metadata stream");
goto error;
}
- assert(stream->ctf_trace);
- assert(stream->ctf_trace->metadata_sent <=
- stream->ctf_trace->metadata_received);
- len = stream->ctf_trace->metadata_received -
- stream->ctf_trace->metadata_sent;
+ ctf_trace = ctf_trace_find_by_path(cmd->session->ctf_traces_ht,
+ stream->path_name);
+ assert(ctf_trace);
+ assert(ctf_trace->metadata_sent <= ctf_trace->metadata_received);
+
+ len = ctf_trace->metadata_received - ctf_trace->metadata_sent;
if (len == 0) {
reply.status = htobe32(VIEWER_NO_NEW_METADATA);
goto send_reply;
PERROR("Relay reading metadata file");
goto error;
}
- stream->ctf_trace->metadata_sent += read_len;
+ ctf_trace->metadata_sent += read_len;
reply.status = htobe32(VIEWER_METADATA_OK);
goto send_reply;
struct relay_command *relay_connection =
caa_container_of(head, struct relay_command, rcu_node);
- if (relay_connection->session &&
- relay_connection->session->viewer_attached > 0) {
- relay_connection->session->viewer_attached--;
- }
lttcomm_destroy_sock(relay_connection->sock);
free(relay_connection);
}
/*
* Delete all streams for a specific session ID.
*/
-static
-void viewer_del_streams(uint64_t session_id)
+static void destroy_viewer_streams_by_session(struct relay_session *session)
{
struct relay_viewer_stream *stream;
struct lttng_ht_iter iter;
+ assert(session);
+
rcu_read_lock();
cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, stream,
stream_n.node) {
- health_code_update();
+ struct ctf_trace *ctf_trace;
- if (stream->session_id != session_id) {
+ health_code_update();
+ if (stream->session_id != session->id) {
continue;
}
+ ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
+ stream->path_name);
+ assert(ctf_trace);
+
viewer_stream_delete(stream);
- assert(stream->ctf_trace);
if (stream->metadata_flag) {
- /*
- * The metadata viewer stream is destroyed once the refcount on the
- * ctf trace goes to 0 in the destroy stream function thus there is
- * no explicit call to that function here.
- */
- stream->ctf_trace->metadata_sent = 0;
- stream->ctf_trace->viewer_metadata_stream = NULL;
- } else {
- viewer_stream_destroy(stream);
+ ctf_trace->metadata_sent = 0;
+ ctf_trace->viewer_metadata_stream = NULL;
}
+
+ viewer_stream_destroy(ctf_trace, stream);
}
rcu_read_unlock();
}
+static void try_destroy_streams(struct relay_session *session)
+{
+ struct ctf_trace *ctf_trace;
+ struct lttng_ht_iter iter;
+
+ assert(session);
+
+ cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
+ node.node) {
+ /* Attempt to destroy the ctf trace of that session. */
+ ctf_trace_try_destroy(session, ctf_trace);
+ }
+}
+
/*
* Delete and free a connection.
*
*/
static
void del_connection(struct lttng_ht *relay_connections_ht,
- struct lttng_ht_iter *iter, struct relay_command *relay_connection)
+ struct lttng_ht_iter *iter, struct relay_command *relay_connection,
+ struct lttng_ht *sessions_ht)
{
int ret;
+ struct relay_session *session;
assert(relay_connections_ht);
assert(iter);
assert(relay_connection);
+ assert(sessions_ht);
DBG("Cleaning connection of session ID %" PRIu64,
relay_connection->session_id);
+ rcu_read_lock();
ret = lttng_ht_del(relay_connections_ht, iter);
assert(!ret);
- viewer_del_streams(relay_connection->session_id);
+ session = session_find_by_id(sessions_ht, relay_connection->session_id);
+ if (session) {
+ /*
+ * Very important that this is done before destroying the session so we
+ * can put back every viewer stream reference from the ctf_trace.
+ */
+ destroy_viewer_streams_by_session(session);
+ try_destroy_streams(session);
+ session_viewer_try_destroy(sessions_ht, session);
+ }
+ rcu_read_unlock();
call_rcu(&relay_connection->rcu_node, deferred_free_connection);
}
health_code_update();
/* Thread quit pipe has been closed. Killing thread. */
- ret = check_thread_quit_pipe(pollfd, revents);
+ ret = check_live_conn_pipe(pollfd, revents);
if (ret) {
err = 0;
goto exit;
if (revents & (LPOLLERR)) {
cleanup_poll_connection(&events, pollfd);
del_connection(relay_connections_ht, &iter,
- relay_connection);
+ relay_connection, relay_ctx->sessions_ht);
} else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
DBG("Viewer socket %d hung up", pollfd);
cleanup_poll_connection(&events, pollfd);
del_connection(relay_connections_ht, &iter,
- relay_connection);
+ relay_connection, relay_ctx->sessions_ht);
} else if (revents & LPOLLIN) {
ret = relay_connection->sock->ops->recvmsg(
relay_connection->sock, &recv_hdr,
if (ret <= 0) {
cleanup_poll_connection(&events, pollfd);
del_connection(relay_connections_ht, &iter,
- relay_connection);
+ relay_connection, relay_ctx->sessions_ht);
DBG("Viewer control connection closed with %d",
pollfd);
} else {
/* Clear the session on error. */
cleanup_poll_connection(&events, pollfd);
del_connection(relay_connections_ht, &iter,
- relay_connection);
+ relay_connection, relay_ctx->sessions_ht);
DBG("Viewer connection closed with %d", pollfd);
}
}
relay_connection = caa_container_of(node, struct relay_command,
sock_n);
- del_connection(relay_connections_ht, &iter, relay_connection);
+ del_connection(relay_connections_ht, &iter, relay_connection,
+ relay_ctx->sessions_ht);
}
rcu_read_unlock();
error_poll_create: