Fix: allow attach command to multiple sessions
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index b3b7ee427268ff6d24169a037b44eab134eddef9..5b9e4248c0e779deadc7b604ace4d928f7c9634f 100644 (file)
@@ -809,6 +809,32 @@ end:
        return ret;
 }
 
+/*
+ * Check if a connection is attached to a session.
+ * Return 1 if attached, 0 if not attached, a negative value on error.
+ */
+static
+int session_attached(struct relay_connection *conn, uint64_t session_id)
+{
+       struct relay_session *session;
+       int found = 0;
+
+       if (!conn->viewer_session) {
+               goto end;
+       }
+       cds_list_for_each_entry(session,
+                       &conn->viewer_session->sessions_head,
+                       viewer_session_list) {
+               if (session->id == session_id) {
+                       found = 1;
+                       goto end;
+               }
+       }
+
+end:
+       return found;
+}
+
 /*
  * Send the viewer the list of current sessions.
  */
@@ -820,6 +846,7 @@ int viewer_get_new_streams(struct relay_connection *conn)
        struct lttng_viewer_new_streams_request request;
        struct lttng_viewer_new_streams_response response;
        struct relay_session *session;
+       uint64_t session_id;
 
        assert(conn);
 
@@ -832,29 +859,27 @@ int viewer_get_new_streams(struct relay_connection *conn)
        if (ret < 0) {
                goto error;
        }
+       session_id = be64toh(request.session_id);
 
        health_code_update();
 
        rcu_read_lock();
-       session = session_find_by_id(conn->sessions_ht,
-                       be64toh(request.session_id));
+       session = session_find_by_id(conn->sessions_ht, session_id);
        if (!session) {
-               DBG("Relay session %" PRIu64 " not found",
-                               be64toh(request.session_id));
+               DBG("Relay session %" PRIu64 " not found", session_id);
                response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
                goto send_reply;
        }
 
-       if (conn->session_id == session->id) {
-               /* We confirmed the viewer is asking for the same session. */
-               send_streams = 1;
-               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
-       } else {
+       if (!session_attached(conn, session_id)) {
                send_streams = 0;
                response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
                goto send_reply;
        }
 
+       send_streams = 1;
+       response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
+
        if (!send_streams) {
                goto send_reply;
        }
@@ -868,6 +893,23 @@ int viewer_get_new_streams(struct relay_connection *conn)
        nb_streams = nb_created + nb_unsent;
        response.streams_count = htobe32(nb_streams);
 
+       /*
+        * If the session is closed and we have no new streams to send,
+        * it means that the viewer has already received the whole trace
+        * for this session and should now close it.
+        */
+       if (nb_streams == 0 && session->close_flag) {
+               send_streams = 0;
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
+               /*
+                * Remove the session from the attached list of the connection
+                * and try to destroy it.
+                */
+               cds_list_del(&session->viewer_session_list);
+               session_viewer_try_destroy(conn->sessions_ht, session);
+               goto send_reply;
+       }
+
 send_reply:
        health_code_update();
        ret = send_response(conn->sock, &response, sizeof(response));
@@ -926,6 +968,12 @@ int viewer_attach_session(struct relay_connection *conn)
 
        health_code_update();
 
+       if (!conn->viewer_session) {
+               DBG("Client trying to attach before creating a live viewer session");
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION);
+               goto send_reply;
+       }
+
        rcu_read_lock();
        session = session_find_by_id(conn->sessions_ht,
                        be64toh(request.session_id));
@@ -950,8 +998,8 @@ int viewer_attach_session(struct relay_connection *conn)
        } else {
                send_streams = 1;
                response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
-               conn->session_id = session->id;
-               conn->session = session;
+               cds_list_add(&session->viewer_session_list,
+                               &conn->viewer_session->sessions_head);
        }
 
        switch (be32toh(request.seek)) {
@@ -1035,14 +1083,14 @@ int viewer_get_next_index(struct relay_connection *conn)
        health_code_update();
 
        rcu_read_lock();
-       session = session_find_by_id(conn->sessions_ht, conn->session_id);
-       if (!session) {
+       vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id));
+       if (!vstream) {
                ret = -1;
                goto end_unlock;
        }
 
