DBG3("Partial reception of data connection header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
state->received, state->left_to_receive,
conn->sock->fd);
- ret = 0;
goto end;
}
conn->protocol.data.state.receive_payload.rotate_index = true;
}
- ret = 0;
end_stream_unlock:
pthread_mutex_unlock(&stream->lock);
stream_put(stream);
uint64_t left_to_receive = state->left_to_receive;
struct relay_session *session;
+ DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
+ state->header.stream_id, state->header.net_seq_num,
+ state->received, left_to_receive);
+
stream = stream_get_by_id(state->header.stream_id);
if (!stream) {
/* Protocol error. */
- DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
+ ERR("relay_process_data_receive_payload: cannot find stream %" PRIu64,
state->header.stream_id);
status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
pthread_mutex_lock(&stream->lock);
session = stream->trace->session;
-
- DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
- state->header.stream_id, state->header.net_seq_num,
- state->received, left_to_receive);
+ if (!conn->session) {
+ ret = connection_set_session(conn, session);
+ if (ret) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
+ }
/*
* The size of the "chunk" received on any iteration is bounded by:
ret = write_padding_to_file(stream->stream_fd->fd,
state->header.padding_size);
- if (ret < 0) {
+ if ((int64_t) ret < (int64_t) state->header.padding_size) {
ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
status = relay_process_control(ctrl_conn);
if (status != RELAY_CONNECTION_STATUS_OK) {
+ /*
+ * On socket error flag the session as aborted to force
+ * the cleanup of its stream otherwise it can leak
+ * during the lifetime of the relayd.
+ *
+ * This prevents situations in which streams can be
+ * left opened because an index was received, the
+ * control connection is closed, and the data
+ * connection is closed (uncleanly) before the packet's
+ * data provided.
+ *
+ * Since the control connection encountered an error,
+ * it is okay to be conservative and close the
+ * session right now as we can't rely on the protocol
+ * being respected anymore.
+ */
+ if (status == RELAY_CONNECTION_STATUS_ERROR) {
+ session_abort(ctrl_conn->session);
+ }
+
/* Clear the connection on error or close. */
relay_thread_close_connection(&events,
pollfd,
status = relay_process_data(data_conn);
/* Connection closed or error. */
if (status != RELAY_CONNECTION_STATUS_OK) {
+ /*
+ * On socket error flag the session as aborted to force
+ * the cleanup of its stream otherwise it can leak
+ * during the lifetime of the relayd.
+ *
+ * This prevents situations in which streams can be
+ * left opened because an index was received, the
+ * control connection is closed, and the data
+ * connection is closed (uncleanly) before the packet's
+ * data provided.
+ *
+ * Since the data connection encountered an error,
+ * it is okay to be conservative and close the
+ * session right now as we can't rely on the protocol
+ * being respected anymore.
+ */
+ if (status == RELAY_CONNECTION_STATUS_ERROR) {
+ session_abort(data_conn->session);
+ }
relay_thread_close_connection(&events, pollfd,
data_conn);
/*
exit:
error:
- /* Cleanup reamaining connection object. */
+ /* Cleanup remaining connection object. */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
destroy_conn,
sock_n.node) {
health_code_update();
- if (session_abort(destroy_conn->session)) {
- assert(0);
- }
+ session_abort(destroy_conn->session);
/*
* No need to grab another ref, because we own