Fix: big relayd cleanup and refactor
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index d558b71487813aadf6206427c5a2f658589bbcb7..764d616a24e692c77f8e717c37ddcf3841d556fb 100644 (file)
@@ -59,6 +59,9 @@
 #include "health-relayd.h"
 #include "testpoint.h"
 #include "viewer-stream.h"
+#include "stream.h"
+#include "session.h"
+#include "ctf-trace.h"
 
 static struct lttng_uri *live_uri;
 
@@ -199,6 +202,8 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock,
 
        cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
                        stream_n.node) {
+               struct ctf_trace *ctf_trace;
+
                health_code_update();
 
                /* Ignore if not the same session. */
@@ -207,8 +212,12 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock,
                        continue;
                }
 
+               ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
+                               vstream->path_name);
+               assert(ctf_trace);
+
                send_stream.id = htobe64(vstream->stream_handle);
-               send_stream.ctf_trace_id = htobe64(vstream->ctf_trace->id);
+               send_stream.ctf_trace_id = htobe64(ctf_trace->id);
                send_stream.metadata_flag = htobe32(vstream->metadata_flag);
                strncpy(send_stream.path_name, vstream->path_name,
                                sizeof(send_stream.path_name));
@@ -244,8 +253,8 @@ int make_viewer_streams(struct relay_session *session,
                uint32_t *nb_created)
 {
        int ret;
-       struct relay_stream *stream;
        struct lttng_ht_iter iter;
+       struct ctf_trace *ctf_trace;
 
        assert(session);
 
@@ -262,45 +271,53 @@ int make_viewer_streams(struct relay_session *session,
         * Create viewer streams for relay streams that are ready to be used for a
         * the given session id only.
         */
-       cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
-                       stream_n.node) {
-               struct relay_viewer_stream *vstream;
+       rcu_read_lock();
+       cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
+                       node.node) {
+               struct relay_stream *stream;
 
                health_code_update();
 
-               if (stream->session->id != session->id ||
-                               !stream->ctf_trace || !stream->viewer_ready) {
-                       /*
-                        * Ignore stream from a different session. Don't create streams
-                        * with no ctf_trace or not ready for the viewer.
-                        */
+               if (ctf_trace->invalid_flag) {
                        continue;
                }
 
-               vstream = viewer_stream_find_by_id(stream->stream_handle);
-               if (!vstream) {
-                       vstream = viewer_stream_create(stream, seek_t);
+               cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) {
+                       struct relay_viewer_stream *vstream;
+
+                       if (!stream->viewer_ready) {
+                               continue;
+                       }
+
+                       vstream = viewer_stream_find_by_id(stream->stream_handle);
                        if (!vstream) {
-                               ret = -1;
-                               goto error_unlock;
+                               vstream = viewer_stream_create(stream, seek_t, ctf_trace);
+                               if (!vstream) {
+                                       ret = -1;
+                                       goto error_unlock;
+                               }
+                               /* Acquire reference to ctf_trace. */
+                               ctf_trace_get_ref(ctf_trace);
+
+                               if (nb_created) {
+                                       /* Update number of created stream counter. */
+                                       (*nb_created)++;
+                               }
+                       } else if (!vstream->sent_flag && nb_unsent) {
+                               /* Update number of unsent stream counter. */
+                               (*nb_unsent)++;
                        }
-                       if (nb_created) {
-                               /* Update number of created stream counter. */
-                               (*nb_created)++;
+                       /* Update number of total stream counter. */
+                       if (nb_total) {
+                               (*nb_total)++;
                        }
-               } else if (!vstream->sent_flag && nb_unsent) {
-                       /* Update number of unsent stream counter. */
-                       (*nb_unsent)++;
-               }
-               /* Update number of total stream counter. */
-               if (nb_total) {
-                       (*nb_total)++;
                }
        }
 
        ret = 0;
 
 error_unlock:
+       rcu_read_unlock();
        pthread_mutex_unlock(&session->viewer_ready_lock);
        return ret;
 }
@@ -331,7 +348,7 @@ void stop_threads(void)
 
        /* Stopping all threads */
        DBG("Terminating all live threads");
-       ret = notify_thread_pipe(thread_quit_pipe[1]);
+       ret = notify_thread_pipe(live_conn_pipe[1]);
        if (ret < 0) {
                ERR("write error on thread quit pipe");
        }
@@ -360,7 +377,7 @@ int create_thread_poll_set(struct lttng_poll_event *events, int size)
        }
 
        /* Add quit pipe */
-       ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
+       ret = lttng_poll_add(events, live_conn_pipe[0], LPOLLIN | LPOLLERR);
        if (ret < 0) {
                goto error;
        }
@@ -377,9 +394,9 @@ error:
  * Return 1 if it was triggered else 0;
  */
 static
-int check_thread_quit_pipe(int fd, uint32_t events)
+int check_live_conn_pipe(int fd, uint32_t events)
 {
-       if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
+       if (fd == live_conn_pipe[0] && (events & LPOLLIN)) {
                return 1;
        }
 
@@ -497,7 +514,7 @@ restart:
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
                        /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
+                       ret = check_live_conn_pipe(pollfd, revents);
                        if (ret) {
                                err = 0;
                                goto exit;
@@ -717,7 +734,6 @@ int viewer_connect(struct relay_command *cmd)
 
        health_code_update();
 
-
        ret = send_response(cmd->sock, &reply, sizeof(reply));
        if (ret < 0) {
                goto end;
@@ -774,7 +790,7 @@ int viewer_list_sessions(struct relay_command *cmd,
                                sizeof(send_session.hostname));
                send_session.id = htobe64(session->id);
                send_session.live_timer = htobe32(session->live_timer);
-               send_session.clients = htobe32(session->viewer_attached);
+               send_session.clients = htobe32(session->viewer_refcount);
                send_session.streams = htobe32(session->stream_count);
 
                health_code_update();
@@ -907,8 +923,6 @@ int viewer_attach_session(struct relay_command *cmd,
        assert(cmd);
        assert(sessions_ht);
 
-       DBG("Attach session received");
-
        health_code_update();
 
        /* Receive the request from the connected client. */
@@ -927,21 +941,19 @@ int viewer_attach_session(struct relay_command *cmd,
                response.status = htobe32(VIEWER_ATTACH_UNK);
                goto send_reply;
        }
+       session_viewer_attach(session);
+       DBG("Attach session ID %" PRIu64 " received", be64toh(request.session_id));
 
-       if (cmd->session_id == session->id) {
-               /* Same viewer already attached, just send the stream list. */
-               send_streams = 1;
-               response.status = htobe32(VIEWER_ATTACH_OK);
-       } else if (session->viewer_attached != 0) {
+       if (uatomic_read(&session->viewer_refcount) > 1) {
                DBG("Already a viewer attached");
                response.status = htobe32(VIEWER_ATTACH_ALREADY);
+               session_viewer_detach(session);
                goto send_reply;
        } else if (session->live_timer == 0) {
                DBG("Not live session");
                response.status = htobe32(VIEWER_ATTACH_NOT_LIVE);
                goto send_reply;
        } else {
-               session->viewer_attached++;
                send_streams = 1;
                response.status = htobe32(VIEWER_ATTACH_OK);
                cmd->session_id = session->id;
@@ -1014,6 +1026,8 @@ int viewer_get_next_index(struct relay_command *cmd,
        struct ctf_packet_index packet_index;
        struct relay_viewer_stream *vstream;
        struct relay_stream *rstream;
+       struct ctf_trace *ctf_trace;
+       struct relay_session *session;
 
        assert(cmd);
        assert(sessions_ht);
@@ -1029,12 +1043,21 @@ int viewer_get_next_index(struct relay_command *cmd,
        health_code_update();
 
        rcu_read_lock();
+       session = session_find_by_id(sessions_ht, cmd->session_id);
+       if (!session) {
+               ret = -1;
+               goto end_unlock;
+       }
+
        vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id));
        if (!vstream) {
                ret = -1;
                goto end_unlock;
        }
 
+       ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, vstream->path_name);
+       assert(ctf_trace);
+
        memset(&viewer_index, 0, sizeof(viewer_index));
 
        /*
@@ -1063,8 +1086,10 @@ int viewer_get_next_index(struct relay_command *cmd,
                vstream->index_read_fd = ret;
        }
 
-       rstream = relay_stream_find_by_id(vstream->stream_handle);
-       if (rstream) {
+       rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle);
+       assert(rstream);
+
+       if (!rstream->close_flag) {
                if (vstream->abort_flag) {
                        /* Rotate on abort (overwrite). */
                        DBG("Viewer rotate because of overwrite");
@@ -1074,10 +1099,12 @@ int viewer_get_next_index(struct relay_command *cmd,
                        } else if (ret == 1) {
                                viewer_index.status = htobe32(VIEWER_INDEX_HUP);
                                viewer_stream_delete(vstream);
-                               viewer_stream_destroy(vstream);
+                               viewer_stream_destroy(ctf_trace, vstream);
                                goto send_reply;
                        }
+                       /* ret == 0 means successful so we continue. */
                }
+
                pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
                if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
                        if (rstream->beacon_ts_end != -1ULL &&
@@ -1086,13 +1113,13 @@ int viewer_get_next_index(struct relay_command *cmd,
                                viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end);
                                pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
                                goto send_reply;
-                       /*
-                        * Reader and writer are working in the same tracefile, so we care
-                        * about the number of index received and sent. Otherwise, we read
-                        * up to EOF.
-                        */
                        } else if (rstream->total_index_received <= vstream->last_sent_index
                                        && !vstream->close_write_flag) {
+                               /*
+                                * Reader and writer are working in the same tracefile, so we care
+                                * about the number of index received and sent. Otherwise, we read
+                                * up to EOF.
+                                */
                                pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
                                /* No new index to send, retry later. */
                                viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
@@ -1100,20 +1127,19 @@ int viewer_get_next_index(struct relay_command *cmd,
                        }
                }
                pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
-       } else if (!rstream && vstream->close_write_flag &&
+       } else if (rstream->close_flag && vstream->close_write_flag &&
                        vstream->total_index_received == vstream->last_sent_index) {
                /* Last index sent and current tracefile closed in write */
                viewer_index.status = htobe32(VIEWER_INDEX_HUP);
                viewer_stream_delete(vstream);
-               viewer_stream_destroy(vstream);
+               viewer_stream_destroy(ctf_trace, vstream);
                goto send_reply;
        } else {
                vstream->close_write_flag = 1;
        }
 
-       if (!vstream->ctf_trace->metadata_received ||
-                       vstream->ctf_trace->metadata_received >
-                       vstream->ctf_trace->metadata_sent) {
+       if (!ctf_trace->metadata_received ||
+                       ctf_trace->metadata_received > ctf_trace->metadata_sent) {
                viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
        }
 
@@ -1127,8 +1153,7 @@ int viewer_get_next_index(struct relay_command *cmd,
        pthread_mutex_lock(&vstream->overwrite_lock);
        if (vstream->abort_flag) {
                /*
-                * The file is being overwritten by the writer, we cannot
-                * use it.
+                * The file is being overwritten by the writer, we cannot * use it.
                 */
                viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
                pthread_mutex_unlock(&vstream->overwrite_lock);
@@ -1138,7 +1163,7 @@ int viewer_get_next_index(struct relay_command *cmd,
                } else if (ret == 1) {
                        viewer_index.status = htobe32(VIEWER_INDEX_HUP);
                        viewer_stream_delete(vstream);
-                       viewer_stream_destroy(vstream);
+                       viewer_stream_destroy(ctf_trace, vstream);
                        goto send_reply;
                }
                goto send_reply;
@@ -1160,7 +1185,7 @@ int viewer_get_next_index(struct relay_command *cmd,
                        } else if (ret == 1) {
                                viewer_index.status = htobe32(VIEWER_INDEX_HUP);
                                viewer_stream_delete(vstream);
-                               viewer_stream_destroy(vstream);
+                               viewer_stream_destroy(ctf_trace, vstream);
                                goto send_reply;
                        }
                } else {
@@ -1220,6 +1245,7 @@ int viewer_get_packet(struct relay_command *cmd,
        struct lttng_viewer_get_packet get_packet_info;
        struct lttng_viewer_trace_packet reply;
        struct relay_viewer_stream *stream;
+       struct ctf_trace *ctf_trace;
 
        assert(cmd);
 
@@ -1241,7 +1267,10 @@ int viewer_get_packet(struct relay_command *cmd,
        if (!stream) {
                goto error;
        }
-       assert(stream->ctf_trace);
+
+       ctf_trace = ctf_trace_find_by_path(cmd->session->ctf_traces_ht,
+                       stream->path_name);
+       assert(ctf_trace);
 
        /*
         * First time we read this stream, we need open the tracefile, we should
@@ -1270,9 +1299,8 @@ int viewer_get_packet(struct relay_command *cmd,
                stream->read_fd = ret;
        }
 
-       if (!stream->ctf_trace->metadata_received ||
-                       stream->ctf_trace->metadata_received >
-                       stream->ctf_trace->metadata_sent) {
+       if (!ctf_trace->metadata_received ||
+                       ctf_trace->metadata_received > ctf_trace->metadata_sent) {
                reply.status = htobe32(VIEWER_GET_PACKET_ERR);
                reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
                goto send_reply;
@@ -1377,6 +1405,7 @@ int viewer_get_metadata(struct relay_command *cmd)
        struct lttng_viewer_get_metadata request;
        struct lttng_viewer_metadata_packet reply;
        struct relay_viewer_stream *stream;
+       struct ctf_trace *ctf_trace;
 
        assert(cmd);
 
@@ -1396,12 +1425,13 @@ int viewer_get_metadata(struct relay_command *cmd)
                ERR("Invalid metadata stream");
                goto error;
        }
-       assert(stream->ctf_trace);
-       assert(stream->ctf_trace->metadata_sent <=
-                       stream->ctf_trace->metadata_received);
 
-       len = stream->ctf_trace->metadata_received -
-               stream->ctf_trace->metadata_sent;
+       ctf_trace = ctf_trace_find_by_path(cmd->session->ctf_traces_ht,
+                       stream->path_name);
+       assert(ctf_trace);
+       assert(ctf_trace->metadata_sent <= ctf_trace->metadata_received);
+
+       len = ctf_trace->metadata_received - ctf_trace->metadata_sent;
        if (len == 0) {
                reply.status = htobe32(VIEWER_NO_NEW_METADATA);
                goto send_reply;
@@ -1436,7 +1466,7 @@ int viewer_get_metadata(struct relay_command *cmd)
                PERROR("Relay reading metadata file");
                goto error;
        }
-       stream->ctf_trace->metadata_sent += read_len;
+       ctf_trace->metadata_sent += read_len;
        reply.status = htobe32(VIEWER_METADATA_OK);
        goto send_reply;
 
@@ -1605,10 +1635,6 @@ void deferred_free_connection(struct rcu_head *head)
        struct relay_command *relay_connection =
                caa_container_of(head, struct relay_command, rcu_node);
 
-       if (relay_connection->session &&
-                       relay_connection->session->viewer_attached > 0) {
-               relay_connection->session->viewer_attached--;
-       }
        lttcomm_destroy_sock(relay_connection->sock);
        free(relay_connection);
 }
@@ -1616,39 +1642,53 @@ void deferred_free_connection(struct rcu_head *head)
 /*
  * Delete all streams for a specific session ID.
  */
-static
-void viewer_del_streams(uint64_t session_id)
+static void destroy_viewer_streams_by_session(struct relay_session *session)
 {
        struct relay_viewer_stream *stream;
        struct lttng_ht_iter iter;
 
+       assert(session);
+
        rcu_read_lock();
        cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, stream,
                        stream_n.node) {
-               health_code_update();
+               struct ctf_trace *ctf_trace;
 
-               if (stream->session_id != session_id) {
+               health_code_update();
+               if (stream->session_id != session->id) {
                        continue;
                }
 
+               ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
+                               stream->path_name);
+               assert(ctf_trace);
+
                viewer_stream_delete(stream);
-               assert(stream->ctf_trace);
 
                if (stream->metadata_flag) {
-                       /*
-                        * The metadata viewer stream is destroyed once the refcount on the
-                        * ctf trace goes to 0 in the destroy stream function thus there is
-                        * no explicit call to that function here.
-                        */
-                       stream->ctf_trace->metadata_sent = 0;
-                       stream->ctf_trace->viewer_metadata_stream = NULL;
-               } else {
-                       viewer_stream_destroy(stream);
+                       ctf_trace->metadata_sent = 0;
+                       ctf_trace->viewer_metadata_stream = NULL;
                }
+
+               viewer_stream_destroy(ctf_trace, stream);
        }
        rcu_read_unlock();
 }
 
+static void try_destroy_streams(struct relay_session *session)
+{
+       struct ctf_trace *ctf_trace;
+       struct lttng_ht_iter iter;
+
+       assert(session);
+
+       cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
+                       node.node) {
+               /* Attempt to destroy the ctf trace of that session. */
+               ctf_trace_try_destroy(session, ctf_trace);
+       }
+}
+
 /*
  * Delete and free a connection.
  *
@@ -1656,21 +1696,35 @@ void viewer_del_streams(uint64_t session_id)
  */
 static
 void del_connection(struct lttng_ht *relay_connections_ht,
-               struct lttng_ht_iter *iter, struct relay_command *relay_connection)
+               struct lttng_ht_iter *iter, struct relay_command *relay_connection,
+               struct lttng_ht *sessions_ht)
 {
        int ret;
+       struct relay_session *session;
 
        assert(relay_connections_ht);
        assert(iter);
        assert(relay_connection);
+       assert(sessions_ht);
 
        DBG("Cleaning connection of session ID %" PRIu64,
                        relay_connection->session_id);
 
+       rcu_read_lock();
        ret = lttng_ht_del(relay_connections_ht, iter);
        assert(!ret);
 
-       viewer_del_streams(relay_connection->session_id);
+       session = session_find_by_id(sessions_ht, relay_connection->session_id);
+       if (session) {
+               /*
+                * Very important that this is done before destroying the session so we
+                * can put back every viewer stream reference from the ctf_trace.
+                */
+               destroy_viewer_streams_by_session(session);
+               try_destroy_streams(session);
+               session_viewer_try_destroy(sessions_ht, session);
+       }
+       rcu_read_unlock();
 
        call_rcu(&relay_connection->rcu_node, deferred_free_connection);
 }
@@ -1754,7 +1808,7 @@ restart:
                        health_code_update();
 
                        /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
+                       ret = check_live_conn_pipe(pollfd, revents);
                        if (ret) {
                                err = 0;
                                goto exit;
@@ -1789,12 +1843,12 @@ restart:
                                if (revents & (LPOLLERR)) {
                                        cleanup_poll_connection(&events, pollfd);
                                        del_connection(relay_connections_ht, &iter,
-                                                       relay_connection);
+                                                       relay_connection, relay_ctx->sessions_ht);
                                } else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
                                        DBG("Viewer socket %d hung up", pollfd);
                                        cleanup_poll_connection(&events, pollfd);
                                        del_connection(relay_connections_ht, &iter,
-                                                       relay_connection);
+                                                       relay_connection, relay_ctx->sessions_ht);
                                } else if (revents & LPOLLIN) {
                                        ret = relay_connection->sock->ops->recvmsg(
                                                        relay_connection->sock, &recv_hdr,
@@ -1804,7 +1858,7 @@ restart:
                                        if (ret <= 0) {
                                                cleanup_poll_connection(&events, pollfd);
                                                del_connection(relay_connections_ht, &iter,
-                                                               relay_connection);
+                                                               relay_connection, relay_ctx->sessions_ht);
                                                DBG("Viewer control connection closed with %d",
                                                                pollfd);
                                        } else {
@@ -1819,7 +1873,7 @@ restart:
                                                        /* Clear the session on error. */
                                                        cleanup_poll_connection(&events, pollfd);
                                                        del_connection(relay_connections_ht, &iter,
-                                                                       relay_connection);
+                                                                       relay_connection, relay_ctx->sessions_ht);
                                                        DBG("Viewer connection closed with %d", pollfd);
                                                }
                                        }
@@ -1845,7 +1899,8 @@ error:
 
                relay_connection = caa_container_of(node, struct relay_command,
                                sock_n);
-               del_connection(relay_connections_ht, &iter, relay_connection);
+               del_connection(relay_connections_ht, &iter, relay_connection,
+                               relay_ctx->sessions_ht);
        }
        rcu_read_unlock();
 error_poll_create:
This page took 0.031899 seconds and 5 git commands to generate.