Fix: force the client to create a viewer session before attaching
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index 764d616a24e692c77f8e717c37ddcf3841d556fb..b2e8e7c8bb1305b07c121172ecb1d2e760339348 100644 (file)
@@ -62,6 +62,7 @@
 #include "stream.h"
 #include "session.h"
 #include "ctf-trace.h"
+#include "connection.h"
 
 static struct lttng_uri *live_uri;
 
@@ -69,7 +70,7 @@ static struct lttng_uri *live_uri;
  * This pipe is used to inform the worker thread that a command is queued and
  * ready to be processed.
  */
-static int live_relay_cmd_pipe[2] = { -1, -1 };
+static int live_conn_pipe[2] = { -1, -1 };
 
 /* Shared between threads */
 static int live_dispatch_thread_exit;
@@ -84,7 +85,7 @@ static pthread_t live_worker_thread;
  * The live_thread_listener and live_thread_dispatcher communicate with this
  * queue.
  */
-static struct relay_cmd_queue viewer_cmd_queue;
+static struct relay_conn_queue viewer_conn_queue;
 
 static uint64_t last_relay_viewer_session_id;
 
@@ -355,7 +356,7 @@ void stop_threads(void)
 
        /* Dispatch thread */
        CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
-       futex_nto1_wake(&viewer_cmd_queue.futex);
+       futex_nto1_wake(&viewer_conn_queue.futex);
 }
 
 /*
@@ -451,7 +452,6 @@ static
 void *thread_listener(void *data)
 {
        int i, ret, pollfd, err = -1;
-       int val = 1;
        uint32_t revents, nb_fd;
        struct lttng_poll_event events;
        struct lttcomm_sock *live_control_sock;
@@ -528,43 +528,41 @@ restart:
                                 * Get allocated in this thread, enqueued to a global queue,
                                 * dequeued and freed in the worker thread.
                                 */
-                               struct relay_command *relay_cmd;
+                               int val = 1;
+                               struct relay_connection *new_conn;
                                struct lttcomm_sock *newsock;
 
-                               relay_cmd = zmalloc(sizeof(*relay_cmd));
-                               if (!relay_cmd) {
-                                       PERROR("relay command zmalloc");
+                               new_conn = connection_create();
+                               if (!new_conn) {
                                        goto error;
                                }
 
-                               assert(pollfd == live_control_sock->fd);
                                newsock = live_control_sock->ops->accept(live_control_sock);
                                if (!newsock) {
                                        PERROR("accepting control sock");
-                                       free(relay_cmd);
+                                       connection_free(new_conn);
                                        goto error;
                                }
                                DBG("Relay viewer connection accepted socket %d", newsock->fd);
+
                                ret = setsockopt(newsock->fd, SOL_SOCKET, SO_REUSEADDR, &val,
-                                               sizeof(int));
+                                               sizeof(val));
                                if (ret < 0) {
                                        PERROR("setsockopt inet");
                                        lttcomm_destroy_sock(newsock);
-                                       free(relay_cmd);
+                                       connection_free(new_conn);
                                        goto error;
                                }
-                               relay_cmd->sock = newsock;
+                               new_conn->sock = newsock;
 
-                               /*
-                                * Lock free enqueue the request.
-                                */
-                               cds_wfq_enqueue(&viewer_cmd_queue.queue, &relay_cmd->node);
+                               /* Enqueue request for the dispatcher thread. */
+                               cds_wfq_enqueue(&viewer_conn_queue.queue, &new_conn->qnode);
 
                                /*
-                                * Wake the dispatch queue futex. Implicit memory
-                                * barrier with the exchange in cds_wfq_enqueue.
+                                * Wake the dispatch queue futex. Implicit memory barrier with
+                                * the exchange in cds_wfq_enqueue.
                                 */
-                               futex_nto1_wake(&viewer_cmd_queue.futex);
+                               futex_nto1_wake(&viewer_conn_queue.futex);
                        }
                }
        }
@@ -602,7 +600,7 @@ void *thread_dispatcher(void *data)
        int err = -1;
        ssize_t ret;
        struct cds_wfq_node *node;
-       struct relay_command *relay_cmd = NULL;
+       struct relay_connection *conn = NULL;
 
        DBG("[thread] Live viewer relay dispatcher started");
 
@@ -618,41 +616,39 @@ void *thread_dispatcher(void *data)
                health_code_update();
 
                /* Atomically prepare the queue futex */
-               futex_nto1_prepare(&viewer_cmd_queue.futex);
+               futex_nto1_prepare(&viewer_conn_queue.futex);
 
                do {
                        health_code_update();
 
                        /* Dequeue commands */
-                       node = cds_wfq_dequeue_blocking(&viewer_cmd_queue.queue);
+                       node = cds_wfq_dequeue_blocking(&viewer_conn_queue.queue);
                        if (node == NULL) {
                                DBG("Woken up but nothing in the live-viewer "
                                                "relay command queue");
                                /* Continue thread execution */
                                break;
                        }
-
-                       relay_cmd = caa_container_of(node, struct relay_command, node);
+                       conn = caa_container_of(node, struct relay_connection, qnode);
                        DBG("Dispatching viewer request waiting on sock %d",
-                                       relay_cmd->sock->fd);
+                                       conn->sock->fd);
 
                        /*
                         * Inform worker thread of the new request. This call is blocking
                         * so we can be assured that the data will be read at some point in
                         * time or wait to the end of the world :)
                         */
