relayd: keep track of prev_index_seq in relayd_stream
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 09a73e392ada24798e5e1c143000f0572a253c2f..92f9a5600ac28b7280dcea9f1041b7725b13b748 100644 (file)
@@ -1106,16 +1106,19 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr,
 
        memset(&reply, 0, sizeof(reply));
 
-       switch (conn->minor) {
-       case 1:
-       case 2:
-       case 3:
-               break;
-       case 4: /* LTTng sessiond 2.4 */
-       default:
+       if (conn->minor < 4) {
+               /* From 2.1 to 2.3 */
+               ret = 0;
+       } else if (conn->minor >= 4 && conn->minor < 11) {
+               /* From 2.4 to 2.10 */
                ret = cmd_create_session_2_4(payload, session_name,
                        hostname, &live_timer, &snapshot);
+       } else {
+               /* From 2.11 to ... */
+               ret = cmd_create_session_2_11(payload, session_name,
+                       hostname, &live_timer, &snapshot);
        }
+
        if (ret < 0) {
                goto send_reply;
        }
@@ -1195,6 +1198,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
        uint64_t stream_handle = -1ULL;
        char *path_name = NULL, *channel_name = NULL;
        uint64_t tracefile_size = 0, tracefile_count = 0;
+       struct relay_stream_chunk_id stream_chunk_id = { 0 };
 
        if (!session || !conn->version_check_done) {
                ERR("Trying to add a stream before version check");
@@ -1202,17 +1206,22 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
                goto end_no_session;
        }
 
-       switch (session->minor) {
-       case 1: /* LTTng sessiond 2.1. Allocates path_name and channel_name. */
+       if (session->minor == 1) {
+               /* For 2.1 */
                ret = cmd_recv_stream_2_1(payload, &path_name,
                        &channel_name);
-               break;
-       case 2: /* LTTng sessiond 2.2. Allocates path_name and channel_name. */
-       default:
+       } else if (session->minor > 1 && session->minor < 11) {
+               /* From 2.2 to 2.10 */
                ret = cmd_recv_stream_2_2(payload, &path_name,
                        &channel_name, &tracefile_size, &tracefile_count);
-               break;
+       } else {
+               /* From 2.11 to ... */
+               ret = cmd_recv_stream_2_11(payload, &path_name,
+                       &channel_name, &tracefile_size, &tracefile_count,
+                       &stream_chunk_id.value);
+               stream_chunk_id.is_set = true;
        }
+
        if (ret < 0) {
                goto send_reply;
        }
@@ -1229,7 +1238,8 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
 
        /* We pass ownership of path_name and channel_name. */
        stream = stream_create(trace, stream_handle, path_name,
-                       channel_name, tracefile_size, tracefile_count);
+               channel_name, tracefile_size, tracefile_count,
+               &stream_chunk_id);
        path_name = NULL;
        channel_name = NULL;
 
@@ -2304,12 +2314,18 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr,
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
+               stream->prev_index_seq = index_info.net_seq_num;
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
        } else {
+               /*
+                * ret < 0
+                *
+                * relay_index_try_flush is responsible for the self-reference
+                * put of the index object on error.
+                */
                ERR("relay_index_try_flush error %d", ret);
-               relay_index_put(index);
                ret = -1;
        }
 
@@ -2475,7 +2491,8 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
                goto end_stream_unlock;
        }
 
-       stream->chunk_id = stream_info.new_chunk_id;
+       assert(stream->current_chunk_id.is_set);
+       stream->current_chunk_id.value = stream_info.new_chunk_id;
 
        if (stream->is_metadata) {
                /*
@@ -2809,7 +2826,8 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr,
 
        chunk_id = be64toh(msg.chunk_id);
 
-       DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id);
+       DBG("Evaluating rotate pending for session \"%s\" and  chunk id %" PRIu64,
+                       session->session_name, chunk_id);
 
        /*
         * Iterate over all the streams in the session and check if they are
@@ -2831,7 +2849,7 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                        rotate_pending = true;
                        DBG("Stream %" PRIu64 " is still rotating",
                                        stream->stream_handle);
-               } else if (stream->chunk_id < chunk_id) {
+               } else if (stream->current_chunk_id.value < chunk_id) {
                        /*
                         * Stream closed on the consumer but still active on the
                         * relay.
@@ -3205,9 +3223,13 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                /* No flush. */
                ret = 0;
        } else {
-               /* Put self-ref for this index due to error. */
-               relay_index_put(index);
-               index = NULL;
+               /*
+                * ret < 0
+                *
+                * relay_index_try_flush is responsible for the self-reference
+                * put of the index object on error.
+                */
+               ERR("relay_index_try_flush error %d", ret);
                ret = -1;
        }
 end:
@@ -3256,7 +3278,6 @@ static enum relay_connection_status relay_process_data_receive_header(
                DBG3("Partial reception of data connection header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
                                state->received, state->left_to_receive,
                                conn->sock->fd);
-               ret = 0;
                goto end;
        }
 
