From 80e8027abb847655ebe43b2b5aec1a5141bb9668 Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Wed, 15 Jan 2014 10:32:44 -0500 Subject: [PATCH] Fix: handle new streams in live mode in relayd Signed-off-by: Julien Desfossez Signed-off-by: David Goulet --- src/bin/lttng-relayd/live.c | 176 ++++++++++++++++++++++++++-- src/bin/lttng-relayd/lttng-relayd.h | 9 ++ src/bin/lttng-relayd/lttng-viewer.h | 23 ++++ src/bin/lttng-relayd/main.c | 4 + 4 files changed, 205 insertions(+), 7 deletions(-) diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 1a691a849..69e2eb3b2 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -852,6 +852,166 @@ error: return ret; } +/* + * Send the viewer the list of current sessions. + */ +static +int viewer_get_new_streams(struct relay_command *cmd, + struct lttng_ht *sessions_ht) +{ + int ret, send_streams = 0; + uint32_t nb_streams = 0; + struct lttng_viewer_new_streams_request request; + struct lttng_viewer_new_streams_response response; + struct lttng_viewer_stream send_stream; + struct relay_stream *stream; + struct relay_viewer_stream *viewer_stream; + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + struct relay_session *session; + + assert(cmd); + assert(sessions_ht); + + DBG("Get new streams received"); + + if (cmd->version_check_done == 0) { + ERR("Trying to get streams before version check"); + ret = -1; + goto end_no_session; + } + + health_code_update(); + + ret = cmd->sock->ops->recvmsg(cmd->sock, &request, sizeof(request), 0); + if (ret < 0 || ret != sizeof(request)) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay failed to receive the command parameters."); + } + ret = -1; + goto error; + } + + health_code_update(); + + rcu_read_lock(); + lttng_ht_lookup(sessions_ht, + (void *)((unsigned long) be64toh(request.session_id)), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + DBG("Relay session %" PRIu64 " not found", + be64toh(request.session_id)); + response.status = htobe32(VIEWER_NEW_STREAMS_ERR); + goto send_reply; + } + + session = caa_container_of(node, struct relay_session, session_n); + if (cmd->session_id == session->id) { + /* We confirmed the viewer is asking for the same session. */ + send_streams = 1; + response.status = htobe32(VIEWER_NEW_STREAMS_OK); + } else { + send_streams = 0; + response.status = htobe32(VIEWER_NEW_STREAMS_ERR); + goto send_reply; + } + + /* + * Fill the viewer_streams_ht to count the number of streams ready to be + * sent and avoid concurrency issues on the relay_streams_ht and don't rely + * on a total session stream count. + */ + pthread_mutex_lock(&session->viewer_ready_lock); + cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, + stream_n.node) { + struct relay_viewer_stream *vstream; + + health_code_update(); + + /* + * Don't send stream if no ctf_trace, wrong session or if the stream is + * not ready for the viewer. + */ + if (stream->session != cmd->session || + !stream->ctf_trace || !stream->viewer_ready) { + continue; + } + + vstream = live_find_viewer_stream_by_id(stream->stream_handle); + if (!vstream) { + ret = init_viewer_stream(stream, 0); + if (ret < 0) { + pthread_mutex_unlock(&session->viewer_ready_lock); + goto end_unlock; + } + nb_streams++; + } else if (!vstream->sent_flag) { + nb_streams++; + } + } + pthread_mutex_unlock(&session->viewer_ready_lock); + + response.streams_count = htobe32(nb_streams); + +send_reply: + health_code_update(); + ret = cmd->sock->ops->sendmsg(cmd->sock, &response, sizeof(response), 0); + if (ret < 0) { + ERR("Relay sending viewer attach response"); + goto end_unlock; + } + health_code_update(); + + /* + * Unknown or empty session, just return gracefully, the viewer knows what + * is happening. + */ + if (!send_streams || !nb_streams) { + ret = 0; + goto end_unlock; + } + + /* We should only be there if we have a session to attach to. */ + cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, viewer_stream, + stream_n.node) { + health_code_update(); + + /* Don't send back if session does not match or already sent. */ + if (viewer_stream->session_id != cmd->session->id || + viewer_stream->sent_flag) { + continue; + } + + send_stream.id = htobe64(viewer_stream->stream_handle); + send_stream.ctf_trace_id = htobe64(viewer_stream->ctf_trace->id); + send_stream.metadata_flag = htobe32(viewer_stream->metadata_flag); + strncpy(send_stream.path_name, viewer_stream->path_name, + sizeof(send_stream.path_name)); + strncpy(send_stream.channel_name, viewer_stream->channel_name, + sizeof(send_stream.channel_name)); + + ret = cmd->sock->ops->sendmsg(cmd->sock, &send_stream, + sizeof(send_stream), 0); + if (ret < 0) { + ERR("Relay sending stream %" PRIu64, viewer_stream->stream_handle); + goto end_unlock; + } + DBG("Sent stream %" PRIu64 " to viewer", viewer_stream->stream_handle); + viewer_stream->sent_flag = 1; + } + + ret = 0; + +end_unlock: + rcu_read_unlock(); +end_no_session: +error: + return ret; +} + /* * Send the viewer the list of current sessions. */ @@ -860,7 +1020,7 @@ int viewer_attach_session(struct relay_command *cmd, struct lttng_ht *sessions_ht) { int ret, send_streams = 0; - uint32_t nb_streams = 0, nb_streams_ready = 0; + uint32_t nb_streams = 0; struct lttng_viewer_attach_session_request request; struct lttng_viewer_attach_session_response response; struct lttng_viewer_stream send_stream; @@ -954,6 +1114,7 @@ int viewer_attach_session(struct relay_command *cmd, * ready to be sent and avoid concurrency issues on the * relay_streams_ht and don't rely on a total session stream count. */ + pthread_mutex_lock(&session->viewer_ready_lock); cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) { struct relay_viewer_stream *vstream; @@ -967,7 +1128,6 @@ int viewer_attach_session(struct relay_command *cmd, if (stream->session != cmd->session) { continue; } - nb_streams++; /* * Don't send streams with no ctf_trace, they are not @@ -976,21 +1136,19 @@ int viewer_attach_session(struct relay_command *cmd, if (!stream->ctf_trace || !stream->viewer_ready) { continue; } - nb_streams_ready++; vstream = live_find_viewer_stream_by_id(stream->stream_handle); if (!vstream) { ret = init_viewer_stream(stream, seek_last); if (ret < 0) { + pthread_mutex_unlock(&session->viewer_ready_lock); goto end_unlock; } } + nb_streams++; } + pthread_mutex_unlock(&session->viewer_ready_lock); - /* We must have the same amount of existing stream and ready stream. */ - if (nb_streams != nb_streams_ready) { - nb_streams = 0; - } response.streams_count = htobe32(nb_streams); } @@ -1042,6 +1200,7 @@ send_reply: goto end_unlock; } DBG("Sent stream %" PRIu64 " to viewer", viewer_stream->stream_handle); + viewer_stream->sent_flag = 1; } ret = 0; @@ -1719,6 +1878,9 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, case VIEWER_GET_METADATA: ret = viewer_get_metadata(cmd); break; + case VIEWER_GET_NEW_STREAMS: + ret = viewer_get_new_streams(cmd, sessions_ht); + break; default: ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd)); live_relay_unknown_command(cmd); diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h index ab1ccd410..b5e7c75fd 100644 --- a/src/bin/lttng-relayd/lttng-relayd.h +++ b/src/bin/lttng-relayd/lttng-relayd.h @@ -88,6 +88,13 @@ struct relay_session { * viewer-side if new streams got added since the last check. */ unsigned long new_streams; + + /* + * Used to synchronize the process where we flag every streams readiness + * for the viewer when the streams_sent message is received and the viewer + * process of sending those streams. + */ + pthread_mutex_t viewer_ready_lock; }; /* @@ -208,6 +215,8 @@ struct relay_viewer_stream { * it sets this flag to inform that it is a normal error. */ unsigned int abort_flag:1; + /* Indicates if this stream has been sent to a viewer client. */ + unsigned int sent_flag:1; }; /* diff --git a/src/bin/lttng-relayd/lttng-viewer.h b/src/bin/lttng-relayd/lttng-viewer.h index 1977a43b4..baa8964bf 100644 --- a/src/bin/lttng-relayd/lttng-viewer.h +++ b/src/bin/lttng-relayd/lttng-viewer.h @@ -44,6 +44,7 @@ enum lttng_viewer_command { VIEWER_GET_NEXT_INDEX = 4, VIEWER_GET_PACKET = 5, VIEWER_GET_METADATA = 6, + VIEWER_GET_NEW_STREAMS = 7, }; enum lttng_viewer_attach_return_code { @@ -88,6 +89,12 @@ enum lttng_viewer_seek { VIEWER_SEEK_LAST = 2, }; +enum lttng_viewer_new_streams_return_code { + VIEWER_NEW_STREAMS_OK = 1, /* If new streams are being sent. */ + VIEWER_NEW_STREAMS_NO_NEW = 2, /* If no new streams are available. */ + VIEWER_NEW_STREAMS_ERR = 3, /* Error. */ +}; + struct lttng_viewer_session { uint64_t id; uint32_t live_timer; @@ -198,4 +205,20 @@ struct lttng_viewer_metadata_packet { char data[]; } __attribute__((__packed__)); +/* + * VIEWER_GET_NEW_STREAMS payload. + */ +struct lttng_viewer_new_streams_request { + uint64_t session_id; +} __attribute__((__packed__)); + +struct lttng_viewer_new_streams_response { + /* enum lttng_viewer_new_streams_return_code */ + uint32_t status; + uint32_t streams_count; + /* struct lttng_viewer_stream */ + char stream_list[]; +} __attribute__((__packed__)); + + #endif /* LTTNG_VIEWER_H */ diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 380b4f833..730eaf203 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1160,6 +1160,7 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, session->sock = cmd->sock; session->minor = cmd->minor; session->major = cmd->major; + pthread_mutex_init(&session->viewer_ready_lock, NULL); cmd->session = session; reply.session_id = htobe64(session->id); @@ -1207,6 +1208,8 @@ void set_viewer_ready_flag(struct relay_command *cmd) { struct relay_stream_recv_handle *node, *tmp_node; + pthread_mutex_lock(&cmd->session->viewer_ready_lock); + cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) { struct relay_stream *stream; @@ -1229,6 +1232,7 @@ void set_viewer_ready_flag(struct relay_command *cmd) free(node); } + pthread_mutex_unlock(&cmd->session->viewer_ready_lock); return; } -- 2.34.1