-                       ret = lttng_write(live_relay_cmd_pipe[1], relay_cmd,
-                                       sizeof(*relay_cmd));
-                       free(relay_cmd);
-                       if (ret < sizeof(struct relay_command)) {
-                               PERROR("write cmd pipe");
+                       ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn));
+                       if (ret < 0) {
+                               PERROR("write conn pipe");
+                               connection_destroy(conn);
                                goto error;
                        }
                } while (node != NULL);
 
                /* Futex wait on queue. Blocking call on futex() */
                health_poll_entry();
-               futex_nto1_wait(&viewer_cmd_queue.futex);
+               futex_nto1_wait(&viewer_conn_queue.futex);
                health_poll_exit();
        }
 
@@ -677,20 +673,20 @@ error_testpoint:
  * Return 0 on success or else negative value.
  */
 static
-int viewer_connect(struct relay_command *cmd)
+int viewer_connect(struct relay_connection *conn)
 {
        int ret;
        struct lttng_viewer_connect reply, msg;
 
-       assert(cmd);
+       assert(conn);
 
-       cmd->version_check_done = 1;
+       conn->version_check_done = 1;
 
        health_code_update();
 
        DBG("Viewer is establishing a connection to the relayd.");
 
-       ret = recv_request(cmd->sock, &msg, sizeof(msg));
+       ret = recv_request(conn->sock, &msg, sizeof(msg));
        if (ret < 0) {
                goto end;
        }
@@ -708,18 +704,18 @@ int viewer_connect(struct relay_command *cmd)
                goto end;
        }
 
-       cmd->major = reply.major;
+       conn->major = reply.major;
        /* We adapt to the lowest compatible version */
        if (reply.minor <= be32toh(msg.minor)) {
-               cmd->minor = reply.minor;
+               conn->minor = reply.minor;
        } else {
-               cmd->minor = be32toh(msg.minor);
+               conn->minor = be32toh(msg.minor);
        }
 
