X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=92f9a5600ac28b7280dcea9f1041b7725b13b748;hp=09a73e392ada24798e5e1c143000f0572a253c2f;hb=7a45c7e6401baebe3715b317a3d871ee49921057;hpb=5569b118e687de5c6179b3a432187f06c1277608 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 09a73e392..92f9a5600 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1106,16 +1106,19 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr, memset(&reply, 0, sizeof(reply)); - switch (conn->minor) { - case 1: - case 2: - case 3: - break; - case 4: /* LTTng sessiond 2.4 */ - default: + if (conn->minor < 4) { + /* From 2.1 to 2.3 */ + ret = 0; + } else if (conn->minor >= 4 && conn->minor < 11) { + /* From 2.4 to 2.10 */ ret = cmd_create_session_2_4(payload, session_name, hostname, &live_timer, &snapshot); + } else { + /* From 2.11 to ... */ + ret = cmd_create_session_2_11(payload, session_name, + hostname, &live_timer, &snapshot); } + if (ret < 0) { goto send_reply; } @@ -1195,6 +1198,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, uint64_t stream_handle = -1ULL; char *path_name = NULL, *channel_name = NULL; uint64_t tracefile_size = 0, tracefile_count = 0; + struct relay_stream_chunk_id stream_chunk_id = { 0 }; if (!session || !conn->version_check_done) { ERR("Trying to add a stream before version check"); @@ -1202,17 +1206,22 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } - switch (session->minor) { - case 1: /* LTTng sessiond 2.1. Allocates path_name and channel_name. */ + if (session->minor == 1) { + /* For 2.1 */ ret = cmd_recv_stream_2_1(payload, &path_name, &channel_name); - break; - case 2: /* LTTng sessiond 2.2. Allocates path_name and channel_name. */ - default: + } else if (session->minor > 1 && session->minor < 11) { + /* From 2.2 to 2.10 */ ret = cmd_recv_stream_2_2(payload, &path_name, &channel_name, &tracefile_size, &tracefile_count); - break; + } else { + /* From 2.11 to ... */ + ret = cmd_recv_stream_2_11(payload, &path_name, + &channel_name, &tracefile_size, &tracefile_count, + &stream_chunk_id.value); + stream_chunk_id.is_set = true; } + if (ret < 0) { goto send_reply; } @@ -1229,7 +1238,8 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, /* We pass ownership of path_name and channel_name. */ stream = stream_create(trace, stream_handle, path_name, - channel_name, tracefile_size, tracefile_count); + channel_name, tracefile_size, tracefile_count, + &stream_chunk_id); path_name = NULL; channel_name = NULL; @@ -2304,12 +2314,18 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, tracefile_array_commit_seq(stream->tfa); stream->index_received_seqcount++; stream->pos_after_last_complete_data_index += index->total_size; + stream->prev_index_seq = index_info.net_seq_num; } else if (ret > 0) { /* no flush. */ ret = 0; } else { + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ ERR("relay_index_try_flush error %d", ret); - relay_index_put(index); ret = -1; } @@ -2475,7 +2491,8 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr goto end_stream_unlock; } - stream->chunk_id = stream_info.new_chunk_id; + assert(stream->current_chunk_id.is_set); + stream->current_chunk_id.value = stream_info.new_chunk_id; if (stream->is_metadata) { /* @@ -2809,7 +2826,8 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr, chunk_id = be64toh(msg.chunk_id); - DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id); + DBG("Evaluating rotate pending for session \"%s\" and chunk id %" PRIu64, + session->session_name, chunk_id); /* * Iterate over all the streams in the session and check if they are @@ -2831,7 +2849,7 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr, rotate_pending = true; DBG("Stream %" PRIu64 " is still rotating", stream->stream_handle); - } else if (stream->chunk_id < chunk_id) { + } else if (stream->current_chunk_id.value < chunk_id) { /* * Stream closed on the consumer but still active on the * relay. @@ -3205,9 +3223,13 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, /* No flush. */ ret = 0; } else { - /* Put self-ref for this index due to error. */ - relay_index_put(index); - index = NULL; + /* + * ret < 0 + * + * relay_index_try_flush is responsible for the self-reference + * put of the index object on error. + */ + ERR("relay_index_try_flush error %d", ret); ret = -1; } end: @@ -3256,7 +3278,6 @@ static enum relay_connection_status relay_process_data_receive_header( DBG3("Partial reception of data connection header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)", state->received, state->left_to_receive, conn->sock->fd); - ret = 0; goto end; } @@ -3323,7 +3344,6 @@ static enum relay_connection_status relay_process_data_receive_header( conn->protocol.data.state.receive_payload.rotate_index = true; } - ret = 0; end_stream_unlock: pthread_mutex_unlock(&stream->lock); stream_put(stream); @@ -3346,10 +3366,14 @@ static enum relay_connection_status relay_process_data_receive_payload( uint64_t left_to_receive = state->left_to_receive; struct relay_session *session; + DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive", + state->header.stream_id, state->header.net_seq_num, + state->received, left_to_receive); + stream = stream_get_by_id(state->header.stream_id); if (!stream) { /* Protocol error. */ - DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64, + ERR("relay_process_data_receive_payload: cannot find stream %" PRIu64, state->header.stream_id); status = RELAY_CONNECTION_STATUS_ERROR; goto end; @@ -3357,10 +3381,13 @@ static enum relay_connection_status relay_process_data_receive_payload( pthread_mutex_lock(&stream->lock); session = stream->trace->session; - - DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive", - state->header.stream_id, state->header.net_seq_num, - state->received, left_to_receive); + if (!conn->session) { + ret = connection_set_session(conn, session); + if (ret) { + status = RELAY_CONNECTION_STATUS_ERROR; + goto end_stream_unlock; + } + } /* * The size of the "chunk" received on any iteration is bounded by: @@ -3426,7 +3453,7 @@ static enum relay_connection_status relay_process_data_receive_payload( ret = write_padding_to_file(stream->stream_fd->fd, state->header.padding_size); - if (ret < 0) { + if ((int64_t) ret < (int64_t) state->header.padding_size) { ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", stream->stream_handle, state->header.net_seq_num, ret); @@ -3456,6 +3483,7 @@ static enum relay_connection_status relay_process_data_receive_payload( if (index_flushed) { stream->pos_after_last_complete_data_index = stream->tracefile_size_current; + stream->prev_index_seq = state->header.net_seq_num; } stream->prev_seq = state->header.net_seq_num; @@ -3688,6 +3716,26 @@ restart: status = relay_process_control(ctrl_conn); if (status != RELAY_CONNECTION_STATUS_OK) { + /* + * On socket error flag the session as aborted to force + * the cleanup of its stream otherwise it can leak + * during the lifetime of the relayd. + * + * This prevents situations in which streams can be + * left opened because an index was received, the + * control connection is closed, and the data + * connection is closed (uncleanly) before the packet's + * data provided. + * + * Since the control connection encountered an error, + * it is okay to be conservative and close the + * session right now as we can't rely on the protocol + * being respected anymore. + */ + if (status == RELAY_CONNECTION_STATUS_ERROR) { + session_abort(ctrl_conn->session); + } + /* Clear the connection on error or close. */ relay_thread_close_connection(&events, pollfd, @@ -3767,6 +3815,25 @@ restart: status = relay_process_data(data_conn); /* Connection closed or error. */ if (status != RELAY_CONNECTION_STATUS_OK) { + /* + * On socket error flag the session as aborted to force + * the cleanup of its stream otherwise it can leak + * during the lifetime of the relayd. + * + * This prevents situations in which streams can be + * left opened because an index was received, the + * control connection is closed, and the data + * connection is closed (uncleanly) before the packet's + * data provided. + * + * Since the data connection encountered an error, + * it is okay to be conservative and close the + * session right now as we can't rely on the protocol + * being respected anymore. + */ + if (status == RELAY_CONNECTION_STATUS_ERROR) { + session_abort(data_conn->session); + } relay_thread_close_connection(&events, pollfd, data_conn); /* @@ -3798,16 +3865,14 @@ restart: exit: error: - /* Cleanup reamaining connection object. */ + /* Cleanup remaining connection object. */ rcu_read_lock(); cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { health_code_update(); - if (session_abort(destroy_conn->session)) { - assert(0); - } + session_abort(destroy_conn->session); /* * No need to grab another ref, because we own