- switch (be32toh(recv_hdr->cmd)) {
- case RELAYD_CREATE_SESSION:
- ret = relay_create_session(recv_hdr, cmd);
- break;
- case RELAYD_ADD_STREAM:
- ret = relay_add_stream(recv_hdr, cmd, streams_ht);
- break;
- case RELAYD_START_DATA:
- ret = relay_start(recv_hdr, cmd);
- break;
- case RELAYD_SEND_METADATA:
- ret = relay_recv_metadata(recv_hdr, cmd, streams_ht);
- break;
- case RELAYD_VERSION:
- ret = relay_send_version(recv_hdr, cmd);
- break;
- case RELAYD_CLOSE_STREAM:
- ret = relay_close_stream(recv_hdr, cmd, streams_ht);
- break;
- case RELAYD_DATA_PENDING:
- ret = relay_data_pending(recv_hdr, cmd, streams_ht);
- break;
- case RELAYD_QUIESCENT_CONTROL:
- ret = relay_quiescent_control(recv_hdr, cmd);
- break;
- case RELAYD_BEGIN_DATA_PENDING:
- ret = relay_begin_data_pending(recv_hdr, cmd, streams_ht);
- break;
+ assert(conn);
+
+ DBG("Relay receiving index");
+
+ if (!session || conn->version_check_done == 0) {
+ ERR("Trying to close a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = conn->sock->ops->recvmsg(conn->sock, &index_info,
+ sizeof(index_info), 0);
+ if (ret < sizeof(index_info)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid index struct size : %d", ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ net_seq_num = be64toh(index_info.net_seq_num);
+
+ rcu_read_lock();
+ stream = stream_find_by_id(relay_streams_ht,
+ be64toh(index_info.relay_stream_id));
+ if (!stream) {
+ ret = -1;
+ goto end_rcu_unlock;
+ }
+
+ /* Live beacon handling */
+ if (index_info.packet_size == 0) {
+ DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
+
+ /*
+ * Only flag a stream inactive when it has already received data
+ * and no indexes are in flight.
+ */
+ if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
+ stream->beacon_ts_end = be64toh(index_info.timestamp_end);
+ }
+ ret = 0;
+ goto end_rcu_unlock;
+ } else {
+ stream->beacon_ts_end = -1ULL;
+ }
+
+ index = relay_index_find(stream->stream_handle, net_seq_num);
+ if (!index) {
+ /* A successful creation will add the object to the HT. */
+ index = relay_index_create(stream->stream_handle, net_seq_num);
+ if (!index) {
+ goto end_rcu_unlock;
+ }
+ index_created = 1;
+ stream->indexes_in_flight++;
+ }
+
+ copy_index_control_data(index, &index_info);
+ if (stream->ctf_stream_id == -1ULL) {
+ stream->ctf_stream_id = be64toh(index_info.stream_id);
+ }
+
+ if (index_created) {
+ /*
+ * Try to add the relay index object to the hash table. If an object
+ * already exist, destroy back the index created, set the data in this
+ * object and write it on disk.
+ */
+ relay_index_add(index, &wr_index);
+ if (wr_index) {
+ copy_index_control_data(wr_index, &index_info);
+ free(index);
+ }
+ } else {
+ /* The index already exists so write it on disk. */
+ wr_index = index;
+ }
+
+ /* Do we have a writable ready index to write on disk. */
+ if (wr_index) {
+ ret = relay_index_write(wr_index->fd, wr_index);
+ if (ret < 0) {
+ goto end_rcu_unlock;
+ }
+ stream->total_index_received++;
+ stream->indexes_in_flight--;
+ assert(stream->indexes_in_flight >= 0);
+ }
+
+end_rcu_unlock:
+ rcu_read_unlock();
+
+ memset(&reply, 0, sizeof(reply));
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending close index id reply");
+ ret = send_ret;
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Receive the streams_sent message.
+ *
+ * Return 0 on success else a negative value.
+ */
+static
+int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ int ret, send_ret;
+ struct lttcomm_relayd_generic_reply reply;
+
+ assert(conn);
+
+ DBG("Relay receiving streams_sent");
+
+ if (!conn->session || conn->version_check_done == 0) {
+ ERR("Trying to close a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ /*
+ * Flag every pending stream in the connection recv list that they are
+ * ready to be used by the viewer.
+ */
+ set_viewer_ready_flag(conn);
+
+ /*
+ * Inform the viewer that there are new streams in the session.
+ */
+ if (conn->session->viewer_refcount) {
+ uatomic_set(&conn->session->new_streams, 1);
+ }
+
+ memset(&reply, 0, sizeof(reply));
+ reply.ret_code = htobe32(LTTNG_OK);
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending sent_stream reply");
+ ret = send_ret;
+ } else {
+ /* Success. */
+ ret = 0;
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Process the commands received on the control socket
+ */
+static
+int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ int ret = 0;
+
+ switch (be32toh(recv_hdr->cmd)) {
+ case RELAYD_CREATE_SESSION:
+ ret = relay_create_session(recv_hdr, conn);
+ break;
+ case RELAYD_ADD_STREAM:
+ ret = relay_add_stream(recv_hdr, conn);
+ break;
+ case RELAYD_START_DATA:
+ ret = relay_start(recv_hdr, conn);
+ break;
+ case RELAYD_SEND_METADATA:
+ ret = relay_recv_metadata(recv_hdr, conn);
+ break;
+ case RELAYD_VERSION:
+ ret = relay_send_version(recv_hdr, conn);
+ break;
+ case RELAYD_CLOSE_STREAM:
+ ret = relay_close_stream(recv_hdr, conn);
+ break;
+ case RELAYD_DATA_PENDING:
+ ret = relay_data_pending(recv_hdr, conn);
+ break;
+ case RELAYD_QUIESCENT_CONTROL:
+ ret = relay_quiescent_control(recv_hdr, conn);
+ break;
+ case RELAYD_BEGIN_DATA_PENDING:
+ ret = relay_begin_data_pending(recv_hdr, conn);
+ break;