-       vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id));
-       if (!vstream) {
+       session = session_find_by_id(conn->sessions_ht, vstream->session_id);
+       if (!session) {
                ret = -1;
                goto end_unlock;
        }
@@ -1236,6 +1284,7 @@ int viewer_get_packet(struct relay_connection *conn)
        struct lttng_viewer_get_packet get_packet_info;
        struct lttng_viewer_trace_packet reply;
        struct relay_viewer_stream *stream;
+       struct relay_session *session;
        struct ctf_trace *ctf_trace;
 
        assert(conn);
@@ -1259,7 +1308,13 @@ int viewer_get_packet(struct relay_connection *conn)
                goto error;
        }
 
-       ctf_trace = ctf_trace_find_by_path(conn->session->ctf_traces_ht,
+       session = session_find_by_id(conn->sessions_ht, stream->session_id);
+       if (!session) {
+               ret = -1;
+               goto error;
+       }
+
+       ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
                        stream->path_name);
        assert(ctf_trace);
 
@@ -1397,6 +1452,7 @@ int viewer_get_metadata(struct relay_connection *conn)
        struct lttng_viewer_metadata_packet reply;
        struct relay_viewer_stream *stream;
        struct ctf_trace *ctf_trace;
+       struct relay_session *session;
 
        assert(conn);
 
@@ -1417,7 +1473,13 @@ int viewer_get_metadata(struct relay_connection *conn)
                goto error;
        }
 
-       ctf_trace = ctf_trace_find_by_path(conn->session->ctf_traces_ht,
+       session = session_find_by_id(conn->sessions_ht, stream->session_id);
+       if (!session) {
+               ret = -1;
+               goto error;
+       }
+
+       ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
                        stream->path_name);
        assert(ctf_trace);
        assert(ctf_trace->metadata_sent <= ctf_trace->metadata_received);
@@ -1491,6 +1553,42 @@ end:
        return ret;
 }
 
+/*
+ * Create a viewer session.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static
+int viewer_create_session(struct relay_connection *conn)
+{
+       int ret;
+       struct lttng_viewer_create_session_response resp;
+
+       DBG("Viewer create session received");
+
+       resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
+       conn->viewer_session = zmalloc(sizeof(conn->viewer_session));
+       if (!conn->viewer_session) {
+               ERR("Allocation viewer session");
+               resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR);
+               goto send_reply;
+       }
+       CDS_INIT_LIST_HEAD(&conn->viewer_session->sessions_head);
+
+send_reply:
+       health_code_update();
+       ret = send_response(conn->sock, &resp, sizeof(resp));
+       if (ret < 0) {
+               goto end;
+       }
+       health_code_update();
+       ret = 0;
+
+end:
+       return ret;
+}
+
+
 /*
  * live_relay_unknown_command: send -1 if received unknown command
  */
@@ -1550,6 +1648,9 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
        case LTTNG_VIEWER_GET_NEW_STREAMS:
                ret = viewer_get_new_streams(conn);
                break;
+       case LTTNG_VIEWER_CREATE_SESSION:
+               ret = viewer_create_session(conn);
+               break;
        default:
                ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd));
                live_relay_unknown_command(conn);
@@ -1634,28 +1735,34 @@ static void try_destroy_streams(struct relay_session *session)
 static void destroy_connection(struct lttng_ht *relay_connections_ht,
                struct relay_connection *conn)
 {
-       struct relay_session *session;
+       struct relay_session *session, *tmp_session;
 
        assert(relay_connections_ht);
        assert(conn);
 
-       DBG("Cleaning connection of session ID %" PRIu64, conn->session_id);
-
        connection_delete(relay_connections_ht, conn);
 
+       if (!conn->viewer_session) {
+               goto end;
+       }
+
        rcu_read_lock();
-       session = session_find_by_id(conn->sessions_ht, conn->session_id);
-       if (session) {
+       cds_list_for_each_entry_safe(session, tmp_session,
+                       &conn->viewer_session->sessions_head,
+                       viewer_session_list) {
+               DBG("Cleaning connection of session ID %" PRIu64, session->id);
                /*
                 * Very important that this is done before destroying the session so we
                 * can put back every viewer stream reference from the ctf_trace.
                 */
                destroy_viewer_streams_by_session(session);
                try_destroy_streams(session);
+               cds_list_del(&session->viewer_session_list);
                session_viewer_try_destroy(conn->sessions_ht, session);
        }
        rcu_read_unlock();
 
+end:
        connection_destroy(conn);
 }
 
This page took 0.029298 seconds and 5 git commands to generate.