relayd_rotate_stream basics in place
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index dca753ba218ed02f5254ad2a74b2b2ad5872277f..b99ecb9824711d7a0fb563022dbd283acb5939f3 100644 (file)
@@ -2109,6 +2109,121 @@ end_no_session:
        return ret;
 }
 
+/*
+ * relay_rotate_stream: rotate a stream to a new tracefile.
+ */
+static int relay_rotate_stream(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn)
+{
+       int ret, send_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_rotate_stream stream_info;
+       struct lttcomm_relayd_generic_reply reply;
+       struct relay_stream *stream;
+       size_t len;
+
+       DBG("Rotate stream received");
+
+       if (!session || conn->version_check_done == 0) {
+               ERR("Trying to rotate a stream before version check");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       if (session->minor < 11) {
+               ERR("Unsupported feature before 2.11");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       ret = conn->sock->ops->recvmsg(conn->sock, &stream_info,
+                       sizeof(stream_info), 0);
+       if (ret < sizeof(stream_info)) {
+               if (ret == 0) {
+                       /* Orderly shutdown. Not necessary to print an error. */
+                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+               } else {
+                       ERR("Relay didn't receive valid rotate_stream struct size : %d", ret);
+               }
+               ret = -1;
+               goto end_no_session;
+       }
+
+       stream = stream_get_by_id(be64toh(stream_info.stream_id));
+       if (!stream) {
+               ret = -1;
+               goto end;
+       }
+
+       len = lttng_strnlen(stream_info.new_pathname,
+                       sizeof(stream_info.new_pathname));
+       /* Ensure that NULL-terminated and fits in local filename length. */
+       if (len == sizeof(stream_info.new_pathname) || len >= LTTNG_NAME_MAX) {
+               ret = -ENAMETOOLONG;
+               ERR("Path name too long");
+               goto end;
+       }
+
+       fprintf(stderr, "Rotating stream %lu to %s\n", stream_info.stream_id,
+                       stream_info.new_pathname);
+#if 0
+       /*
+        * Set last_net_seq_num before the close flag. Required by data
+        * pending check.
+        */
+       pthread_mutex_lock(&stream->lock);
+       stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
+       pthread_mutex_unlock(&stream->lock);
+
+       /*
+        * This is one of the conditions which may trigger a stream close
+        * with the others being:
+        *     1) A close command is received for a stream
+        *     2) The control connection owning the stream is closed
+        *     3) We have received all of the stream's data _after_ a close
+        *        request.
+        */
+       try_stream_close(stream);
+       if (stream->is_metadata) {
+               struct relay_viewer_stream *vstream;
+
+               vstream = viewer_stream_get_by_id(stream->stream_handle);
+               if (vstream) {
+                       if (vstream->metadata_sent == stream->metadata_received) {
+                               /*
+                                * Since all the metadata has been sent to the
+                                * viewer and that we have a request to close
+                                * its stream, we can safely teardown the
+                                * corresponding metadata viewer stream.
+                                */
+                               viewer_stream_put(vstream);
+                       }
+                       /* Put local reference. */
+                       viewer_stream_put(vstream);
+               }
+       }
+#endif
+       stream_put(stream);
+
+end:
+       memset(&reply, 0, sizeof(reply));
+       if (ret < 0) {
+               reply.ret_code = htobe32(LTTNG_ERR_UNK);
+       } else {
+               reply.ret_code = htobe32(LTTNG_OK);
+       }
+       send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply), 0);
+       if (send_ret < 0) {
+               ERR("Relay sending stream id");
+               ret = send_ret;
+       }
+
+end_no_session:
+       return ret;
+}
+
+
 /*
  * Process the commands received on the control socket
  */
@@ -2157,6 +2272,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_RESET_METADATA:
                ret = relay_reset_metadata(recv_hdr, conn);
                break;
+       case RELAYD_ROTATE_STREAM:
+               ret = relay_rotate_stream(recv_hdr, conn);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
This page took 0.026477 seconds and 5 git commands to generate.