X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=d5f04582419dffec1466c60c4de449cf34948a03;hp=a64d69b02e37f1f4413957f8e7e6e514d34d1e3e;hb=9a9c8637bbfb1b12b8302e03d5bc0453672b4d06;hpb=ab5be9fa2eb5ba9600a82cd18fd3cfcbac69169a diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index a64d69b02..d5f045824 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -192,7 +192,6 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int ignore_sent_flag) { ssize_t ret; - struct lttng_viewer_stream send_stream; struct lttng_ht_iter iter; struct relay_viewer_stream *vstream; @@ -201,6 +200,7 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) { struct ctf_trace *ctf_trace; + struct lttng_viewer_stream send_stream = {}; health_code_update(); @@ -439,6 +439,10 @@ int create_named_thread_poll_set(struct lttng_poll_event *events, ret = fd_tracker_util_poll_create(the_fd_tracker, name, events, 1, LTTNG_CLOEXEC); + if (ret) { + PERROR("Failed to create \"%s\" poll file descriptor", name); + goto error; + } /* Add quit pipe */ ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); @@ -561,7 +565,11 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name) ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd, (const char **) (formated_name ? &formated_name : NULL), 1, create_sock, sock); - free(formated_name); + if (ret) { + PERROR("Failed to create \"%s\" socket", + formated_name ?: "Unknown"); + goto error; + } DBG("Listening on %s socket %d", name, sock->fd); ret = sock->ops->bind(sock); @@ -576,12 +584,14 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name) } + free(formated_name); return sock; error: if (sock) { lttcomm_destroy_sock(sock); } + free(formated_name); return NULL; } @@ -598,6 +608,7 @@ void *thread_listener(void *data) DBG("[thread] Relay live listener started"); + rcu_register_thread(); health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_LISTENER); health_code_update(); @@ -739,6 +750,7 @@ error_sock_control: DBG("Live viewer listener thread exited with error"); } health_unregister(health_relayd); + rcu_unregister_thread(); DBG("Live viewer listener thread cleanup complete"); if (lttng_relay_stop_threads()) { ERR("Error stopping threads"); @@ -1530,42 +1542,18 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } - if (rstream->trace_chunk) { - uint64_t rchunk_id, vchunk_id; + if (rstream->trace_chunk && !lttng_trace_chunk_ids_equal( + conn->viewer_session->current_trace_chunk, + rstream->trace_chunk)) { + DBG("Relay stream and viewer chunk ids differ"); - /* - * If the relay stream is not yet closed, ensure the viewer - * chunk matches the relay chunk after clear. - */ - if (lttng_trace_chunk_get_id(rstream->trace_chunk, - &rchunk_id) != LTTNG_TRACE_CHUNK_STATUS_OK) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); - goto send_reply; - } - if (lttng_trace_chunk_get_id( - conn->viewer_session->current_trace_chunk, - &vchunk_id) != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = viewer_session_set_trace_chunk_copy( + conn->viewer_session, + rstream->trace_chunk); + if (ret) { viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); goto send_reply; } - - if (rchunk_id != vchunk_id) { - DBG("Relay and viewer chunk ids differ: " - "rchunk_id %" PRIu64 " vchunk_id %" PRIu64, - rchunk_id, vchunk_id); - - lttng_trace_chunk_put( - conn->viewer_session->current_trace_chunk); - conn->viewer_session->current_trace_chunk = NULL; - ret = viewer_session_set_trace_chunk_copy( - conn->viewer_session, - rstream->trace_chunk); - if (ret) { - viewer_index.status = - htobe32(LTTNG_VIEWER_INDEX_ERR); - goto send_reply; - } - } } if (conn->viewer_session->current_trace_chunk != vstream->stream_file.trace_chunk) { @@ -1924,10 +1912,45 @@ int viewer_get_metadata(struct relay_connection *conn) goto send_reply; } + if (vstream->stream->trace_chunk && + !lttng_trace_chunk_ids_equal( + conn->viewer_session->current_trace_chunk, + vstream->stream->trace_chunk)) { + /* A rotation has occurred on the relay stream. */ + DBG("Metadata relay stream and viewer chunk ids differ"); + + ret = viewer_session_set_trace_chunk_copy( + conn->viewer_session, + vstream->stream->trace_chunk); + if (ret) { + reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); + goto send_reply; + } + } + + if (conn->viewer_session->current_trace_chunk != + vstream->stream_file.trace_chunk) { + bool acquired_reference; + + DBG("Viewer session and viewer stream chunk differ: " + "vsession chunk %p vstream chunk %p", + conn->viewer_session->current_trace_chunk, + vstream->stream_file.trace_chunk); + lttng_trace_chunk_put(vstream->stream_file.trace_chunk); + acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk); + assert(acquired_reference); + vstream->stream_file.trace_chunk = + conn->viewer_session->current_trace_chunk; + viewer_stream_close_files(vstream); + } + len = vstream->stream->metadata_received - vstream->metadata_sent; - /* first time, we open the metadata file */ - if (!vstream->stream_file.handle) { + /* + * Either this is the first time the metadata file is read, or a + * rotation of the corresponding relay stream has occured. + */ + if (!vstream->stream_file.handle && len > 0) { struct fs_handle *fs_handle; char file_path[LTTNG_PATH_MAX]; enum lttng_trace_chunk_status status; @@ -1962,6 +1985,33 @@ int viewer_get_metadata(struct relay_connection *conn) goto error; } vstream->stream_file.handle = fs_handle; + + if (vstream->metadata_sent != 0) { + /* + * The client does not expect to receive any metadata + * it has received and metadata files in successive + * chunks must be a strict superset of one another. + * + * Skip the first `metadata_sent` bytes to ensure + * they are not sent a second time to the client. + * + * Baring a block layer error or an internal error, + * this seek should not fail as + * `vstream->stream->metadata_received` is reset when + * a relay stream is rotated. If this is reached, it is + * safe to assume that + * `metadata_received` > `metadata_sent`. + */ + const off_t seek_ret = fs_handle_seek(fs_handle, + vstream->metadata_sent, SEEK_SET); + + if (seek_ret < 0) { + PERROR("Failed to seek metadata viewer stream file to `sent` position: pos = %" PRId64, + vstream->metadata_sent); + reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); + goto send_reply; + } + } } reply.len = htobe64(len); @@ -1980,8 +2030,34 @@ int viewer_get_metadata(struct relay_connection *conn) fs_handle_put_fd(vstream->stream_file.handle); fd = -1; if (read_len < len) { - PERROR("Relay reading metadata file"); - goto error; + if (read_len < 0) { + PERROR("Failed to read metadata file"); + goto error; + } else { + /* + * A clear has been performed which prevents the relay + * from sending `len` bytes of metadata. + * + * It is important not to send any metadata if we + * couldn't read all the available metadata in one shot: + * sending partial metadata can cause the client to + * attempt to parse an incomplete (incoherent) metadata + * stream, which would result in an error. + */ + const off_t seek_ret = fs_handle_seek( + vstream->stream_file.handle, -read_len, + SEEK_CUR); + + DBG("Failed to read metadata: requested = %" PRIu64 ", got = %zd", + len, read_len); + read_len = 0; + len = 0; + if (seek_ret < 0) { + PERROR("Failed to restore metadata file position after partial read"); + ret = -1; + goto error; + } + } } vstream->metadata_sent += read_len; reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);