#endif
;
+enum relay_connection_status {
+ RELAY_CONNECTION_STATUS_OK,
+ /* An error occurred while processing an event on the connection. */
+ RELAY_CONNECTION_STATUS_ERROR,
+ /* The connection was closed/shut down cleanly (EOF received). */
+ RELAY_CONNECTION_STATUS_CLOSED,
+};
+
/* command line options */
char *opt_output_path;
static int opt_daemon, opt_background;
memset(&reply, 0, sizeof(reply));
- switch (conn->minor) {
- case 1:
- case 2:
- case 3:
- break;
- case 4: /* LTTng sessiond 2.4 */
- default:
+ if (conn->minor < 4) {
+ /* From 2.1 to 2.3 */
+ ret = 0;
+ } else if (conn->minor >= 4 && conn->minor < 11) {
+ /* From 2.4 to 2.10 */
ret = cmd_create_session_2_4(payload, session_name,
hostname, &live_timer, &snapshot);
+ } else {
+ /* From 2.11 to ... */
+ ret = cmd_create_session_2_11(payload, session_name,
+ hostname, &live_timer, &snapshot);
}
+
if (ret < 0) {
goto send_reply;
}
uint64_t stream_handle = -1ULL;
char *path_name = NULL, *channel_name = NULL;
uint64_t tracefile_size = 0, tracefile_count = 0;
+ struct relay_stream_chunk_id stream_chunk_id = { 0 };
if (!session || !conn->version_check_done) {
ERR("Trying to add a stream before version check");
goto end_no_session;
}
- switch (session->minor) {
- case 1: /* LTTng sessiond 2.1. Allocates path_name and channel_name. */
+ if (session->minor == 1) {
+ /* For 2.1 */
ret = cmd_recv_stream_2_1(payload, &path_name,
&channel_name);
- break;
- case 2: /* LTTng sessiond 2.2. Allocates path_name and channel_name. */
- default:
+ } else if (session->minor > 1 && session->minor < 11) {
+ /* From 2.2 to 2.10 */
ret = cmd_recv_stream_2_2(payload, &path_name,
&channel_name, &tracefile_size, &tracefile_count);
- break;
+ } else {
+ /* From 2.11 to ... */
+ ret = cmd_recv_stream_2_11(payload, &path_name,
+ &channel_name, &tracefile_size, &tracefile_count,
+ &stream_chunk_id.value);
+ stream_chunk_id.is_set = true;
}
+
if (ret < 0) {
goto send_reply;
}
/* We pass ownership of path_name and channel_name. */
stream = stream_create(trace, stream_handle, path_name,
- channel_name, tracefile_size, tracefile_count);
+ channel_name, tracefile_size, tracefile_count,
+ &stream_chunk_id);
path_name = NULL;
channel_name = NULL;
goto end_stream_unlock;
}
- stream->chunk_id = stream_info.new_chunk_id;
+ assert(stream->current_chunk_id.is_set);
+ stream->current_chunk_id.value = stream_info.new_chunk_id;
if (stream->is_metadata) {
/*
chunk_id = be64toh(msg.chunk_id);
- DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id);
+ DBG("Evaluating rotate pending for session \"%s\" and chunk id %" PRIu64,
+ session->session_name, chunk_id);
/*
* Iterate over all the streams in the session and check if they are
rotate_pending = true;
DBG("Stream %" PRIu64 " is still rotating",
stream->stream_handle);
- } else if (stream->chunk_id < chunk_id) {
+ } else if (stream->current_chunk_id.value < chunk_id) {
/*
* Stream closed on the consumer but still active on the
* relay.
return ret;
}
-static int relay_process_control_receive_payload(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control_receive_payload(
+ struct relay_connection *conn)
{
int ret = 0;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
struct lttng_dynamic_buffer *reception_buffer =
&conn->protocol.ctrl.reception_buffer;
struct ctrl_connection_state_receive_payload *state =
reception_buffer->data + state->received,
state->left_to_receive, MSG_DONTWAIT);
if (ret < 0) {
- ERR("Unable to receive command payload on sock %d", conn->sock->fd);
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Unable to receive command payload on sock %d",
+ conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
goto end;
} else if (ret == 0) {
DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
- ret = -1;
+ status = RELAY_CONNECTION_STATUS_CLOSED;
goto end;
}
DBG3("Partial reception of control connection protocol payload (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
state->received, state->left_to_receive,
conn->sock->fd);
- ret = 0;
goto end;
}
ret = relay_process_control_command(conn,
&state->header, &payload_view);
if (ret < 0) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
}
ret = connection_reset_protocol_state(conn);
+ if (ret) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
end:
- return ret;
+ return status;
}
-static int relay_process_control_receive_header(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control_receive_header(
+ struct relay_connection *conn)
{
int ret = 0;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
struct lttcomm_relayd_hdr header;
struct lttng_dynamic_buffer *reception_buffer =
&conn->protocol.ctrl.reception_buffer;
reception_buffer->data + state->received,
state->left_to_receive, MSG_DONTWAIT);
if (ret < 0) {
- ERR("Unable to receive control command header on sock %d", conn->sock->fd);
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Unable to receive control command header on sock %d",
+ conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
goto end;
} else if (ret == 0) {
DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
- ret = -1;
+ status = RELAY_CONNECTION_STATUS_CLOSED;
goto end;
}
DBG3("Partial reception of control connection protocol header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
state->received, state->left_to_receive,
conn->sock->fd);
- ret = 0;
goto end;
}
if (header.data_size > DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE) {
ERR("Command header indicates a payload (%" PRIu64 " bytes) that exceeds the maximal payload size allowed on a control connection.",
header.data_size);
- ret = -1;
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
}
ret = lttng_dynamic_buffer_set_size(reception_buffer,
header.data_size);
if (ret) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
}
* Manually invoke the next state as the poll loop
* will not wake-up to allow us to proceed further.
*/
- ret = relay_process_control_receive_payload(conn);
+ status = relay_process_control_receive_payload(conn);
}
end:
- return ret;
+ return status;
}
/*
* Process the commands received on the control socket
*/
-static int relay_process_control(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control(
+ struct relay_connection *conn)
{
- int ret = 0;
+ enum relay_connection_status status; /* Assigned by both handled states; any other state aborts. */
switch (conn->protocol.ctrl.state_id) {
case CTRL_CONNECTION_STATE_RECEIVE_HEADER:
- ret = relay_process_control_receive_header(conn);
+ status = relay_process_control_receive_header(conn);
break;
case CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD:
- ret = relay_process_control_receive_payload(conn);
+ status = relay_process_control_receive_payload(conn);
break;
default:
ERR("Unknown control connection protocol state encountered.");
abort();
}
- return ret;
+ return status;
}
/*
return ret;
}
-static int relay_process_data_receive_header(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data_receive_header(
+ struct relay_connection *conn)
{
int ret;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
struct data_connection_state_receive_header *state =
&conn->protocol.data.state.receive_header;
struct lttcomm_relayd_data_hdr header;
state->header_reception_buffer + state->received,
state->left_to_receive, MSG_DONTWAIT);
if (ret < 0) {
- ERR("Unable to receive data header on sock %d", conn->sock->fd);
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Unable to receive data header on sock %d", conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
goto end;
} else if (ret == 0) {
/* Orderly shutdown. Not necessary to print an error. */
DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
- ret = -1;
+ status = RELAY_CONNECTION_STATUS_CLOSED;
goto end;
}
DBG3("Partial reception of data connection header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
state->received, state->left_to_receive,
conn->sock->fd);
- ret = 0;
goto end;
}
if (!stream) {
DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
header.stream_id);
- ret = 0;
+ /* Protocol error. */
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
}
&new_id, &stream->stream_fd->fd);
if (ret < 0) {
ERR("Failed to rotate stream output file");
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
conn->protocol.data.state.receive_payload.rotate_index = true;
}
- ret = 0;
end_stream_unlock:
pthread_mutex_unlock(&stream->lock);
stream_put(stream);
end:
- return ret;
+ return status;
}
-static int relay_process_data_receive_payload(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data_receive_payload(
+ struct relay_connection *conn)
{
int ret;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
struct relay_stream *stream;
struct data_connection_state_receive_payload *state =
&conn->protocol.data.state.receive_payload;
uint64_t left_to_receive = state->left_to_receive;
struct relay_session *session;
+ DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
+ state->header.stream_id, state->header.net_seq_num,
+ state->received, left_to_receive);
+
stream = stream_get_by_id(state->header.stream_id);
if (!stream) {
- DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
+ /* Protocol error. */
+ ERR("relay_process_data_receive_payload: cannot find stream %" PRIu64,
state->header.stream_id);
- ret = 0;
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
}
pthread_mutex_lock(&stream->lock);
session = stream->trace->session;
-
- DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
- state->header.stream_id, state->header.net_seq_num,
- state->received, left_to_receive);
+ if (!conn->session) {
+ ret = connection_set_session(conn, session);
+ if (ret) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
+ }
/*
* The size of the "chunk" received on any iteration is bounded by:
ret = conn->sock->ops->recvmsg(conn->sock, data_buffer,
recv_size, MSG_DONTWAIT);
if (ret < 0) {
- ERR("Socket %d error %d", conn->sock->fd, ret);
- ret = -1;
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Socket %d error", conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
goto end_stream_unlock;
} else if (ret == 0) {
/* No more data ready to be consumed on socket. */
DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
state->header.stream_id);
+ status = RELAY_CONNECTION_STATUS_CLOSED;
break;
} else if (ret < (int) recv_size) {
/*
recv_size);
if (write_ret < (ssize_t) recv_size) {
ERR("Relay error writing data to file");
- ret = -1;
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
state->header.stream_id, state->received,
state->left_to_receive);
- ret = 0;
goto end_stream_unlock;
}
ret = write_padding_to_file(stream->stream_fd->fd,
state->header.padding_size);
- if (ret < 0) {
+ if ((int64_t) ret < (int64_t) state->header.padding_size) {
ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
}
ret = try_rotate_stream(stream);
if (ret < 0) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
stream_put(stream);
end:
- return ret;
+ return status;
}
/*
* relay_process_data: Process the data received on the data socket
*/
-static int relay_process_data(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data(
+ struct relay_connection *conn)
{
- int ret;
+ enum relay_connection_status status; /* Assigned by both handled states; any other state aborts. */
switch (conn->protocol.data.state_id) {
case DATA_CONNECTION_STATE_RECEIVE_HEADER:
- ret = relay_process_data_receive_header(conn);
+ status = relay_process_data_receive_header(conn);
break;
case DATA_CONNECTION_STATE_RECEIVE_PAYLOAD:
- ret = relay_process_data_receive_payload(conn);
+ status = relay_process_data_receive_payload(conn);
break;
default:
ERR("Unexpected data connection communication state.");
abort();
}
- return ret;
+ return status;
}
static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
assert(ctrl_conn->type == RELAY_CONTROL);
if (revents & LPOLLIN) {
- ret = relay_process_control(ctrl_conn);
- if (ret < 0) {
- /* Clear the connection on error. */
+ enum relay_connection_status status;
+
+ status = relay_process_control(ctrl_conn);
+ if (status != RELAY_CONNECTION_STATUS_OK) {
+ /*
+ * On socket error, flag the session as aborted to force
+ * the cleanup of its streams; otherwise they can leak
+ * during the lifetime of the relayd.
+ *
+ * This prevents situations in which streams can be
+ * left open because an index was received, the
+ * control connection is closed, and the data
+ * connection is closed (uncleanly) before the packet's
+ * data is provided.
+ *
+ * Since the control connection encountered an error,
+ * it is okay to be conservative and close the
+ * session right now as we can't rely on the protocol
+ * being respected anymore.
+ */
+ if (status == RELAY_CONNECTION_STATUS_ERROR) {
+ session_abort(ctrl_conn->session);
+ }
+
+ /* Clear the connection on error or close. */
relay_thread_close_connection(&events,
pollfd,
ctrl_conn);
assert(data_conn->type == RELAY_DATA);
if (revents & LPOLLIN) {
- ret = relay_process_data(data_conn);
- /* Connection closed */
- if (ret < 0) {
+ enum relay_connection_status status;
+
+ status = relay_process_data(data_conn);
+ /* Connection closed or error. */
+ if (status != RELAY_CONNECTION_STATUS_OK) {
+ /*
+ * On socket error, flag the session as aborted to force
+ * the cleanup of its streams; otherwise they can leak
+ * during the lifetime of the relayd.
+ *
+ * This prevents situations in which streams can be
+ * left open because an index was received, the
+ * control connection is closed, and the data
+ * connection is closed (uncleanly) before the packet's
+ * data is provided.
+ *
+ * Since the data connection encountered an error,
+ * it is okay to be conservative and close the
+ * session right now as we can't rely on the protocol
+ * being respected anymore.
+ */
+ if (status == RELAY_CONNECTION_STATUS_ERROR) {
+ session_abort(data_conn->session);
+ }
relay_thread_close_connection(&events, pollfd,
data_conn);
/*
exit:
error:
- /* Cleanup reamaining connection object. */
+ /* Cleanup remaining connection object. */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
destroy_conn,
sock_n.node) {
health_code_update();
- if (session_abort(destroy_conn->session)) {
- assert(0);
- }
+ session_abort(destroy_conn->session);
/*
* No need to grab another ref, because we own