Propagate whether a connection was closed cleanly or after an error
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 27 Apr 2018 20:47:04 +0000 (16:47 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 27 Apr 2018 20:48:07 +0000 (16:48 -0400)
This allows a follow-up fix that requires this distinction to
decide whether a session must be closed or aborted.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/main.c

index a312873808d09448bedae6ab9e87344ef0905f5c..09a73e392ada24798e5e1c143000f0572a253c2f 100644 (file)
@@ -83,6 +83,14 @@ NULL
 #endif
 ;
 
+enum relay_connection_status {
+       RELAY_CONNECTION_STATUS_OK,
+       /* An error occured while processing an event on the connection. */
+       RELAY_CONNECTION_STATUS_ERROR,
+       /* Connection closed/shutdown cleanly. */
+       RELAY_CONNECTION_STATUS_CLOSED,
+};
+
 /* command line options */
 char *opt_output_path;
 static int opt_daemon, opt_background;
@@ -2949,9 +2957,11 @@ end:
        return ret;
 }
 
-static int relay_process_control_receive_payload(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control_receive_payload(
+               struct relay_connection *conn)
 {
        int ret = 0;
+       enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
        struct lttng_dynamic_buffer *reception_buffer =
                        &conn->protocol.ctrl.reception_buffer;
        struct ctrl_connection_state_receive_payload *state =
@@ -2967,11 +2977,15 @@ static int relay_process_control_receive_payload(struct relay_connection *conn)
                        reception_buffer->data + state->received,
                        state->left_to_receive, MSG_DONTWAIT);
        if (ret < 0) {
-               ERR("Unable to receive command payload on sock %d", conn->sock->fd);
+               if (errno != EAGAIN && errno != EWOULDBLOCK) {
+                       PERROR("Unable to receive command payload on sock %d",
+                                       conn->sock->fd);
+                       status = RELAY_CONNECTION_STATUS_ERROR;
+               }
                goto end;
        } else if (ret == 0) {
                DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
-               ret = -1;
+               status = RELAY_CONNECTION_STATUS_CLOSED;
                goto end;
        }
 
@@ -2989,7 +3003,6 @@ static int relay_process_control_receive_payload(struct relay_connection *conn)
                DBG3("Partial reception of control connection protocol payload (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
                                state->received, state->left_to_receive,
                                conn->sock->fd);
-               ret = 0;
                goto end;
        }
 
@@ -3008,17 +3021,23 @@ reception_complete:
        ret = relay_process_control_command(conn,
                        &state->header, &payload_view);
        if (ret < 0) {
+               status = RELAY_CONNECTION_STATUS_ERROR;
                goto end;
        }
 
        ret = connection_reset_protocol_state(conn);
+       if (ret) {
+               status = RELAY_CONNECTION_STATUS_ERROR;
+       }
 end:
-       return ret;
+       return status;
 }
 
-static int relay_process_control_receive_header(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control_receive_header(
+               struct relay_connection *conn)
 {
        int ret = 0;
+       enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
        struct lttcomm_relayd_hdr header;
        struct lttng_dynamic_buffer *reception_buffer =
                        &conn->protocol.ctrl.reception_buffer;
@@ -3031,11 +3050,15 @@ static int relay_process_control_receive_header(struct relay_connection *conn)
                        reception_buffer->data + state->received,
                        state->left_to_receive, MSG_DONTWAIT);
        if (ret < 0) {
-               ERR("Unable to receive control command header on sock %d", conn->sock->fd);
+               if (errno != EAGAIN && errno != EWOULDBLOCK) {
+                       PERROR("Unable to receive control command header on sock %d",
+                                       conn->sock->fd);
+                       status = RELAY_CONNECTION_STATUS_ERROR;
+               }
                goto end;
        } else if (ret == 0) {
                DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
-               ret = -1;
+               status = RELAY_CONNECTION_STATUS_CLOSED;
                goto end;
        }
 
@@ -3053,7 +3076,6 @@ static int relay_process_control_receive_header(struct relay_connection *conn)
                DBG3("Partial reception of control connection protocol header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
                                state->received, state->left_to_receive,
                                conn->sock->fd);
-               ret = 0;
                goto end;
        }
 
@@ -3075,7 +3097,7 @@ static int relay_process_control_receive_header(struct relay_connection *conn)
        if (header.data_size > DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE) {
                ERR("Command header indicates a payload (%" PRIu64 " bytes) that exceeds the maximal payload size allowed on a control connection.",
                                header.data_size);
-               ret = -1;
+               status = RELAY_CONNECTION_STATUS_ERROR;
                goto end;
        }
 