@@ -3323,7 +3344,6 @@ static enum relay_connection_status relay_process_data_receive_header(
                conn->protocol.data.state.receive_payload.rotate_index = true;
        }
 
-       ret = 0;
 end_stream_unlock:
        pthread_mutex_unlock(&stream->lock);
        stream_put(stream);
@@ -3346,10 +3366,14 @@ static enum relay_connection_status relay_process_data_receive_payload(
        uint64_t left_to_receive = state->left_to_receive;
        struct relay_session *session;
 
+       DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
+                       state->header.stream_id, state->header.net_seq_num,
+                       state->received, left_to_receive);
+
        stream = stream_get_by_id(state->header.stream_id);
        if (!stream) {
                /* Protocol error. */
-               DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
+               ERR("relay_process_data_receive_payload: cannot find stream %" PRIu64,
                                state->header.stream_id);
                status = RELAY_CONNECTION_STATUS_ERROR;
                goto end;
@@ -3357,10 +3381,13 @@ static enum relay_connection_status relay_process_data_receive_payload(
 
        pthread_mutex_lock(&stream->lock);
        session = stream->trace->session;
-
-       DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
-                       state->header.stream_id, state->header.net_seq_num,
-                       state->received, left_to_receive);
+       if (!conn->session) {
+               ret = connection_set_session(conn, session);
+               if (ret) {
+                       status = RELAY_CONNECTION_STATUS_ERROR;
+                       goto end_stream_unlock;
+               }
+       }
 
        /*
         * The size of the "chunk" received on any iteration is bounded by:
@@ -3426,7 +3453,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
 
        ret = write_padding_to_file(stream->stream_fd->fd,
                        state->header.padding_size);
-       if (ret < 0) {
+       if ((int64_t) ret < (int64_t) state->header.padding_size) {
                ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
                                stream->stream_handle,
                                state->header.net_seq_num, ret);
@@ -3456,6 +3483,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
        if (index_flushed) {
                stream->pos_after_last_complete_data_index =
                                stream->tracefile_size_current;
+               stream->prev_index_seq = state->header.net_seq_num;
        }
 
        stream->prev_seq = state->header.net_seq_num;
@@ -3688,6 +3716,26 @@ restart:
 
                                        status = relay_process_control(ctrl_conn);
                                        if (status != RELAY_CONNECTION_STATUS_OK) {
+                                               /*
+                                                * On socket error flag the session as aborted to force
+                                                * the cleanup of its stream otherwise it can leak
+                                                * during the lifetime of the relayd.
+                                                *
+                                                * This prevents situations in which streams can be
+                                                * left opened because an index was received, the
+                                                * control connection is closed, and the data
+                                                * connection is closed (uncleanly) before the packet's
+                                                * data provided.
+                                                *
+                                                * Since the control connection encountered an error,
+                                                * it is okay to be conservative and close the
+                                                * session right now as we can't rely on the protocol
+                                                * being respected anymore.
+                                                */
+                                               if (status == RELAY_CONNECTION_STATUS_ERROR) {
+                                                       session_abort(ctrl_conn->session);
+                                               }
+
                                                /* Clear the connection on error or close. */
                                                relay_thread_close_connection(&events,
                                                                pollfd,
@@ -3767,6 +3815,25 @@ restart:
                                status = relay_process_data(data_conn);
                                /* Connection closed or error. */
                                if (status != RELAY_CONNECTION_STATUS_OK) {
+                                       /*
+                                        * On socket error flag the session as aborted to force
+                                        * the cleanup of its stream otherwise it can leak
+                                        * during the lifetime of the relayd.
+                                        *
+                                        * This prevents situations in which streams can be
+                                        * left opened because an index was received, the
+                                        * control connection is closed, and the data
+                                        * connection is closed (uncleanly) before the packet's
+                                        * data provided.
+                                        *
+                                        * Since the data connection encountered an error,
+                                        * it is okay to be conservative and close the
+                                        * session right now as we can't rely on the protocol
+                                        * being respected anymore.
+                                        */
+                                       if (status == RELAY_CONNECTION_STATUS_ERROR) {
+                                               session_abort(data_conn->session);
+                                       }
                                        relay_thread_close_connection(&events, pollfd,
                                                        data_conn);
                                        /*
@@ -3798,16 +3865,14 @@ restart:
 
 exit:
 error:
-       /* Cleanup reamaining connection object. */
+       /* Cleanup remaining connection object. */
        rcu_read_lock();
        cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
                        destroy_conn,
                        sock_n.node) {
                health_code_update();
 
-               if (session_abort(destroy_conn->session)) {
-                       assert(0);
-               }
+               session_abort(destroy_conn->session);
 
                /*
                 * No need to grab another ref, because we own
This page took 0.027711 seconds and 5 git commands to generate.