X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=d5f04582419dffec1466c60c4de449cf34948a03;hp=c5a6e4edd8450e2048e955b4abd6ca8219f59fe1;hb=9a9c8637bbfb1b12b8302e03d5bc0453672b4d06;hpb=f188393720c2570fce2eed69862cf645724a9efe diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index c5a6e4edd..d5f045824 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -1,26 +1,17 @@ /* - * Copyright (C) 2013 - Julien Desfossez - * David Goulet - * 2015 - Mathieu Desnoyers + * Copyright (C) 2013 Julien Desfossez + * Copyright (C) 2013 David Goulet + * Copyright (C) 2015 Mathieu Desnoyers * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License, version 2 only, - * as published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE +#include #include #include +#include #include #include #include @@ -34,40 +25,39 @@ #include #include #include -#include +#include #include -#include #include -#include -#include -#include +#include -#include #include +#include #include #include -#include #include +#include +#include #include #include -#include #include #include +#include #include #include +#include #include "cmd.h" +#include "connection.h" +#include "ctf-trace.h" +#include "health-relayd.h" #include "live.h" #include "lttng-relayd.h" -#include "utils.h" -#include "health-relayd.h" -#include "testpoint.h" -#include "viewer-stream.h" -#include "stream.h" #include "session.h" -#include "ctf-trace.h" -#include "connection.h" +#include "stream.h" +#include "testpoint.h" +#include "utils.h" #include "viewer-session.h" +#include "viewer-stream.h" #define SESSION_BUF_DEFAULT_COUNT 16 @@ -199,10 +189,9 @@ end: */ static ssize_t send_viewer_streams(struct lttcomm_sock *sock, - struct relay_session *session, unsigned int ignore_sent_flag) + uint64_t session_id, unsigned int ignore_sent_flag) { ssize_t ret; - struct lttng_viewer_stream send_stream; struct lttng_ht_iter iter; struct relay_viewer_stream *vstream; @@ -211,6 +200,7 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) { struct ctf_trace *ctf_trace; + struct lttng_viewer_stream send_stream = {}; health_code_update(); @@ -220,7 +210,7 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, pthread_mutex_lock(&vstream->stream->lock); /* Ignore if not the same session. */ - if (vstream->stream->trace->session->id != session->id || + if (vstream->stream->trace->session->id != session_id || (!ignore_sent_flag && vstream->sent_flag)) { pthread_mutex_unlock(&vstream->stream->lock); viewer_stream_put(vstream); @@ -232,10 +222,21 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, send_stream.ctf_trace_id = htobe64(ctf_trace->id); send_stream.metadata_flag = htobe32( vstream->stream->is_metadata); - strncpy(send_stream.path_name, vstream->path_name, - sizeof(send_stream.path_name)); - strncpy(send_stream.channel_name, vstream->channel_name, - sizeof(send_stream.channel_name)); + if (lttng_strncpy(send_stream.path_name, vstream->path_name, + sizeof(send_stream.path_name))) { + pthread_mutex_unlock(&vstream->stream->lock); + viewer_stream_put(vstream); + ret = -1; /* Error. */ + goto end_unlock; + } + if (lttng_strncpy(send_stream.channel_name, + vstream->channel_name, + sizeof(send_stream.channel_name))) { + pthread_mutex_unlock(&vstream->stream->lock); + viewer_stream_put(vstream); + ret = -1; /* Error. */ + goto end_unlock; + } DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle); @@ -262,24 +263,32 @@ end_unlock: * viewer stream of the session, the number of unsent stream and the number of * stream created. Those counters can be NULL and thus will be ignored. * + * session must be locked to ensure that we see either none or all initial + * streams for a session, but no intermediate state.. + * * Return 0 on success or else a negative value. */ -static -int make_viewer_streams(struct relay_session *session, - enum lttng_viewer_seek seek_t, uint32_t *nb_total, uint32_t *nb_unsent, - uint32_t *nb_created, bool *closed) +static int make_viewer_streams(struct relay_session *session, + struct lttng_trace_chunk *viewer_trace_chunk, + enum lttng_viewer_seek seek_t, + uint32_t *nb_total, + uint32_t *nb_unsent, + uint32_t *nb_created, + bool *closed) { int ret; struct lttng_ht_iter iter; struct ctf_trace *ctf_trace; assert(session); + ASSERT_LOCKED(session->lock); - /* - * Hold the session lock to ensure that we see either none or - * all initial streams for a session, but no intermediate state. - */ - pthread_mutex_lock(&session->lock); + if (!viewer_trace_chunk) { + ERR("Internal error: viewer session associated with session \"%s\" has a NULL trace chunk", + session->session_name); + ret = -1; + goto error; + } if (session->connection_closed) { *closed = true; @@ -292,6 +301,7 @@ int make_viewer_streams(struct relay_session *session, rcu_read_lock(); cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace, node.node) { + bool trace_has_metadata_stream = false; struct relay_stream *stream; health_code_update(); @@ -300,6 +310,30 @@ int make_viewer_streams(struct relay_session *session, continue; } + /* + * Iterate over all the streams of the trace to see if we have a + * metadata stream. + */ + cds_list_for_each_entry_rcu( + stream, &ctf_trace->stream_list, stream_node) + { + if (stream->is_metadata) { + trace_has_metadata_stream = true; + break; + } + } + + /* + * If there is no metadata stream in this trace at the moment + * and we never sent one to the viewer, skip the trace. We + * accept that the viewer will not see this trace at all. + */ + if (!trace_has_metadata_stream && + !ctf_trace->metadata_stream_sent_to_viewer) { + ctf_trace_put(ctf_trace); + continue; + } + cds_list_for_each_entry_rcu(stream, &ctf_trace->stream_list, stream_node) { struct relay_viewer_stream *vstream; @@ -307,15 +341,24 @@ int make_viewer_streams(struct relay_session *session, continue; } /* - * stream published is protected by the session - * lock. + * stream published is protected by the session lock. */ if (!stream->published) { goto next; } vstream = viewer_stream_get_by_id(stream->stream_handle); if (!vstream) { - vstream = viewer_stream_create(stream, seek_t); + /* + * Save that we sent the metadata stream to the + * viewer. So that we know what trace the viewer + * is aware of. + */ + if (stream->is_metadata) { + ctf_trace->metadata_stream_sent_to_viewer = + true; + } + vstream = viewer_stream_create(stream, + viewer_trace_chunk, seek_t); if (!vstream) { ret = -1; ctf_trace_put(ctf_trace); @@ -327,17 +370,37 @@ int make_viewer_streams(struct relay_session *session, /* Update number of created stream counter. */ (*nb_created)++; } + /* + * Ensure a self-reference is preserved even + * after we have put our local reference. + */ + if (!viewer_stream_get(vstream)) { + ERR("Unable to get self-reference on viewer stream, logic error."); + abort(); + } } else { if (!vstream->sent_flag && nb_unsent) { /* Update number of unsent stream counter. */ (*nb_unsent)++; } - viewer_stream_put(vstream); } /* Update number of total stream counter. */ if (nb_total) { - (*nb_total)++; + if (stream->is_metadata) { + if (!stream->closed || + stream->metadata_received > vstream->metadata_sent) { + (*nb_total)++; + } + } else { + if (!stream->closed || + !(((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) >= 0)) { + + (*nb_total)++; + } + } } + /* Put local reference. */ + viewer_stream_put(vstream); next: stream_put(stream); } @@ -348,7 +411,7 @@ int make_viewer_streams(struct relay_session *session, error_unlock: rcu_read_unlock(); - pthread_mutex_unlock(&session->lock); +error: return ret; } @@ -364,7 +427,8 @@ int relayd_live_stop(void) * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. */ static -int create_thread_poll_set(struct lttng_poll_event *events, int size) +int create_named_thread_poll_set(struct lttng_poll_event *events, + int size, const char *name) { int ret; @@ -373,8 +437,10 @@ int create_thread_poll_set(struct lttng_poll_event *events, int size) goto error; } - ret = lttng_poll_create(events, size, LTTNG_CLOEXEC); - if (ret < 0) { + ret = fd_tracker_util_poll_create(the_fd_tracker, + name, events, 1, LTTNG_CLOEXEC); + if (ret) { + PERROR("Failed to create \"%s\" poll file descriptor", name); goto error; } @@ -405,14 +471,76 @@ int check_thread_quit_pipe(int fd, uint32_t events) return 0; } +static +int create_sock(void *data, int *out_fd) +{ + int ret; + struct lttcomm_sock *sock = data; + + ret = lttcomm_create_sock(sock); + if (ret < 0) { + goto end; + } + + *out_fd = sock->fd; +end: + return ret; +} + +static +int close_sock(void *data, int *in_fd) +{ + struct lttcomm_sock *sock = data; + + return sock->ops->close(sock); +} + +static int accept_sock(void *data, int *out_fd) +{ + int ret = 0; + /* Socks is an array of in_sock, out_sock. */ + struct lttcomm_sock **socks = data; + struct lttcomm_sock *in_sock = socks[0]; + + socks[1] = in_sock->ops->accept(in_sock); + if (!socks[1]) { + ret = -1; + goto end; + } + *out_fd = socks[1]->fd; +end: + return ret; +} + +static +struct lttcomm_sock *accept_live_sock(struct lttcomm_sock *listening_sock, + const char *name) +{ + int out_fd, ret; + struct lttcomm_sock *socks[2] = { listening_sock, NULL }; + struct lttcomm_sock *new_sock = NULL; + + ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &out_fd, + (const char **) &name, 1, accept_sock, &socks); + if (ret) { + goto end; + } + new_sock = socks[1]; + DBG("%s accepted, socket %d", name, new_sock->fd); +end: + return new_sock; +} + /* * Create and init socket from uri. */ static -struct lttcomm_sock *init_socket(struct lttng_uri *uri) +struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name) { - int ret; + int ret, sock_fd; struct lttcomm_sock *sock = NULL; + char uri_str[LTTNG_PATH_MAX]; + char *formated_name = NULL; sock = lttcomm_alloc_sock_from_uri(uri); if (sock == NULL) { @@ -420,14 +548,33 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri) goto error; } - ret = lttcomm_create_sock(sock); - if (ret < 0) { + /* + * Don't fail to create the socket if the name can't be built as it is + * only used for debugging purposes. + */ + ret = uri_to_str_url(uri, uri_str, sizeof(uri_str)); + uri_str[sizeof(uri_str) - 1] = '\0'; + if (ret >= 0) { + ret = asprintf(&formated_name, "%s socket @ %s", name, + uri_str); + if (ret < 0) { + formated_name = NULL; + } + } + + ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd, + (const char **) (formated_name ? &formated_name : NULL), + 1, create_sock, sock); + if (ret) { + PERROR("Failed to create \"%s\" socket", + formated_name ?: "Unknown"); goto error; } - DBG("Listening on sock %d for live", sock->fd); + DBG("Listening on %s socket %d", name, sock->fd); ret = sock->ops->bind(sock); if (ret < 0) { + PERROR("Failed to bind lttng-live socket"); goto error; } @@ -437,12 +584,14 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri) } + free(formated_name); return sock; error: if (sock) { lttcomm_destroy_sock(sock); } + free(formated_name); return NULL; } @@ -459,17 +608,19 @@ void *thread_listener(void *data) DBG("[thread] Relay live listener started"); + rcu_register_thread(); health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_LISTENER); health_code_update(); - live_control_sock = init_socket(live_uri); + live_control_sock = init_socket(live_uri, "Live listener"); if (!live_control_sock) { goto error_sock_control; } /* Pass 2 as size here for the thread quit pipe and control sockets. */ - ret = create_thread_poll_set(&events, 2); + ret = create_named_thread_poll_set(&events, 2, + "Live listener thread epoll"); if (ret < 0) { goto error_create_poll; } @@ -514,11 +665,6 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -526,10 +672,7 @@ restart: goto exit; } - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("socket poll error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { /* * A new connection is requested, therefore a * viewer connection is allocated in this @@ -540,7 +683,8 @@ restart: struct relay_connection *new_conn; struct lttcomm_sock *newsock; - newsock = live_control_sock->ops->accept(live_control_sock); + newsock = accept_live_sock(live_control_sock, + "Live socket to client"); if (!newsock) { PERROR("accepting control sock"); goto error; @@ -572,6 +716,12 @@ restart: * exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&viewer_conn_queue.futex); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("socket poll error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -580,13 +730,18 @@ exit: error: error_poll_add: error_testpoint: - lttng_poll_clean(&events); + (void) fd_tracker_util_poll_clean(the_fd_tracker, &events); error_create_poll: if (live_control_sock->fd >= 0) { - ret = live_control_sock->ops->close(live_control_sock); + int sock_fd = live_control_sock->fd; + + ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, + &sock_fd, 1, close_sock, + live_control_sock); if (ret) { PERROR("close"); } + live_control_sock->fd = -1; } lttcomm_destroy_sock(live_control_sock); error_sock_control: @@ -595,6 +750,7 @@ error_sock_control: DBG("Live viewer listener thread exited with error"); } health_unregister(health_relayd); + rcu_unregister_thread(); DBG("Live viewer listener thread cleanup complete"); if (lttng_relay_stop_threads()) { ERR("Error stopping threads"); @@ -623,12 +779,16 @@ void *thread_dispatcher(void *data) health_code_update(); - while (!CMM_LOAD_SHARED(live_dispatch_thread_exit)) { + for (;;) { health_code_update(); /* Atomically prepare the queue futex */ futex_nto1_prepare(&viewer_conn_queue.futex); + if (CMM_LOAD_SHARED(live_dispatch_thread_exit)) { + break; + } + do { health_code_update(); @@ -777,7 +937,7 @@ end: static int viewer_list_sessions(struct relay_connection *conn) { - int ret; + int ret = 0; struct lttng_viewer_list_sessions session_list; struct lttng_ht_iter iter; struct relay_session *session; @@ -799,6 +959,20 @@ int viewer_list_sessions(struct relay_connection *conn) health_code_update(); + pthread_mutex_lock(&session->lock); + if (session->connection_closed) { + /* Skip closed session */ + goto next_session; + } + if (!session->current_trace_chunk) { + /* + * Skip un-attachable session. It is either + * being destroyed or has not had a trace + * chunk created against it yet. + */ + goto next_session; + } + if (count >= buf_count) { struct lttng_viewer_session *newbuf; uint32_t new_buf_count = buf_count << 1; @@ -807,17 +981,23 @@ int viewer_list_sessions(struct relay_connection *conn) new_buf_count * sizeof(*send_session_buf)); if (!newbuf) { ret = -1; - rcu_read_unlock(); - goto end_free; + goto break_loop; } send_session_buf = newbuf; buf_count = new_buf_count; } send_session = &send_session_buf[count]; - strncpy(send_session->session_name, session->session_name, - sizeof(send_session->session_name)); - strncpy(send_session->hostname, session->hostname, - sizeof(send_session->hostname)); + if (lttng_strncpy(send_session->session_name, + session->session_name, + sizeof(send_session->session_name))) { + ret = -1; + goto break_loop; + } + if (lttng_strncpy(send_session->hostname, session->hostname, + sizeof(send_session->hostname))) { + ret = -1; + goto break_loop; + } send_session->id = htobe64(session->id); send_session->live_timer = htobe32(session->live_timer); if (session->viewer_attached) { @@ -827,8 +1007,17 @@ int viewer_list_sessions(struct relay_connection *conn) } send_session->streams = htobe32(session->stream_count); count++; + next_session: + pthread_mutex_unlock(&session->lock); + continue; + break_loop: + pthread_mutex_unlock(&session->lock); + break; } rcu_read_unlock(); + if (ret < 0) { + goto end_free; + } session_list.sessions_count = htobe32(count); @@ -864,7 +1053,7 @@ int viewer_get_new_streams(struct relay_connection *conn) uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0; struct lttng_viewer_new_streams_request request; struct lttng_viewer_new_streams_response response; - struct relay_session *session; + struct relay_session *session = NULL; uint64_t session_id; bool closed = false; @@ -893,32 +1082,37 @@ int viewer_get_new_streams(struct relay_connection *conn) } if (!viewer_session_is_attached(conn->viewer_session, session)) { - send_streams = 0; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); goto send_reply; } - send_streams = 1; - response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); - - ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent, + pthread_mutex_lock(&session->lock); + ret = make_viewer_streams(session, + conn->viewer_session->current_trace_chunk, + LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent, &nb_created, &closed); if (ret < 0) { - goto end_put_session; + goto error_unlock_session; } + send_streams = 1; + response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); + /* Only send back the newly created streams with the unsent ones. */ nb_streams = nb_created + nb_unsent; response.streams_count = htobe32(nb_streams); /* - * If the session is closed, HUP when there are no more streams. + * If the session is closed, HUP when there are no more streams + * with data. */ if (closed && nb_total == 0) { send_streams = 0; response.streams_count = 0; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP); - goto send_reply; + goto send_reply_unlock; } +send_reply_unlock: + pthread_mutex_unlock(&session->lock); send_reply: health_code_update(); @@ -942,7 +1136,7 @@ send_reply: * streams that were not sent from that point will be sent to * the viewer. */ - ret = send_viewer_streams(conn->sock, session, 0); + ret = send_viewer_streams(conn->sock, session_id, 0); if (ret < 0) { goto end_put_session; } @@ -953,6 +1147,10 @@ end_put_session: } error: return ret; +error_unlock_session: + pthread_mutex_unlock(&session->lock); + session_put(session); + return ret; } /* @@ -968,7 +1166,9 @@ int viewer_attach_session(struct relay_connection *conn) struct lttng_viewer_attach_session_request request; struct lttng_viewer_attach_session_response response; struct relay_session *session = NULL; + enum lttng_viewer_attach_return_code viewer_attach_status; bool closed = false; + uint64_t session_id; assert(conn); @@ -980,6 +1180,7 @@ int viewer_attach_session(struct relay_connection *conn) goto error; } + session_id = be64toh(request.session_id); health_code_update(); memset(&response, 0, sizeof(response)); @@ -990,16 +1191,24 @@ int viewer_attach_session(struct relay_connection *conn) goto send_reply; } - session = session_get_by_id(be64toh(request.session_id)); + session = session_get_by_id(session_id); if (!session) { - DBG("Relay session %" PRIu64 " not found", - be64toh(request.session_id)); + DBG("Relay session %" PRIu64 " not found", session_id); response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK); goto send_reply; } - DBG("Attach session ID %" PRIu64 " received", - be64toh(request.session_id)); + DBG("Attach session ID %" PRIu64 " received", session_id); + pthread_mutex_lock(&session->lock); + if (!session->current_trace_chunk) { + /* + * Session is either being destroyed or it never had a trace + * chunk created against it. + */ + DBG("Session requested by live client has no current trace chunk, returning unknown session"); + response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK); + goto send_reply; + } if (session->live_timer == 0) { DBG("Not live session"); response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE); @@ -1007,10 +1216,10 @@ int viewer_attach_session(struct relay_connection *conn) } send_streams = 1; - ret = viewer_session_attach(conn->viewer_session, session); - if (ret) { - DBG("Already a viewer attached"); - response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY); + viewer_attach_status = viewer_session_attach(conn->viewer_session, + session); + if (viewer_attach_status != LTTNG_VIEWER_ATTACH_OK) { + response.status = htobe32(viewer_attach_status); goto send_reply; } @@ -1027,13 +1236,17 @@ int viewer_attach_session(struct relay_connection *conn) goto send_reply; } - ret = make_viewer_streams(session, seek_type, &nb_streams, NULL, - NULL, &closed); + ret = make_viewer_streams(session, + conn->viewer_session->current_trace_chunk, seek_type, + &nb_streams, NULL, NULL, &closed); if (ret < 0) { goto end_put_session; } - response.streams_count = htobe32(nb_streams); + pthread_mutex_unlock(&session->lock); + session_put(session); + session = NULL; + response.streams_count = htobe32(nb_streams); /* * If the session is closed when the viewer is attaching, it * means some of the streams may have been concurrently removed, @@ -1043,7 +1256,7 @@ int viewer_attach_session(struct relay_connection *conn) if (closed) { send_streams = 0; response.streams_count = 0; - response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP); + response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK); goto send_reply; } @@ -1065,13 +1278,14 @@ send_reply: } /* Send stream and ignore the sent flag. */ - ret = send_viewer_streams(conn->sock, session, 1); + ret = send_viewer_streams(conn->sock, session_id, 1); if (ret < 0) { goto end_put_session; } end_put_session: if (session) { + pthread_mutex_unlock(&session->lock); session_put(session); } error: @@ -1081,8 +1295,8 @@ error: /* * Open the index file if needed for the given vstream. * - * If an index file is successfully opened, the vstream index_fd set with - * it. + * If an index file is successfully opened, the vstream will set it as its + * current index file. * * Return 0 on success, a negative value on error (-ENOENT if not ready yet). * @@ -1092,32 +1306,34 @@ static int try_open_index(struct relay_viewer_stream *vstream, struct relay_stream *rstream) { int ret = 0; + const uint32_t connection_major = rstream->trace->session->major; + const uint32_t connection_minor = rstream->trace->session->minor; + enum lttng_trace_chunk_status chunk_status; - if (vstream->index_fd) { + if (vstream->index_file) { goto end; } /* * First time, we open the index file and at least one index is ready. */ - if (rstream->total_index_received == 0) { + if (rstream->index_received_seqcount == 0) { ret = -ENOENT; goto end; } - ret = index_open(vstream->path_name, vstream->channel_name, - vstream->stream->tracefile_count, - vstream->current_tracefile_id); - if (ret >= 0) { - vstream->index_fd = stream_fd_create(ret); - if (!vstream->index_fd) { - if (close(ret)) { - PERROR("close"); - } - ret = -1; + chunk_status = lttng_index_file_create_from_trace_chunk_read_only( + vstream->stream_file.trace_chunk, rstream->path_name, + rstream->channel_name, rstream->tracefile_size, + vstream->current_tracefile_id, + lttng_to_index_major(connection_major, connection_minor), + lttng_to_index_minor(connection_major, connection_minor), + true, &vstream->index_file); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) { + ret = -ENOENT; } else { - ret = 0; + ret = -1; } - goto end; } end: @@ -1141,15 +1357,26 @@ static int check_index_status(struct relay_viewer_stream *vstream, { int ret; - if (trace->session->connection_closed - && rstream->total_index_received - == vstream->last_sent_index) { - /* Last index sent and session connection is closed. */ + DBG("Check index status: index_received_seqcount %" PRIu64 " " + "index_sent_seqcount %" PRIu64 " " + "for stream %" PRIu64, + rstream->index_received_seqcount, + vstream->index_sent_seqcount, + vstream->stream->stream_handle); + if ((trace->session->connection_closed || rstream->closed) + && rstream->index_received_seqcount + == vstream->index_sent_seqcount) { + /* + * Last index sent and session connection or relay + * stream are closed. + */ index->status = htobe32(LTTNG_VIEWER_INDEX_HUP); goto hup; } else if (rstream->beacon_ts_end != -1ULL && - rstream->total_index_received - == vstream->last_sent_index) { + (rstream->index_received_seqcount == 0 || + (vstream->index_sent_seqcount != 0 && + rstream->index_received_seqcount + <= vstream->index_sent_seqcount))) { /* * We've received a synchronization beacon and the last index * available has been sent, the index for now is inactive. @@ -1158,80 +1385,87 @@ static int check_index_status(struct relay_viewer_stream *vstream, * inform the client of a time interval during which we can * guarantee that there are no events to read (and never will * be). + * + * The sent seqcount can grow higher than receive seqcount on + * clear because the rotation performed by clear will push + * the index_sent_seqcount ahead (see + * viewer_stream_sync_tracefile_array_tail) and skip over + * packet sequence numbers. */ index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE); index->timestamp_end = htobe64(rstream->beacon_ts_end); index->stream_id = htobe64(rstream->ctf_stream_id); + DBG("Check index status: inactive with beacon, for stream %" PRIu64, + vstream->stream->stream_handle); goto index_ready; - } else if (rstream->total_index_received <= vstream->last_sent_index) { + } else if (rstream->index_received_seqcount == 0 || + (vstream->index_sent_seqcount != 0 && + rstream->index_received_seqcount + <= vstream->index_sent_seqcount)) { /* - * This actually checks the case where recv == last_sent. - * In this case, we have not received a beacon. Therefore, we - * can only ask the client to retry later. + * This checks whether received <= sent seqcount. In + * this case, we have not received a beacon. Therefore, + * we can only ask the client to retry later. + * + * The sent seqcount can grow higher than receive seqcount on + * clear because the rotation performed by clear will push + * the index_sent_seqcount ahead (see + * viewer_stream_sync_tracefile_array_tail) and skip over + * packet sequence numbers. */ index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + DBG("Check index status: retry for stream %" PRIu64, + vstream->stream->stream_handle); goto index_ready; - } else if (!viewer_stream_is_tracefile_seq_readable(vstream, - vstream->current_tracefile_seq)) { + } else if (!tracefile_array_seq_in_file(rstream->tfa, + vstream->current_tracefile_id, + vstream->index_sent_seqcount)) { /* - * The producer has overwritten our current file. We - * need to rotate. + * The next index we want to send cannot be read either + * because we need to perform a rotation, or due to + * the producer having overwritten its trace file. */ - DBG("Viewer stream %" PRIu64 " rotation due to overwrite", + DBG("Viewer stream %" PRIu64 " rotation", vstream->stream->stream_handle); ret = viewer_stream_rotate(vstream); - if (ret < 0) { - goto end; - } else if (ret == 1) { + if (ret == 1) { /* EOF across entire stream. */ index->status = htobe32(LTTNG_VIEWER_INDEX_HUP); goto hup; } - assert(viewer_stream_is_tracefile_seq_readable(vstream, - vstream->current_tracefile_seq)); - /* ret == 0 means successful so we continue. */ - ret = 0; - } else { - ssize_t read_ret; - char tmp[1]; - /* - * Use EOF on current index file to find out when we - * need to rotate. + * If we have been pushed due to overwrite, it + * necessarily means there is data that can be read in + * the stream. If we rotated because we reached the end + * of a tracefile, it means the following tracefile + * needs to contain at least one index, else we would + * have already returned LTTNG_VIEWER_INDEX_RETRY to the + * viewer. The updated index_sent_seqcount needs to + * point to a readable index entry now. + * + * In the case where we "rotate" on a single file, we + * can end up in a case where the requested index is + * still unavailable. */ - read_ret = lttng_read(vstream->index_fd->fd, tmp, 1); - if (read_ret == 1) { - off_t seek_ret; - - /* There is still data to read. Rewind position. */ - seek_ret = lseek(vstream->index_fd->fd, -1, SEEK_CUR); - if (seek_ret < 0) { - ret = -1; - goto end; - } - ret = 0; - } else if (read_ret == 0) { - /* EOF. We need to rotate. */ - DBG("Viewer stream %" PRIu64 " rotation due to EOF", - vstream->stream->stream_handle); - ret = viewer_stream_rotate(vstream); - if (ret < 0) { - goto end; - } else if (ret == 1) { - /* EOF across entire stream. */ - index->status = htobe32(LTTNG_VIEWER_INDEX_HUP); - goto hup; - } - assert(viewer_stream_is_tracefile_seq_readable(vstream, - vstream->current_tracefile_seq)); - /* ret == 0 means successful so we continue. */ - ret = 0; - } else { - /* Error reading index. */ - ret = -1; + if (rstream->tracefile_count == 1 && + !tracefile_array_seq_in_file( + rstream->tfa, + vstream->current_tracefile_id, + vstream->index_sent_seqcount)) { + index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + DBG("Check index status: retry: " + "tracefile array sequence number %" PRIu64 + " not in file for stream %" PRIu64, + vstream->index_sent_seqcount, + vstream->stream->stream_handle); + goto index_ready; } + assert(tracefile_array_seq_in_file(rstream->tfa, + vstream->current_tracefile_id, + vstream->index_sent_seqcount)); } -end: + /* ret == 0 means successful so we continue. */ + ret = 0; return ret; hup: @@ -1249,7 +1483,6 @@ static int viewer_get_next_index(struct relay_connection *conn) { int ret; - ssize_t read_ret; struct lttng_viewer_get_next_index request_index; struct lttng_viewer_index viewer_index; struct ctf_packet_index packet_index; @@ -1273,6 +1506,8 @@ int viewer_get_next_index(struct relay_connection *conn) vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id)); if (!vstream) { + DBG("Client requested index of unknown stream id %" PRIu64, + (uint64_t) be64toh(request_index.stream_id)); viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); goto send_reply; } @@ -1295,21 +1530,46 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } - /* Try to open an index if one is needed for that stream. */ - ret = try_open_index(vstream, rstream); - if (ret < 0) { - if (ret == -ENOENT) { - /* - * The index is created only when the first data - * packet arrives, it might not be ready at the - * beginning of the session - */ - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); - } else { - /* Unhandled error. */ + if (rstream->ongoing_rotation.is_set) { + /* Rotation is ongoing, try again later. */ + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + goto send_reply; + } + + if (rstream->trace->session->ongoing_rotation) { + /* Rotation is ongoing, try again later. */ + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + goto send_reply; + } + + if (rstream->trace_chunk && !lttng_trace_chunk_ids_equal( + conn->viewer_session->current_trace_chunk, + rstream->trace_chunk)) { + DBG("Relay stream and viewer chunk ids differ"); + + ret = viewer_session_set_trace_chunk_copy( + conn->viewer_session, + rstream->trace_chunk); + if (ret) { viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); + goto send_reply; } - goto send_reply; + } + if (conn->viewer_session->current_trace_chunk != + vstream->stream_file.trace_chunk) { + bool acquired_reference; + + DBG("Viewer session and viewer stream chunk differ: " + "vsession chunk %p vstream chunk %p", + conn->viewer_session->current_trace_chunk, + vstream->stream_file.trace_chunk); + lttng_trace_chunk_put(vstream->stream_file.trace_chunk); + acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk); + assert(acquired_reference); + vstream->stream_file.trace_chunk = + conn->viewer_session->current_trace_chunk; + viewer_stream_sync_tracefile_array_tail(vstream); + viewer_stream_close_files(vstream); } ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index); @@ -1325,6 +1585,22 @@ int viewer_get_next_index(struct relay_connection *conn) /* At this point, ret is 0 thus we will be able to read the index. */ assert(!ret); + /* Try to open an index if one is needed for that stream. */ + ret = try_open_index(vstream, rstream); + if (ret == -ENOENT) { + if (rstream->closed) { + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); + goto send_reply; + } else { + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + goto send_reply; + } + } + if (ret < 0) { + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); + goto send_reply; + } + /* * vstream->stream_fd may be NULL if it has been closed by * tracefile rotation, or if we are at the beginning of the @@ -1332,34 +1608,37 @@ int viewer_get_next_index(struct relay_connection *conn) * overwrite caused by tracefile rotation (in association with * unlink performed before overwrite). */ - if (!vstream->stream_fd) { - char fullpath[PATH_MAX]; - - if (vstream->stream->tracefile_count > 0) { - ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64, - vstream->path_name, - vstream->channel_name, - vstream->current_tracefile_id); - } else { - ret = snprintf(fullpath, PATH_MAX, "%s/%s", - vstream->path_name, - vstream->channel_name); - } + if (!vstream->stream_file.handle) { + char file_path[LTTNG_PATH_MAX]; + enum lttng_trace_chunk_status status; + struct fs_handle *fs_handle; + + ret = utils_stream_file_path(rstream->path_name, + rstream->channel_name, rstream->tracefile_size, + vstream->current_tracefile_id, NULL, file_path, + sizeof(file_path)); if (ret < 0) { goto error_put; } - ret = open(fullpath, O_RDONLY); - if (ret < 0) { - PERROR("Relay opening trace file"); - goto error_put; - } - vstream->stream_fd = stream_fd_create(ret); - if (!vstream->stream_fd) { - if (close(ret)) { - PERROR("close"); + + /* + * It is possible the the file we are trying to open is + * missing if the stream has been closed (application exits with + * per-pid buffers) and a clear command has been performed. + */ + status = lttng_trace_chunk_open_fs_handle( + vstream->stream_file.trace_chunk, + file_path, O_RDONLY, 0, &fs_handle, true); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE && + rstream->closed) { + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); + goto send_reply; } + PERROR("Failed to open trace file for viewer stream"); goto error_put; } + vstream->stream_file.handle = fs_handle; } ret = check_new_streams(conn); @@ -1370,16 +1649,14 @@ int viewer_get_next_index(struct relay_connection *conn) viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; } - read_ret = lttng_read(vstream->index_fd->fd, &packet_index, - sizeof(packet_index)); - if (read_ret < sizeof(packet_index)) { - ERR("Relay reading index file %d returned %zd", - vstream->index_fd->fd, read_ret); + ret = lttng_index_file_read(vstream->index_file, &packet_index); + if (ret) { + ERR("Relay error reading index file"); viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); goto send_reply; } else { viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK); - vstream->last_sent_index++; + vstream->index_sent_seqcount++; } /* @@ -1387,7 +1664,7 @@ int viewer_get_next_index(struct relay_connection *conn) */ DBG("Sending viewer index for stream %" PRIu64 " offset %" PRIu64, rstream->stream_handle, - be64toh(packet_index.offset)); + (uint64_t) be64toh(packet_index.offset)); viewer_index.offset = packet_index.offset; viewer_index.packet_size = packet_index.packet_size; viewer_index.content_size = packet_index.content_size; @@ -1424,9 +1701,11 @@ send_reply: } health_code_update(); - DBG("Index %" PRIu64 " for stream %" PRIu64 " sent", - vstream->last_sent_index, - vstream->stream->stream_handle); + if (vstream) { + DBG("Index %" PRIu64 " for stream %" PRIu64 " sent", + vstream->index_sent_seqcount, + vstream->stream->stream_handle); + } end: if (metadata_viewer_stream) { viewer_stream_put(metadata_viewer_stream); @@ -1453,15 +1732,16 @@ error_put: static int viewer_get_packet(struct relay_connection *conn) { - int ret, send_data = 0; - char *data = NULL; - uint32_t len = 0; - ssize_t read_len; + int ret; + off_t lseek_ret; + char *reply = NULL; struct lttng_viewer_get_packet get_packet_info; - struct lttng_viewer_trace_packet reply; + struct lttng_viewer_trace_packet reply_header; struct relay_viewer_stream *vstream = NULL; - struct ctf_trace *ctf_trace; - struct relay_viewer_stream *metadata_viewer_stream = NULL; + uint32_t reply_size = sizeof(reply_header); + uint32_t packet_data_len = 0; + ssize_t read_len; + uint64_t stream_id; DBG2("Relay get data packet"); @@ -1475,123 +1755,80 @@ int viewer_get_packet(struct relay_connection *conn) health_code_update(); /* From this point on, the error label can be reached. */ - memset(&reply, 0, sizeof(reply)); + memset(&reply_header, 0, sizeof(reply_header)); + stream_id = (uint64_t) be64toh(get_packet_info.stream_id); - vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id)); + vstream = viewer_stream_get_by_id(stream_id); if (!vstream) { - goto error; - } - - ctf_trace = vstream->stream->trace; - - /* metadata_viewer_stream may be NULL. */ - metadata_viewer_stream = - ctf_trace_get_viewer_metadata_stream(ctf_trace); - - if (metadata_viewer_stream) { - bool get_packet_err = false; - - pthread_mutex_lock(&metadata_viewer_stream->stream->lock); - DBG("get packet metadata check: recv %" PRIu64 " sent %" PRIu64, - metadata_viewer_stream->stream->metadata_received, - metadata_viewer_stream->metadata_sent); - if (!metadata_viewer_stream->stream->metadata_received || - metadata_viewer_stream->stream->metadata_received > - metadata_viewer_stream->metadata_sent) { - /* - * We prevent the client from reading a data stream as - * long as there is metadata left to consume. This - * ensures that the client won't receive data of which - * it can't make sense. - */ - get_packet_err = true; - } - pthread_mutex_unlock(&metadata_viewer_stream->stream->lock); - viewer_stream_put(metadata_viewer_stream); - if (get_packet_err) { - reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); - reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA; - goto send_reply_nolock; - } - } - - pthread_mutex_lock(&vstream->stream->lock); - /* - * The vstream->stream_fd used here has been opened by - * get_next_index. It is opened there because this is what - * allows us to grab a reference to the file with stream lock - * held, thus protecting us against overwrite caused by - * tracefile rotation. Since tracefile rotation unlinks the old - * data file, we are ensured that we won't have our data - * overwritten under us. - */ - ret = check_new_streams(conn); - if (ret < 0) { - goto end_free; - } else if (ret == 1) { - reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); - reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; - goto send_reply; + DBG("Client requested packet of unknown stream id %" PRIu64, + stream_id); + reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); + goto send_reply_nolock; + } else { + packet_data_len = be32toh(get_packet_info.len); + reply_size += packet_data_len; } - len = be32toh(get_packet_info.len); - data = zmalloc(len); - if (!data) { - PERROR("relay data zmalloc"); + reply = zmalloc(reply_size); + if (!reply) { + PERROR("packet reply zmalloc"); + reply_size = sizeof(reply_header); goto error; } - ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset), - SEEK_SET); - if (ret < 0) { - PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd, - be64toh(get_packet_info.offset)); + pthread_mutex_lock(&vstream->stream->lock); + lseek_ret = fs_handle_seek(vstream->stream_file.handle, + be64toh(get_packet_info.offset), SEEK_SET); + if (lseek_ret < 0) { + PERROR("Failed to seek file system handle of viewer stream %" PRIu64 + " to offset %" PRIu64, + stream_id, + (uint64_t) be64toh(get_packet_info.offset)); goto error; } - read_len = lttng_read(vstream->stream_fd->fd, data, len); - if (read_len < len) { - PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64, - vstream->stream_fd->fd, - be64toh(get_packet_info.offset)); + read_len = fs_handle_read(vstream->stream_file.handle, + reply + sizeof(reply_header), packet_data_len); + if (read_len < packet_data_len) { + PERROR("Failed to read from file system handle of viewer stream id %" PRIu64 + ", offset: %" PRIu64, + stream_id, + (uint64_t) be64toh(get_packet_info.offset)); goto error; } - reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK); - reply.len = htobe32(len); - send_data = 1; + reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK); + reply_header.len = htobe32(packet_data_len); goto send_reply; error: - reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); + reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); send_reply: if (vstream) { pthread_mutex_unlock(&vstream->stream->lock); } send_reply_nolock: - reply.flags = htobe32(reply.flags); health_code_update(); - ret = send_response(conn->sock, &reply, sizeof(reply)); - if (ret < 0) { - goto end_free; + if (reply) { + memcpy(reply, &reply_header, sizeof(reply_header)); + ret = send_response(conn->sock, reply, reply_size); + } else { + /* No reply to send. */ + ret = send_response(conn->sock, &reply_header, + reply_size); } - health_code_update(); - if (send_data) { - health_code_update(); - ret = send_response(conn->sock, data, len); - if (ret < 0) { - goto end_free; - } - health_code_update(); + health_code_update(); + if (ret < 0) { + PERROR("sendmsg of packet data failed"); + goto end_free; } - DBG("Sent %u bytes for stream %" PRIu64, len, - be64toh(get_packet_info.stream_id)); + DBG("Sent %u bytes for stream %" PRIu64, reply_size, stream_id); end_free: - free(data); + free(reply); end: if (vstream) { viewer_stream_put(vstream); @@ -1608,6 +1845,7 @@ static int viewer_get_metadata(struct relay_connection *conn) { int ret = 0; + int fd = -1; ssize_t read_len; uint64_t len = 0; char *data = NULL; @@ -1639,6 +1877,8 @@ int viewer_get_metadata(struct relay_connection *conn) * Reply back to the client with an error if we cannot * find it. */ + DBG("Client requested metadata of unknown stream id %" PRIu64, + (uint64_t) be64toh(request.stream_id)); reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); goto send_reply; } @@ -1648,35 +1888,130 @@ int viewer_get_metadata(struct relay_connection *conn) goto error; } - assert(vstream->metadata_sent <= vstream->stream->metadata_received); - - len = vstream->stream->metadata_received - vstream->metadata_sent; - if (len == 0) { + if (vstream->metadata_sent >= vstream->stream->metadata_received) { + /* + * The live viewers expect to receive a NO_NEW_METADATA + * status before a stream disappears, otherwise they abort the + * entire live connection when receiving an error status. + * + * Clear feature resets the metadata_sent to 0 until the + * same metadata is received again. + */ reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); + /* + * The live viewer considers a closed 0 byte metadata stream as + * an error. + */ + if (vstream->metadata_sent > 0) { + vstream->stream->no_new_metadata_notified = true; + if (vstream->stream->closed) { + /* Release ownership for the viewer metadata stream. */ + viewer_stream_put(vstream); + } + } goto send_reply; } - /* first time, we open the metadata file */ - if (!vstream->stream_fd) { - char fullpath[PATH_MAX]; + if (vstream->stream->trace_chunk && + !lttng_trace_chunk_ids_equal( + conn->viewer_session->current_trace_chunk, + vstream->stream->trace_chunk)) { + /* A rotation has occurred on the relay stream. */ + DBG("Metadata relay stream and viewer chunk ids differ"); - ret = snprintf(fullpath, PATH_MAX, "%s/%s", vstream->path_name, - vstream->channel_name); - if (ret < 0) { - goto error; + ret = viewer_session_set_trace_chunk_copy( + conn->viewer_session, + vstream->stream->trace_chunk); + if (ret) { + reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); + goto send_reply; } - ret = open(fullpath, O_RDONLY); + } + + if (conn->viewer_session->current_trace_chunk != + vstream->stream_file.trace_chunk) { + bool acquired_reference; + + DBG("Viewer session and viewer stream chunk differ: " + "vsession chunk %p vstream chunk %p", + conn->viewer_session->current_trace_chunk, + vstream->stream_file.trace_chunk); + lttng_trace_chunk_put(vstream->stream_file.trace_chunk); + acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk); + assert(acquired_reference); + vstream->stream_file.trace_chunk = + conn->viewer_session->current_trace_chunk; + viewer_stream_close_files(vstream); + } + + len = vstream->stream->metadata_received - vstream->metadata_sent; + + /* + * Either this is the first time the metadata file is read, or a + * rotation of the corresponding relay stream has occured. + */ + if (!vstream->stream_file.handle && len > 0) { + struct fs_handle *fs_handle; + char file_path[LTTNG_PATH_MAX]; + enum lttng_trace_chunk_status status; + struct relay_stream *rstream = vstream->stream; + + ret = utils_stream_file_path(rstream->path_name, + rstream->channel_name, rstream->tracefile_size, + vstream->current_tracefile_id, NULL, file_path, + sizeof(file_path)); if (ret < 0) { - PERROR("Relay opening metadata file"); goto error; } - vstream->stream_fd = stream_fd_create(ret); - if (!vstream->stream_fd) { - if (close(ret)) { - PERROR("close"); + + /* + * It is possible the the metadata file we are trying to open is + * missing if the stream has been closed (application exits with + * per-pid buffers) and a clear command has been performed. + */ + status = lttng_trace_chunk_open_fs_handle( + vstream->stream_file.trace_chunk, + file_path, O_RDONLY, 0, &fs_handle, true); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) { + reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); + len = 0; + if (vstream->stream->closed) { + viewer_stream_put(vstream); + } + goto send_reply; } + PERROR("Failed to open metadata file for viewer stream"); goto error; } + vstream->stream_file.handle = fs_handle; + + if (vstream->metadata_sent != 0) { + /* + * The client does not expect to receive any metadata + * it has received and metadata files in successive + * chunks must be a strict superset of one another. + * + * Skip the first `metadata_sent` bytes to ensure + * they are not sent a second time to the client. + * + * Baring a block layer error or an internal error, + * this seek should not fail as + * `vstream->stream->metadata_received` is reset when + * a relay stream is rotated. If this is reached, it is + * safe to assume that + * `metadata_received` > `metadata_sent`. + */ + const off_t seek_ret = fs_handle_seek(fs_handle, + vstream->metadata_sent, SEEK_SET); + + if (seek_ret < 0) { + PERROR("Failed to seek metadata viewer stream file to `sent` position: pos = %" PRId64, + vstream->metadata_sent); + reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); + goto send_reply; + } + } } reply.len = htobe64(len); @@ -1686,18 +2021,45 @@ int viewer_get_metadata(struct relay_connection *conn) goto error; } - read_len = lttng_read(vstream->stream_fd->fd, data, len); - if (read_len < len) { - PERROR("Relay reading metadata file"); + fd = fs_handle_get_fd(vstream->stream_file.handle); + if (fd < 0) { + ERR("Failed to restore viewer stream file system handle"); goto error; } - vstream->metadata_sent += read_len; - if (vstream->metadata_sent == vstream->stream->metadata_received - && vstream->stream->closed) { - /* Release ownership for the viewer metadata stream. */ - viewer_stream_put(vstream); + read_len = lttng_read(fd, data, len); + fs_handle_put_fd(vstream->stream_file.handle); + fd = -1; + if (read_len < len) { + if (read_len < 0) { + PERROR("Failed to read metadata file"); + goto error; + } else { + /* + * A clear has been performed which prevents the relay + * from sending `len` bytes of metadata. + * + * It is important not to send any metadata if we + * couldn't read all the available metadata in one shot: + * sending partial metadata can cause the client to + * attempt to parse an incomplete (incoherent) metadata + * stream, which would result in an error. + */ + const off_t seek_ret = fs_handle_seek( + vstream->stream_file.handle, -read_len, + SEEK_CUR); + + DBG("Failed to read metadata: requested = %" PRIu64 ", got = %zd", + len, read_len); + read_len = 0; + len = 0; + if (seek_ret < 0) { + PERROR("Failed to restore metadata file position after partial read"); + ret = -1; + goto error; + } + } } - + vstream->metadata_sent += read_len; reply.status = htobe32(LTTNG_VIEWER_METADATA_OK); goto send_reply; @@ -1724,7 +2086,7 @@ send_reply: } DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64, len, - be64toh(request.stream_id)); + (uint64_t) be64toh(request.stream_id)); DBG("Metadata sent"); @@ -1772,6 +2134,78 @@ end: return ret; } +/* + * Detach a viewer session. + * + * Return 0 on success or else a negative value. + */ +static +int viewer_detach_session(struct relay_connection *conn) +{ + int ret; + struct lttng_viewer_detach_session_response response; + struct lttng_viewer_detach_session_request request; + struct relay_session *session = NULL; + uint64_t viewer_session_to_close; + + DBG("Viewer detach session received"); + + assert(conn); + + health_code_update(); + + /* Receive the request from the connected client. */ + ret = recv_request(conn->sock, &request, sizeof(request)); + if (ret < 0) { + goto end; + } + viewer_session_to_close = be64toh(request.session_id); + + if (!conn->viewer_session) { + DBG("Client trying to detach before creating a live viewer session"); + response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_ERR); + goto send_reply; + } + + health_code_update(); + + memset(&response, 0, sizeof(response)); + DBG("Detaching from session ID %" PRIu64, viewer_session_to_close); + + session = session_get_by_id(be64toh(request.session_id)); + if (!session) { + DBG("Relay session %" PRIu64 " not found", + (uint64_t) be64toh(request.session_id)); + response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_UNK); + goto send_reply; + } + + ret = viewer_session_is_attached(conn->viewer_session, session); + if (ret != 1) { + DBG("Not attached to this session"); + response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_ERR); + goto send_reply_put; + } + + viewer_session_close_one_session(conn->viewer_session, session); + response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_OK); + DBG("Session %" PRIu64 " detached.", viewer_session_to_close); + +send_reply_put: + session_put(session); + +send_reply: + health_code_update(); + ret = send_response(conn->sock, &response, sizeof(response)); + if (ret < 0) { + goto end; + } + health_code_update(); + ret = 0; + +end: + return ret; +} /* * live_relay_unknown_command: send -1 if received unknown command @@ -1833,6 +2267,9 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, case LTTNG_VIEWER_CREATE_SESSION: ret = viewer_create_session(conn); break; + case LTTNG_VIEWER_DETACH_SESSION: + ret = viewer_detach_session(conn); + break; default: ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd)); @@ -1852,7 +2289,8 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) (void) lttng_poll_del(events, pollfd); - ret = close(pollfd); + ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1, + fd_tracker_util_close_fd, NULL); if (ret < 0) { ERR("Closing pollfd %d", pollfd); } @@ -1888,7 +2326,8 @@ void *thread_worker(void *data) goto viewer_connections_ht_error; } - ret = create_thread_poll_set(&events, 2); + ret = create_named_thread_poll_set(&events, 2, + "Live viewer worker thread epoll"); if (ret < 0) { goto error_poll_create; } @@ -1933,11 +2372,6 @@ restart: health_code_update(); - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -1947,10 +2381,7 @@ restart: /* Inspect the relay conn pipe for new connection. */ if (pollfd == live_conn_pipe[0]) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Relay live pipe error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { struct relay_connection *conn; ret = lttng_read(live_conn_pipe[0], @@ -1958,10 +2389,21 @@ restart: if (ret < 0) { goto error; } - lttng_poll_add(&events, conn->sock->fd, + ret = lttng_poll_add(&events, + conn->sock->fd, LPOLLIN | LPOLLRDHUP); + if (ret) { + ERR("Failed to add new live connection file descriptor to poll set"); + goto error; + } connection_ht_add(viewer_connections_ht, conn); DBG("Connection socket %d added to poll", conn->sock->fd); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Relay live pipe error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } else { /* Connection activity. */ @@ -1972,11 +2414,7 @@ restart: continue; } - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - cleanup_connection_pollfd(&events, pollfd); - /* Put "create" ownership reference. */ - connection_put(conn); - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr, sizeof(recv_hdr), 0); if (ret <= 0) { @@ -1995,6 +2433,14 @@ restart: DBG("Viewer connection closed with %d", pollfd); } } + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + cleanup_connection_pollfd(&events, pollfd); + /* Put "create" ownership reference. */ + connection_put(conn); + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + connection_put(conn); + goto error; } /* Put local "get_by_sock" reference. */ connection_put(conn); @@ -2004,9 +2450,9 @@ restart: exit: error: - lttng_poll_clean(&events); + (void) fd_tracker_util_poll_clean(the_fd_tracker, &events); - /* Cleanup reamaining connection object. */ + /* Cleanup remaining connection object. */ rcu_read_lock(); cds_lfht_for_each_entry(viewer_connections_ht->ht, &iter.iter, destroy_conn, @@ -2019,7 +2465,7 @@ error_poll_create: lttng_ht_destroy(viewer_connections_ht); viewer_connections_ht_error: /* Close relay conn pipes */ - utils_close_pipe(live_conn_pipe); + (void) fd_tracker_util_pipe_close(the_fd_tracker, live_conn_pipe); if (err) { DBG("Viewer worker thread exited with error"); } @@ -2043,7 +2489,8 @@ error_testpoint: */ static int create_conn_pipe(void) { - return utils_create_pipe_cloexec(live_conn_pipe); + return fd_tracker_util_pipe_open_cloexec(the_fd_tracker, + "Live connection pipe", live_conn_pipe); } int relayd_live_join(void) @@ -2119,7 +2566,7 @@ int relayd_live_create(struct lttng_uri *uri) } /* Setup the dispatcher thread */ - ret = pthread_create(&live_dispatcher_thread, NULL, + ret = pthread_create(&live_dispatcher_thread, default_pthread_attr(), thread_dispatcher, (void *) NULL); if (ret) { errno = ret; @@ -2129,7 +2576,7 @@ int relayd_live_create(struct lttng_uri *uri) } /* Setup the worker thread */ - ret = pthread_create(&live_worker_thread, NULL, + ret = pthread_create(&live_worker_thread, default_pthread_attr(), thread_worker, NULL); if (ret) { errno = ret; @@ -2139,7 +2586,7 @@ int relayd_live_create(struct lttng_uri *uri) } /* Setup the listener thread */ - ret = pthread_create(&live_listener_thread, NULL, + ret = pthread_create(&live_listener_thread, default_pthread_attr(), thread_listener, (void *) NULL); if (ret) { errno = ret;