+ reply = (typeof(reply)){
+ .generic.ret_code = htobe32((uint32_t)
+ (ret == 0 ? LTTNG_OK : LTTNG_ERR_INVALID_PROTOCOL)),
+ .trace_chunk_exists = ret == 0 ? chunk_exists : 0,
+ };
+ send_ret = conn->sock->ops->sendmsg(
+ conn->sock, &reply, sizeof(reply), 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
+ }
+end_no_reply:
+ return ret;
+}
+
+#define DBG_CMD(cmd_name, conn) \
+ DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd);
+
+static int relay_process_control_command(struct relay_connection *conn,
+ const struct lttcomm_relayd_hdr *header,
+ const struct lttng_buffer_view *payload)
+{
+ int ret = 0;
+
+ switch (header->cmd) {
+ case RELAYD_CREATE_SESSION:
+ DBG_CMD("RELAYD_CREATE_SESSION", conn);
+ ret = relay_create_session(header, conn, payload);
+ break;
+ case RELAYD_ADD_STREAM:
+ DBG_CMD("RELAYD_ADD_STREAM", conn);
+ ret = relay_add_stream(header, conn, payload);
+ break;
+ case RELAYD_START_DATA:
+ DBG_CMD("RELAYD_START_DATA", conn);
+ ret = relay_start(header, conn, payload);
+ break;
+ case RELAYD_SEND_METADATA:
+ DBG_CMD("RELAYD_SEND_METADATA", conn);
+ ret = relay_recv_metadata(header, conn, payload);
+ break;
+ case RELAYD_VERSION:
+ DBG_CMD("RELAYD_VERSION", conn);
+ ret = relay_send_version(header, conn, payload);
+ break;
+ case RELAYD_CLOSE_STREAM:
+ DBG_CMD("RELAYD_CLOSE_STREAM", conn);
+ ret = relay_close_stream(header, conn, payload);
+ break;
+ case RELAYD_DATA_PENDING:
+ DBG_CMD("RELAYD_DATA_PENDING", conn);
+ ret = relay_data_pending(header, conn, payload);
+ break;
+ case RELAYD_QUIESCENT_CONTROL:
+ DBG_CMD("RELAYD_QUIESCENT_CONTROL", conn);
+ ret = relay_quiescent_control(header, conn, payload);
+ break;
+ case RELAYD_BEGIN_DATA_PENDING:
+ DBG_CMD("RELAYD_BEGIN_DATA_PENDING", conn);
+ ret = relay_begin_data_pending(header, conn, payload);
+ break;
+ case RELAYD_END_DATA_PENDING:
+ DBG_CMD("RELAYD_END_DATA_PENDING", conn);
+ ret = relay_end_data_pending(header, conn, payload);
+ break;
+ case RELAYD_SEND_INDEX:
+ DBG_CMD("RELAYD_SEND_INDEX", conn);
+ ret = relay_recv_index(header, conn, payload);
+ break;
+ case RELAYD_STREAMS_SENT:
+ DBG_CMD("RELAYD_STREAMS_SENT", conn);
+ ret = relay_streams_sent(header, conn, payload);
+ break;
+ case RELAYD_RESET_METADATA:
+ DBG_CMD("RELAYD_RESET_METADATA", conn);
+ ret = relay_reset_metadata(header, conn, payload);
+ break;
+ case RELAYD_ROTATE_STREAMS:
+ DBG_CMD("RELAYD_ROTATE_STREAMS", conn);
+ ret = relay_rotate_session_streams(header, conn, payload);
+ break;
+ case RELAYD_CREATE_TRACE_CHUNK:
+ DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn);
+ ret = relay_create_trace_chunk(header, conn, payload);
+ break;
+ case RELAYD_CLOSE_TRACE_CHUNK:
+ DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn);
+ ret = relay_close_trace_chunk(header, conn, payload);
+ break;
+ case RELAYD_TRACE_CHUNK_EXISTS:
+ DBG_CMD("RELAYD_TRACE_CHUNK_EXISTS", conn);
+ ret = relay_trace_chunk_exists(header, conn, payload);
+ break;
+ case RELAYD_UPDATE_SYNC_INFO:
+ default:
+ ERR("Received unknown command (%u)", header->cmd);
+ relay_unknown_command(conn);
+ ret = -1;
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
+static enum relay_connection_status relay_process_control_receive_payload(
+ struct relay_connection *conn)
+{
+ int ret = 0;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
+ struct lttng_dynamic_buffer *reception_buffer =
+ &conn->protocol.ctrl.reception_buffer;
+ struct ctrl_connection_state_receive_payload *state =
+ &conn->protocol.ctrl.state.receive_payload;
+ struct lttng_buffer_view payload_view;
+
+ if (state->left_to_receive == 0) {
+ /* Short-circuit for payload-less commands. */
+ goto reception_complete;
+ }