-       if (be32toh(msg.type) == VIEWER_CLIENT_COMMAND) {
-               cmd->type = RELAY_VIEWER_COMMAND;
-       } else if (be32toh(msg.type) == VIEWER_CLIENT_NOTIFICATION) {
-               cmd->type = RELAY_VIEWER_NOTIFICATION;
+       if (be32toh(msg.type) == LTTNG_VIEWER_CLIENT_COMMAND) {
+               conn->type = RELAY_VIEWER_COMMAND;
+       } else if (be32toh(msg.type) == LTTNG_VIEWER_CLIENT_NOTIFICATION) {
+               conn->type = RELAY_VIEWER_NOTIFICATION;
        } else {
                ERR("Unknown connection type : %u", be32toh(msg.type));
                ret = -1;
@@ -728,20 +724,20 @@ int viewer_connect(struct relay_command *cmd)
 
        reply.major = htobe32(reply.major);
        reply.minor = htobe32(reply.minor);
-       if (cmd->type == RELAY_VIEWER_COMMAND) {
+       if (conn->type == RELAY_VIEWER_COMMAND) {
                reply.viewer_session_id = htobe64(++last_relay_viewer_session_id);
        }
 
        health_code_update();
 
-       ret = send_response(cmd->sock, &reply, sizeof(reply));
+       ret = send_response(conn->sock, &reply, sizeof(reply));
        if (ret < 0) {
                goto end;
        }
 
        health_code_update();
 
-       DBG("Version check done using protocol %u.%u", cmd->major, cmd->minor);
+       DBG("Version check done using protocol %u.%u", conn->major, conn->minor);
        ret = 0;
 
 end:
@@ -754,8 +750,7 @@ end:
  * Return 0 on success or else a negative value.
  */
 static
-int viewer_list_sessions(struct relay_command *cmd,
-               struct lttng_ht *sessions_ht)
+int viewer_list_sessions(struct relay_connection *conn)
 {
        int ret;
        struct lttng_viewer_list_sessions session_list;
@@ -768,19 +763,20 @@ int viewer_list_sessions(struct relay_command *cmd,
        DBG("List sessions received");
 
        rcu_read_lock();
-       cds_lfht_count_nodes(sessions_ht->ht, &approx_before, &count, &approx_after);
+       cds_lfht_count_nodes(conn->sessions_ht->ht, &approx_before, &count,
+                       &approx_after);
        session_list.sessions_count = htobe32(count);
 
        health_code_update();
 
-       ret = send_response(cmd->sock, &session_list, sizeof(session_list));
+       ret = send_response(conn->sock, &session_list, sizeof(session_list));
        if (ret < 0) {
                goto end_unlock;
        }
 
        health_code_update();
 
-       cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session,
+       cds_lfht_for_each_entry(conn->sessions_ht->ht, &iter.iter, session,
                        session_n.node) {
                health_code_update();
 
@@ -795,7 +791,7 @@ int viewer_list_sessions(struct relay_command *cmd,
 
                health_code_update();
 
-               ret = send_response(cmd->sock, &send_session, sizeof(send_session));
+               ret = send_response(conn->sock, &send_session, sizeof(send_session));
                if (ret < 0) {
                        goto end_unlock;
                }
@@ -817,8 +813,7 @@ end:
  * Send the viewer the list of current sessions.
  */
 static
-int viewer_get_new_streams(struct relay_command *cmd,
-               struct lttng_ht *sessions_ht)
+int viewer_get_new_streams(struct relay_connection *conn)
 {
        int ret, send_streams = 0;
        uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0;
@@ -826,15 +821,14 @@ int viewer_get_new_streams(struct relay_command *cmd,
        struct lttng_viewer_new_streams_response response;
        struct relay_session *session;
 
-       assert(cmd);
-       assert(sessions_ht);
+       assert(conn);
 
        DBG("Get new streams received");
 
        health_code_update();
 
        /* Receive the request from the connected client. */
-       ret = recv_request(cmd->sock, &request, sizeof(request));
+       ret = recv_request(conn->sock, &request, sizeof(request));
        if (ret < 0) {
                goto error;
        }
@@ -842,21 +836,22 @@ int viewer_get_new_streams(struct relay_command *cmd,
        health_code_update();
 
        rcu_read_lock();
-       session = session_find_by_id(sessions_ht, be64toh(request.session_id));
+       session = session_find_by_id(conn->sessions_ht,
+                       be64toh(request.session_id));
        if (!session) {
                DBG("Relay session %" PRIu64 " not found",
                                be64toh(request.session_id));
-               response.status = htobe32(VIEWER_NEW_STREAMS_ERR);
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
                goto send_reply;
        }
 
-       if (cmd->session_id == session->id) {
+       if (conn->session_id == session->id) {
                /* We confirmed the viewer is asking for the same session. */
                send_streams = 1;
-               response.status = htobe32(VIEWER_NEW_STREAMS_OK);
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
        } else {
                send_streams = 0;
-               response.status = htobe32(VIEWER_NEW_STREAMS_ERR);
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
                goto send_reply;
        }
 
@@ -864,7 +859,7 @@ int viewer_get_new_streams(struct relay_command *cmd,
                goto send_reply;
        }
 
-       ret = make_viewer_streams(session, VIEWER_SEEK_LAST, NULL, &nb_unsent,
+       ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, NULL, &nb_unsent,
                        &nb_created);
        if (ret < 0) {
                goto end_unlock;
@@ -875,7 +870,7 @@ int viewer_get_new_streams(struct relay_command *cmd,
 
 send_reply:
        health_code_update();
-       ret = send_response(cmd->sock, &response, sizeof(response));
+       ret = send_response(conn->sock, &response, sizeof(response));
        if (ret < 0) {
                goto end_unlock;
        }
@@ -894,7 +889,7 @@ send_reply:
         * Send stream and *DON'T* ignore the sent flag so every viewer streams
         * that were not sent from that point will be sent to the viewer.
         */
-       ret = send_viewer_streams(cmd->sock, session, 0);
+       ret = send_viewer_streams(conn->sock, session, 0);
        if (ret < 0) {
                goto end_unlock;
        }
@@ -909,8 +904,7 @@ error:
  * Send the viewer the list of current sessions.
  */
 static
-int viewer_attach_session(struct relay_command *cmd,
-               struct lttng_ht *sessions_ht)
+int viewer_attach_session(struct relay_connection *conn)
 {
        int send_streams = 0;
        ssize_t ret;
@@ -920,25 +914,31 @@ int viewer_attach_session(struct relay_command *cmd,
        struct lttng_viewer_attach_session_response response;
        struct relay_session *session;
 
-       assert(cmd);
-       assert(sessions_ht);
+       assert(conn);
 
        health_code_update();
 
        /* Receive the request from the connected client. */
-       ret = recv_request(cmd->sock, &request, sizeof(request));
+       ret = recv_request(conn->sock, &request, sizeof(request));
        if (ret < 0) {
                goto error;
        }
 
        health_code_update();
 
+       if (!conn->viewer_session) {
+               DBG("Client trying to attach before creating a live viewer session");
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION);
+               goto send_reply;
+       }
+
        rcu_read_lock();
-       session = session_find_by_id(sessions_ht, be64toh(request.session_id));
+       session = session_find_by_id(conn->sessions_ht,
+                       be64toh(request.session_id));
        if (!session) {
                DBG("Relay session %" PRIu64 " not found",
                                be64toh(request.session_id));
-               response.status = htobe32(VIEWER_ATTACH_UNK);
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
                goto send_reply;
        }
        session_viewer_attach(session);
@@ -946,28 +946,28 @@ int viewer_attach_session(struct relay_command *cmd,
 
        if (uatomic_read(&session->viewer_refcount) > 1) {
                DBG("Already a viewer attached");
-               response.status = htobe32(VIEWER_ATTACH_ALREADY);
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY);
                session_viewer_detach(session);
                goto send_reply;
        } else if (session->live_timer == 0) {
                DBG("Not live session");
-               response.status = htobe32(VIEWER_ATTACH_NOT_LIVE);
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
                goto send_reply;
        } else {
                send_streams = 1;
-               response.status = htobe32(VIEWER_ATTACH_OK);
-               cmd->session_id = session->id;
-               cmd->session = session;
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
+               conn->session_id = session->id;
+               conn->session = session;
        }
 
        switch (be32toh(request.seek)) {
-       case VIEWER_SEEK_BEGINNING:
-       case VIEWER_SEEK_LAST:
+       case LTTNG_VIEWER_SEEK_BEGINNING:
+       case LTTNG_VIEWER_SEEK_LAST:
                seek_type = be32toh(request.seek);
                break;
        default:
                ERR("Wrong seek parameter");
-               response.status = htobe32(VIEWER_ATTACH_SEEK_ERR);
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_SEEK_ERR);
                send_streams = 0;
                goto send_reply;
        }
@@ -984,7 +984,7 @@ int viewer_attach_session(struct relay_command *cmd,
 
 send_reply:
        health_code_update();
-       ret = send_response(cmd->sock, &response, sizeof(response));
+       ret = send_response(conn->sock, &response, sizeof(response));
        if (ret < 0) {
                goto end_unlock;
        }
@@ -1000,7 +1000,7 @@ send_reply:
        }
 
        /* Send stream and ignore the sent flag. */
-       ret = send_viewer_streams(cmd->sock, session, 1);
+       ret = send_viewer_streams(conn->sock, session, 1);
        if (ret < 0) {
                goto end_unlock;
        }
@@ -1017,8 +1017,7 @@ error:
  * Return 0 on success or else a negative value.
  */
 static
-int viewer_get_next_index(struct relay_command *cmd,
-               struct lttng_ht *sessions_ht)
+int viewer_get_next_index(struct relay_connection *conn)
 {
        int ret;
        struct lttng_viewer_get_next_index request_index;
@@ -1029,21 +1028,20 @@ int viewer_get_next_index(struct relay_command *cmd,
        struct ctf_trace *ctf_trace;
        struct relay_session *session;
 
-       assert(cmd);
-       assert(sessions_ht);
+       assert(conn);
 
        DBG("Viewer get next index");
 
        health_code_update();
 
-       ret = recv_request(cmd->sock, &request_index, sizeof(request_index));
+       ret = recv_request(conn->sock, &request_index, sizeof(request_index));
        if (ret < 0) {
                goto end;
        }
        health_code_update();
 
        rcu_read_lock();
-       session = session_find_by_id(sessions_ht, cmd->session_id);
+       session = session_find_by_id(conn->sessions_ht, conn->session_id);
        if (!session) {
                ret = -1;
                goto end_unlock;
@@ -1064,7 +1062,7 @@ int viewer_get_next_index(struct relay_command *cmd,
         * The viewer should not ask for index on metadata stream.
         */
        if (vstream->metadata_flag) {
-               viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                goto send_reply;
        }
 
@@ -1077,10 +1075,10 @@ int viewer_get_next_index(struct relay_command *cmd,
                         * The index is created only when the first data packet arrives, it
                         * might not be ready at the beginning of the session
                         */
-                       viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                        goto send_reply;
                } else if (ret < 0) {
-                       viewer_index.status = htobe32(VIEWER_INDEX_ERR);
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
                        goto send_reply;
                }
                vstream->index_read_fd = ret;
@@ -1097,7 +1095,7 @@ int viewer_get_next_index(struct relay_command *cmd,
                        if (ret < 0) {
                                goto end_unlock;
                        } else if (ret == 1) {
-                               viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+                               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                                viewer_stream_delete(vstream);
                                viewer_stream_destroy(ctf_trace, vstream);
                                goto send_reply;
@@ -1109,7 +1107,7 @@ int viewer_get_next_index(struct relay_command *cmd,
                if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
                        if (rstream->beacon_ts_end != -1ULL &&
                                vstream->last_sent_index == rstream->total_index_received) {
-                               viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE);
+                               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
                                viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end);
                                pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
                                goto send_reply;
@@ -1122,7 +1120,7 @@ int viewer_get_next_index(struct relay_command *cmd,
                                 */
                                pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
                                /* No new index to send, retry later. */
-                               viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+                               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                                goto send_reply;
                        }
                }
@@ -1130,7 +1128,7 @@ int viewer_get_next_index(struct relay_command *cmd,
        } else if (rstream->close_flag && vstream->close_write_flag &&
                        vstream->total_index_received == vstream->last_sent_index) {
                /* Last index sent and current tracefile closed in write */
-               viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                viewer_stream_delete(vstream);
                viewer_stream_destroy(ctf_trace, vstream);
                goto send_reply;
@@ -1143,7 +1141,7 @@ int viewer_get_next_index(struct relay_command *cmd,
                viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
        }
 
-       ret = check_new_streams(vstream->session_id, sessions_ht);
+       ret = check_new_streams(vstream->session_id, conn->sessions_ht);
        if (ret < 0) {
                goto end_unlock;
        } else if (ret == 1) {
@@ -1155,13 +1153,13 @@ int viewer_get_next_index(struct relay_command *cmd,
                /*
                 * The file is being overwritten by the writer, we cannot * use it.
                 */
-               viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                pthread_mutex_unlock(&vstream->overwrite_lock);
                ret = viewer_stream_rotate(vstream, rstream);
                if (ret < 0) {
                        goto end_unlock;
                } else if (ret == 1) {
-                       viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                        viewer_stream_delete(vstream);
                        viewer_stream_destroy(ctf_trace, vstream);
                        goto send_reply;
@@ -1177,24 +1175,24 @@ int viewer_get_next_index(struct relay_command *cmd,
                 * The tracefile is closed in write, so we read up to EOF.
                 */
                if (vstream->close_write_flag == 1) {
-                       viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                        /* Rotate on normal EOF */
                        ret = viewer_stream_rotate(vstream, rstream);
                        if (ret < 0) {
                                goto end_unlock;
                        } else if (ret == 1) {
-                               viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+                               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                                viewer_stream_delete(vstream);
                                viewer_stream_destroy(ctf_trace, vstream);
                                goto send_reply;
                        }
                } else {
                        PERROR("Relay reading index file %d", vstream->index_read_fd);
-                       viewer_index.status = htobe32(VIEWER_INDEX_ERR);
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
                }
                goto send_reply;
        } else {
-               viewer_index.status = htobe32(VIEWER_INDEX_OK);
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
                vstream->last_sent_index++;
        }
 
@@ -1213,7 +1211,7 @@ send_reply:
        viewer_index.flags = htobe32(viewer_index.flags);
        health_code_update();
 
-       ret = send_response(cmd->sock, &viewer_index, sizeof(viewer_index));
+       ret = send_response(conn->sock, &viewer_index, sizeof(viewer_index));
        if (ret < 0) {
                goto end_unlock;
        }
@@ -1235,8 +1233,7 @@ end:
  * Return 0 on success or else a negative value.
  */
 static
-int viewer_get_packet(struct relay_command *cmd,
-               struct lttng_ht *sessions_ht)
+int viewer_get_packet(struct relay_connection *conn)
 {
        int ret, send_data = 0;
        char *data = NULL;
@@ -1247,13 +1244,13 @@ int viewer_get_packet(struct relay_command *cmd,
        struct relay_viewer_stream *stream;
        struct ctf_trace *ctf_trace;
 
-       assert(cmd);
+       assert(conn);
 
        DBG2("Relay get data packet");
 
        health_code_update();
 
-       ret = recv_request(cmd->sock, &get_packet_info, sizeof(get_packet_info));
+       ret = recv_request(conn->sock, &get_packet_info, sizeof(get_packet_info));
        if (ret < 0) {
                goto end;
        }
@@ -1268,7 +1265,7 @@ int viewer_get_packet(struct relay_command *cmd,
                goto error;
        }
 
-       ctf_trace = ctf_trace_find_by_path(cmd->session->ctf_traces_ht,
+       ctf_trace = ctf_trace_find_by_path(conn->session->ctf_traces_ht,
                        stream->path_name);
        assert(ctf_trace);
 
@@ -1301,16 +1298,16 @@ int viewer_get_packet(struct relay_command *cmd,
 
        if (!ctf_trace->metadata_received ||
                        ctf_trace->metadata_received > ctf_trace->metadata_sent) {
-               reply.status = htobe32(VIEWER_GET_PACKET_ERR);
+               reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
                reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
                goto send_reply;
        }
 
-       ret = check_new_streams(stream->session_id, sessions_ht);
+       ret = check_new_streams(stream->session_id, conn->sessions_ht);
        if (ret < 0) {
                goto end_unlock;
        } else if (ret == 1) {
-               reply.status = htobe32(VIEWER_GET_PACKET_ERR);
+               reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
                reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
                goto send_reply;
        }
@@ -1332,7 +1329,7 @@ int viewer_get_packet(struct relay_command *cmd,
                        PERROR("lseek");
                        goto error;
                }
-               reply.status = htobe32(VIEWER_GET_PACKET_EOF);
+               reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF);
                goto send_reply;
        }
        read_len = lttng_read(stream->read_fd, data, len);
@@ -1347,24 +1344,24 @@ int viewer_get_packet(struct relay_command *cmd,
                                        be64toh(get_packet_info.offset));
                        goto error;
                } else {
-                       reply.status = htobe32(VIEWER_GET_PACKET_EOF);
+                       reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF);
                        goto send_reply;
                }
        }
-       reply.status = htobe32(VIEWER_GET_PACKET_OK);
+       reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
        reply.len = htobe32(len);
        send_data = 1;
        goto send_reply;
 
 error:
-       reply.status = htobe32(VIEWER_GET_PACKET_ERR);
+       reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
 
 send_reply:
        reply.flags = htobe32(reply.flags);
 
        health_code_update();
 
-       ret = send_response(cmd->sock, &reply, sizeof(reply));
+       ret = send_response(conn->sock, &reply, sizeof(reply));
        if (ret < 0) {
                goto end_unlock;
        }
@@ -1372,7 +1369,7 @@ send_reply:
 
        if (send_data) {
                health_code_update();
-               ret = send_response(cmd->sock, data, len);
+               ret = send_response(conn->sock, data, len);
                if (ret < 0) {
                        goto end_unlock;
                }
@@ -1396,7 +1393,7 @@ end:
  * Return 0 on success else a negative value.
  */
 static
-int viewer_get_metadata(struct relay_command *cmd)
+int viewer_get_metadata(struct relay_connection *conn)
 {
        int ret = 0;
        ssize_t read_len;
@@ -1407,13 +1404,13 @@ int viewer_get_metadata(struct relay_command *cmd)
        struct relay_viewer_stream *stream;
        struct ctf_trace *ctf_trace;
 
-       assert(cmd);
+       assert(conn);
 
        DBG("Relay get metadata");
 
        health_code_update();
 
-       ret = recv_request(cmd->sock, &request, sizeof(request));
+       ret = recv_request(conn->sock, &request, sizeof(request));
        if (ret < 0) {
                goto end;
        }
@@ -1426,14 +1423,14 @@ int viewer_get_metadata(struct relay_command *cmd)
                goto error;
        }
 
-       ctf_trace = ctf_trace_find_by_path(cmd->session->ctf_traces_ht,
+       ctf_trace = ctf_trace_find_by_path(conn->session->ctf_traces_ht,
                        stream->path_name);
        assert(ctf_trace);
        assert(ctf_trace->metadata_sent <= ctf_trace->metadata_received);
 
        len = ctf_trace->metadata_received - ctf_trace->metadata_sent;
        if (len == 0) {
-               reply.status = htobe32(VIEWER_NO_NEW_METADATA);
+               reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
                goto send_reply;
        }
 
@@ -1467,22 +1464,22 @@ int viewer_get_metadata(struct relay_command *cmd)
                goto error;
        }
        ctf_trace->metadata_sent += read_len;
-       reply.status = htobe32(VIEWER_METADATA_OK);
+       reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
        goto send_reply;
 
 error:
-       reply.status = htobe32(VIEWER_METADATA_ERR);
+       reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
 
 send_reply:
        health_code_update();
-       ret = send_response(cmd->sock, &reply, sizeof(reply));
+       ret = send_response(conn->sock, &reply, sizeof(reply));
        if (ret < 0) {
                goto end_unlock;
        }
        health_code_update();
 
        if (len > 0) {
-               ret = send_response(cmd->sock, data, len);
+               ret = send_response(conn->sock, data, len);
                if (ret < 0) {
                        goto end_unlock;
                }
@@ -1500,16 +1497,52 @@ end:
        return ret;
 }
 
+/*
+ * Create a viewer session.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static
+int viewer_create_session(struct relay_connection *conn)
+{
+       int ret;
+       struct lttng_viewer_create_session_response resp;
+
+       DBG("Viewer create session received");
+
+       resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
+       conn->viewer_session = zmalloc(sizeof(conn->viewer_session));
+       if (!conn->viewer_session) {
+               ERR("Allocation viewer session");
+               resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR);
+               goto send_reply;
+       }
+       CDS_INIT_LIST_HEAD(&conn->viewer_session->sessions_head);
+
+send_reply:
+       health_code_update();
+       ret = send_response(conn->sock, &resp, sizeof(resp));
+       if (ret < 0) {
+               goto end;
+       }
+       health_code_update();
+       ret = 0;
+
+end:
+       return ret;
+}
+
+
 /*
  * live_relay_unknown_command: send -1 if received unknown command
  */
 static
-void live_relay_unknown_command(struct relay_command *cmd)
+void live_relay_unknown_command(struct relay_connection *conn)
 {
        struct lttcomm_relayd_generic_reply reply;
 
        reply.ret_code = htobe32(LTTNG_ERR_UNK);
-       (void) send_response(cmd->sock, &reply, sizeof(reply));
+       (void) send_response(conn->sock, &reply, sizeof(reply));
 }
 
 /*
@@ -1517,14 +1550,13 @@ void live_relay_unknown_command(struct relay_command *cmd)
  */
 static
 int process_control(struct lttng_viewer_cmd *recv_hdr,
-               struct relay_command *cmd, struct lttng_ht *sessions_ht)
+               struct relay_connection *conn)
 {
        int ret = 0;
        uint32_t msg_value;
 
        assert(recv_hdr);
-       assert(cmd);
-       assert(sessions_ht);
+       assert(conn);
 
        msg_value = be32toh(recv_hdr->cmd);
 
@@ -1532,37 +1564,40 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
         * Make sure we've done the version check before any command other then a
         * new client connection.
         */
-       if (msg_value != VIEWER_CONNECT && !cmd->version_check_done) {
-               ERR("Viewer cmd value %" PRIu32 " before version check", msg_value);
+       if (msg_value != LTTNG_VIEWER_CONNECT && !conn->version_check_done) {
+               ERR("Viewer conn value %" PRIu32 " before version check", msg_value);
                ret = -1;
                goto end;
        }
 
        switch (msg_value) {
-       case VIEWER_CONNECT:
-               ret = viewer_connect(cmd);
+       case LTTNG_VIEWER_CONNECT:
+               ret = viewer_connect(conn);
+               break;
+       case LTTNG_VIEWER_LIST_SESSIONS:
+               ret = viewer_list_sessions(conn);
                break;
-       case VIEWER_LIST_SESSIONS:
-               ret = viewer_list_sessions(cmd, sessions_ht);
+       case LTTNG_VIEWER_ATTACH_SESSION:
+               ret = viewer_attach_session(conn);
                break;
-       case VIEWER_ATTACH_SESSION:
-               ret = viewer_attach_session(cmd, sessions_ht);
+       case LTTNG_VIEWER_GET_NEXT_INDEX:
+               ret = viewer_get_next_index(conn);
                break;
-       case VIEWER_GET_NEXT_INDEX:
-               ret = viewer_get_next_index(cmd, sessions_ht);
+       case LTTNG_VIEWER_GET_PACKET:
+               ret = viewer_get_packet(conn);
                break;
-       case VIEWER_GET_PACKET:
-               ret = viewer_get_packet(cmd, sessions_ht);
+       case LTTNG_VIEWER_GET_METADATA:
+               ret = viewer_get_metadata(conn);
                break;
-       case VIEWER_GET_METADATA:
-               ret = viewer_get_metadata(cmd);
+       case LTTNG_VIEWER_GET_NEW_STREAMS:
+               ret = viewer_get_new_streams(conn);
                break;
-       case VIEWER_GET_NEW_STREAMS:
-               ret = viewer_get_new_streams(cmd, sessions_ht);
+       case LTTNG_VIEWER_CREATE_SESSION:
+               ret = viewer_create_session(conn);
                break;
        default:
                ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd));
-               live_relay_unknown_command(cmd);
+               live_relay_unknown_command(conn);
                ret = -1;
                goto end;
        }
@@ -1572,13 +1607,13 @@ end:
 }
 
 static
-void cleanup_poll_connection(struct lttng_poll_event *events, int pollfd)
+void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
 {
        int ret;
 
        assert(events);
 
-       lttng_poll_del(events, pollfd);
+       (void) lttng_poll_del(events, pollfd);
 
        ret = close(pollfd);
        if (ret < 0) {
@@ -1586,59 +1621,6 @@ void cleanup_poll_connection(struct lttng_poll_event *events, int pollfd)
        }
 }
 
-/*
- * Create and add connection to the given hash table.
- *
- * Return poll add value or else -1 on error.
- */
-static
-int add_connection(int fd, struct lttng_poll_event *events,
-               struct lttng_ht *relay_connections_ht)
-{
-       int ret;
-       struct relay_command *relay_connection;
-
-       assert(events);
-       assert(relay_connections_ht);
-
-       relay_connection = zmalloc(sizeof(struct relay_command));
-       if (relay_connection == NULL) {
-               PERROR("Relay command zmalloc");
-               goto error;
-       }
-
-       ret = lttng_read(fd, relay_connection, sizeof(*relay_connection));
-       if (ret < sizeof(*relay_connection)) {
-               PERROR("read relay cmd pipe");
-               goto error_read;
-       }
-
-       lttng_ht_node_init_ulong(&relay_connection->sock_n,
-                       (unsigned long) relay_connection->sock->fd);
-       rcu_read_lock();
-       lttng_ht_add_unique_ulong(relay_connections_ht,
-                       &relay_connection->sock_n);
-       rcu_read_unlock();
-
-       return lttng_poll_add(events, relay_connection->sock->fd,
-                       LPOLLIN | LPOLLRDHUP);
-
-error_read:
-       free(relay_connection);
-error:
-       return -1;
-}
-
-static
-void deferred_free_connection(struct rcu_head *head)
-{
-       struct relay_command *relay_connection =
-               caa_container_of(head, struct relay_command, rcu_node);
-
-       lttcomm_destroy_sock(relay_connection->sock);
-       free(relay_connection);
-}
-
 /*
  * Delete all streams for a specific session ID.
  */
@@ -1690,31 +1672,24 @@ static void try_destroy_streams(struct relay_session *session)
 }
 
 /*
- * Delete and free a connection.
+ * Delete and destroy a connection.
  *
  * RCU read side lock MUST be acquired.
  */
-static
-void del_connection(struct lttng_ht *relay_connections_ht,
-               struct lttng_ht_iter *iter, struct relay_command *relay_connection,
-               struct lttng_ht *sessions_ht)
+static void destroy_connection(struct lttng_ht *relay_connections_ht,
+               struct relay_connection *conn)
 {
-       int ret;
        struct relay_session *session;
 
        assert(relay_connections_ht);
-       assert(iter);
-       assert(relay_connection);
-       assert(sessions_ht);
+       assert(conn);
 
-       DBG("Cleaning connection of session ID %" PRIu64,
-                       relay_connection->session_id);
+       DBG("Cleaning connection of session ID %" PRIu64, conn->session_id);
 
-       rcu_read_lock();
-       ret = lttng_ht_del(relay_connections_ht, iter);
-       assert(!ret);
+       connection_delete(relay_connections_ht, conn);
 
-       session = session_find_by_id(sessions_ht, relay_connection->session_id);
+       rcu_read_lock();
+       session = session_find_by_id(conn->sessions_ht, conn->session_id);
        if (session) {
                /*
                 * Very important that this is done before destroying the session so we
@@ -1722,11 +1697,11 @@ void del_connection(struct lttng_ht *relay_connections_ht,
                 */
                destroy_viewer_streams_by_session(session);
                try_destroy_streams(session);
-               session_viewer_try_destroy(sessions_ht, session);
+               session_viewer_try_destroy(conn->sessions_ht, session);
        }
        rcu_read_unlock();
 
-       call_rcu(&relay_connection->rcu_node, deferred_free_connection);
+       connection_destroy(conn);
 }
 
 /*
@@ -1737,10 +1712,9 @@ void *thread_worker(void *data)
 {
        int ret, err = -1;
        uint32_t nb_fd;
-       struct relay_command *relay_connection;
+       struct relay_connection *conn;
        struct lttng_poll_event events;
        struct lttng_ht *relay_connections_ht;
-       struct lttng_ht_node_ulong *node;
        struct lttng_ht_iter iter;
        struct lttng_viewer_cmd recv_hdr;
        struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
@@ -1767,7 +1741,7 @@ void *thread_worker(void *data)
                goto error_poll_create;
        }
 
-       ret = lttng_poll_add(&events, live_relay_cmd_pipe[0], LPOLLIN | LPOLLRDHUP);
+       ret = lttng_poll_add(&events, live_conn_pipe[0], LPOLLIN | LPOLLRDHUP);
        if (ret < 0) {
                goto error;
        }
@@ -1814,66 +1788,49 @@ restart:
                                goto exit;
                        }
 
-                       /* Inspect the relay cmd pipe for new connection */
-                       if (pollfd == live_relay_cmd_pipe[0]) {
+                       /* Inspect the relay conn pipe for new connection */
+                       if (pollfd == live_conn_pipe[0]) {
                                if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                                        ERR("Relay live pipe error");
                                        goto error;
                                } else if (revents & LPOLLIN) {
-                                       DBG("Relay live viewer command received");
-                                       ret = add_connection(live_relay_cmd_pipe[0],
-                                                       &events, relay_connections_ht);
+                                       ret = lttng_read(live_conn_pipe[0], &conn, sizeof(conn));
                                        if (ret < 0) {
                                                goto error;
                                        }
-                               }
-                       } else if (revents) {
-                               rcu_read_lock();
-                               lttng_ht_lookup(relay_connections_ht,
-                                               (void *)((unsigned long) pollfd), &iter);
-                               node = lttng_ht_iter_get_node_ulong(&iter);
-                               if (node == NULL) {
-                                       DBG2("Relay viewer sock %d not found", pollfd);
+                                       conn->sessions_ht = sessions_ht;
+                                       connection_init(conn);
+                                       lttng_poll_add(&events, conn->sock->fd,
+                                                       LPOLLIN | LPOLLRDHUP);
+                                       rcu_read_lock();
+                                       lttng_ht_add_unique_ulong(relay_connections_ht,
+                                                       &conn->sock_n);
                                        rcu_read_unlock();
-                                       goto error;
+                                       DBG("Connection socket %d added", conn->sock->fd);
                                }
-                               relay_connection = caa_container_of(node, struct relay_command,
-                                               sock_n);
-
-                               if (revents & (LPOLLERR)) {
-                                       cleanup_poll_connection(&events, pollfd);
-                                       del_connection(relay_connections_ht, &iter,
-                                                       relay_connection, relay_ctx->sessions_ht);
-                               } else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
-                                       DBG("Viewer socket %d hung up", pollfd);
-                                       cleanup_poll_connection(&events, pollfd);
-                                       del_connection(relay_connections_ht, &iter,
-                                                       relay_connection, relay_ctx->sessions_ht);
+                       } else {
+                               rcu_read_lock();
+                               conn = connection_find_by_sock(relay_connections_ht, pollfd);
+                               /* If not found, there is a synchronization issue. */
+                               assert(conn);
+
+                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       cleanup_connection_pollfd(&events, pollfd);
+                                       destroy_connection(relay_connections_ht, conn);
                                } else if (revents & LPOLLIN) {
-                                       ret = relay_connection->sock->ops->recvmsg(
-                                                       relay_connection->sock, &recv_hdr,
-                                                       sizeof(struct lttng_viewer_cmd),
-                                                       0);
-                                       /* connection closed */
+                                       ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
+                                                       sizeof(recv_hdr), 0);
                                        if (ret <= 0) {
-                                               cleanup_poll_connection(&events, pollfd);
-                                               del_connection(relay_connections_ht, &iter,
-                                                               relay_connection, relay_ctx->sessions_ht);
-                                               DBG("Viewer control connection closed with %d",
-                                                               pollfd);
+                                               /* Connection closed */
+                                               cleanup_connection_pollfd(&events, pollfd);
+                                               destroy_connection(relay_connections_ht, conn);
+                                               DBG("Viewer control conn closed with %d", pollfd);
                                        } else {
-                                               if (relay_connection->session) {
-                                                       DBG2("Relay viewer worker receiving data for "
-                                                                       "session: %" PRIu64,
-                                                                       relay_connection->session->id);
-                                               }
-                                               ret = process_control(&recv_hdr, relay_connection,
-                                                               sessions_ht);
+                                               ret = process_control(&recv_hdr, conn);
                                                if (ret < 0) {
                                                        /* Clear the session on error. */
-                                                       cleanup_poll_connection(&events, pollfd);
-                                                       del_connection(relay_connections_ht, &iter,
-                                                                       relay_connection, relay_ctx->sessions_ht);
+                                                       cleanup_connection_pollfd(&events, pollfd);
+                                                       destroy_connection(relay_connections_ht, conn);
                                                        DBG("Viewer connection closed with %d", pollfd);
                                                }
                                        }
@@ -1887,27 +1844,19 @@ exit:
 error:
        lttng_poll_clean(&events);
 
-       /* empty the hash table and free the memory */
+       /* Cleanup reamaining connection object. */
        rcu_read_lock();
-       cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
+       cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, conn,
+                       sock_n.node) {
                health_code_update();
-
-               node = lttng_ht_iter_get_node_ulong(&iter);
-               if (!node) {
-                       continue;
-               }
-
-               relay_connection = caa_container_of(node, struct relay_command,
-                               sock_n);
-               del_connection(relay_connections_ht, &iter, relay_connection,
-                               relay_ctx->sessions_ht);
+               destroy_connection(relay_connections_ht, conn);
        }
        rcu_read_unlock();
 error_poll_create:
        lttng_ht_destroy(relay_connections_ht);
 relay_connections_ht_error:
-       /* Close relay cmd pipes */
-       utils_close_pipe(live_relay_cmd_pipe);
+       /* Close relay conn pipes */
+       utils_close_pipe(live_conn_pipe);
        if (err) {
                DBG("Viewer worker thread exited with error");
        }
@@ -1927,11 +1876,11 @@ error_testpoint:
  * Create the relay command pipe to wake thread_manage_apps.
  * Closed in cleanup().
  */
-static int create_relay_cmd_pipe(void)
+static int create_conn_pipe(void)
 {
        int ret;
 
-       ret = utils_create_pipe_cloexec(live_relay_cmd_pipe);
+       ret = utils_create_pipe_cloexec(live_conn_pipe);
 
        return ret;
 }
@@ -1992,12 +1941,12 @@ int live_start_threads(struct lttng_uri *uri,
        }
 
        /* Setup the thread apps communication pipe. */
-       if ((ret = create_relay_cmd_pipe()) < 0) {
+       if ((ret = create_conn_pipe()) < 0) {
                goto exit;
        }
 
        /* Init relay command queue. */
-       cds_wfq_init(&viewer_cmd_queue.queue);
+       cds_wfq_init(&viewer_conn_queue.queue);
 
        /* Set up max poll set size */
        lttng_poll_set_max_size();
This page took 0.061539 seconds and 5 git commands to generate.