@@ -3085,6 +3107,7 @@ static int relay_process_control_receive_header(struct relay_connection *conn)
        ret = lttng_dynamic_buffer_set_size(reception_buffer,
                        header.data_size);
        if (ret) {
+               status = RELAY_CONNECTION_STATUS_ERROR;
                goto end;
        }
 
@@ -3093,32 +3116,33 @@ static int relay_process_control_receive_header(struct relay_connection *conn)
                 * Manually invoke the next state as the poll loop
                 * will not wake-up to allow us to proceed further.
                 */
-               ret = relay_process_control_receive_payload(conn);
+               status = relay_process_control_receive_payload(conn);
        }
 end:
-       return ret;
+       return status;
 }
 
 /*
  * Process the commands received on the control socket
  */
-static int relay_process_control(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control(
+               struct relay_connection *conn)
 {
-       int ret = 0;
+       enum relay_connection_status status;
 
        switch (conn->protocol.ctrl.state_id) {
        case CTRL_CONNECTION_STATE_RECEIVE_HEADER:
-               ret = relay_process_control_receive_header(conn);
+               status = relay_process_control_receive_header(conn);
                break;
        case CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD:
-               ret = relay_process_control_receive_payload(conn);
+               status = relay_process_control_receive_payload(conn);
                break;
        default:
                ERR("Unknown control connection protocol state encountered.");
                abort();
        }
 
-       return ret;
+       return status;
 }
 
 /*
@@ -3190,9 +3214,11 @@ end:
        return ret;
 }
 
-static int relay_process_data_receive_header(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data_receive_header(
+               struct relay_connection *conn)
 {
        int ret;
+       enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
        struct data_connection_state_receive_header *state =
                        &conn->protocol.data.state.receive_header;
        struct lttcomm_relayd_data_hdr header;
@@ -3204,12 +3230,15 @@ static int relay_process_data_receive_header(struct relay_connection *conn)
                        state->header_reception_buffer + state->received,
                        state->left_to_receive, MSG_DONTWAIT);
        if (ret < 0) {
-               ERR("Unable to receive data header on sock %d", conn->sock->fd);
+               if (errno != EAGAIN && errno != EWOULDBLOCK) {
+                       PERROR("Unable to receive data header on sock %d", conn->sock->fd);
+                       status = RELAY_CONNECTION_STATUS_ERROR;
+               }
                goto end;
        } else if (ret == 0) {
                /* Orderly shutdown. Not necessary to print an error. */
                DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
-               ret = -1;
+               status = RELAY_CONNECTION_STATUS_CLOSED;
                goto end;
        }
 
@@ -3256,7 +3285,8 @@ static int relay_process_data_receive_header(struct relay_connection *conn)
        if (!stream) {
                DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
                                header.stream_id);
-               ret = 0;
+               /* Protocol error. */
+               status = RELAY_CONNECTION_STATUS_ERROR;
                goto end;
        }
 
@@ -3281,6 +3311,7 @@ static int relay_process_data_receive_header(struct relay_connection *conn)
                                &new_id, &stream->stream_fd->fd);
                if (ret < 0) {
                        ERR("Failed to rotate stream output file");
+                       status = RELAY_CONNECTION_STATUS_ERROR;
                        goto end_stream_unlock;
                }
 
@@ -3297,12 +3328,14 @@ end_stream_unlock:
        pthread_mutex_unlock(&stream->lock);
        stream_put(stream);
 end:
-       return ret;
+       return status;
 }
 
-static int relay_process_data_receive_payload(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data_receive_payload(
+               struct relay_connection *conn)
 {
        int ret;
+       enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
        struct relay_stream *stream;
        struct data_connection_state_receive_payload *state =
                        &conn->protocol.data.state.receive_payload;
@@ -3315,9 +3348,10 @@ static int relay_process_data_receive_payload(struct relay_connection *conn)
 
        stream = stream_get_by_id(state->header.stream_id);
        if (!stream) {
+               /* Protocol error. */
                DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
                                state->header.stream_id);
-               ret = 0;
+               status = RELAY_CONNECTION_STATUS_ERROR;
                goto end;
        }
 
@@ -3341,13 +3375,16 @@ static int relay_process_data_receive_payload(struct relay_connection *conn)
                ret = conn->sock->ops->recvmsg(conn->sock, data_buffer,
                                recv_size, MSG_DONTWAIT);
                if (ret < 0) {
-                       ERR("Socket %d error %d", conn->sock->fd, ret);
-                       ret = -1;
+                       if (errno != EAGAIN && errno != EWOULDBLOCK) {
+                               PERROR("Socket %d error", conn->sock->fd);
+                               status = RELAY_CONNECTION_STATUS_ERROR;
+                       }
                        goto end_stream_unlock;
                } else if (ret == 0) {
                        /* No more data ready to be consumed on socket. */
                        DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
                                        state->header.stream_id);
+                       status = RELAY_CONNECTION_STATUS_CLOSED;
                        break;
                } else if (ret < (int) recv_size) {
                        /*
@@ -3364,7 +3401,7 @@ static int relay_process_data_receive_payload(struct relay_connection *conn)
                                recv_size);
                if (write_ret < (ssize_t) recv_size) {
                        ERR("Relay error writing data to file");
-                       ret = -1;
+                       status = RELAY_CONNECTION_STATUS_ERROR;
                        goto end_stream_unlock;
                }
 
@@ -3384,7 +3421,6 @@ static int relay_process_data_receive_payload(struct relay_connection *conn)
                DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
                                state->header.stream_id, state->received,
                                state->left_to_receive);
-               ret = 0;
                goto end_stream_unlock;
        }
 
@@ -3394,6 +3430,7 @@ static int relay_process_data_receive_payload(struct relay_connection *conn)
                ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
                                stream->stream_handle,
                                state->header.net_seq_num, ret);
+               status = RELAY_CONNECTION_STATUS_ERROR;
                goto end_stream_unlock;
        }
 
@@ -3405,6 +3442,7 @@ static int relay_process_data_receive_payload(struct relay_connection *conn)
                        ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
                                        stream->stream_handle,
                                        state->header.net_seq_num, ret);
+                       status = RELAY_CONNECTION_STATUS_ERROR;
                        goto end_stream_unlock;
                }
        }
@@ -3432,6 +3470,7 @@ static int relay_process_data_receive_payload(struct relay_connection *conn)
 
        ret = try_rotate_stream(stream);
        if (ret < 0) {
+               status = RELAY_CONNECTION_STATUS_ERROR;
                goto end_stream_unlock;
        }
 
@@ -3450,29 +3489,30 @@ end_stream_unlock:
 
        stream_put(stream);
 end:
-       return ret;
+       return status;
 }
 
 /*
  * relay_process_data: Process the data received on the data socket
  */
-static int relay_process_data(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data(
+               struct relay_connection *conn)
 {
-       int ret;
+       enum relay_connection_status status;
 
        switch (conn->protocol.data.state_id) {
        case DATA_CONNECTION_STATE_RECEIVE_HEADER:
-               ret = relay_process_data_receive_header(conn);
+               status = relay_process_data_receive_header(conn);
                break;
        case DATA_CONNECTION_STATE_RECEIVE_PAYLOAD:
-               ret = relay_process_data_receive_payload(conn);
+               status = relay_process_data_receive_payload(conn);
                break;
        default:
                ERR("Unexpected data connection communication state.");
                abort();
        }
 
-       return ret;
+       return status;
 }
 
 static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
@@ -3644,9 +3684,11 @@ restart:
                                assert(ctrl_conn->type == RELAY_CONTROL);
 
                                if (revents & LPOLLIN) {
-                                       ret = relay_process_control(ctrl_conn);
-                                       if (ret < 0) {
-                                               /* Clear the connection on error. */
+                                       enum relay_connection_status status;
+
+                                       status = relay_process_control(ctrl_conn);
+                                       if (status != RELAY_CONNECTION_STATUS_OK) {
+                                               /* Clear the connection on error or close. */
                                                relay_thread_close_connection(&events,
                                                                pollfd,
                                                                ctrl_conn);
@@ -3720,9 +3762,11 @@ restart:
                        assert(data_conn->type == RELAY_DATA);
 
                        if (revents & LPOLLIN) {
-                               ret = relay_process_data(data_conn);
-                               /* Connection closed */
-                               if (ret < 0) {
+                               enum relay_connection_status status;
+
+                               status = relay_process_data(data_conn);
+                               /* Connection closed or error. */
+                               if (status != RELAY_CONNECTION_STATUS_OK) {
                                        relay_thread_close_connection(&events, pollfd,
                                                        data_conn);
                                        /*
This page took 0.033598 seconds and 5 git commands to generate.