start to handle async rotation on the relay + basics for remote rotate_pending
[deliverable/lttng-tools.git] / src / bin / lttng-relayd / main.c
index c1b044f5786ae9c98f777c0420ab502bcb371b3e..c6d61a186f095f339193a509c701f67e56d73bbf 100644 (file)
@@ -1507,6 +1507,100 @@ end:
        return ret;
 }
 
+static
+int rotate_index_file(struct relay_stream *stream)
+{
+       int ret;
+       uint32_t major, minor;
+
+       /* Put ref on previous index_file. */
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+               stream->index_file = NULL;
+       }
+       major = stream->trace->session->major;
+       minor = stream->trace->session->minor;
+       stream->index_file = lttng_index_file_create(stream->path_name,
+                       stream->channel_name,
+                       -1, -1, stream->tracefile_size,
+                       tracefile_array_get_file_index_head(stream->tfa),
+                       lttng_to_index_major(major, minor),
+                       lttng_to_index_minor(major, minor));
+       if (!stream->index_file) {
+               ret = -1;
+               goto end;
+       }
+
+       ret = 0;
+
+end:
+       return ret;
+}
+
+/*
+ * Check if a stream should perform a rotation (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static
+int check_rotate_stream(struct relay_stream *stream)
+{
+       int ret;
+
+       /* No rotation expected */
+       if (stream->rotate_at_seq_num == -1ULL) {
+               ret = 0;
+               goto end;
+       }
+
+       if (stream->prev_seq < stream->rotate_at_seq_num) {
+               DBG("Stream %" PRIu64 " no yet ready for rotation",
+                               stream->stream_handle);
+               fprintf(stderr, "Stream %" PRIu64 " no yet ready for rotation\n",
+                               stream->stream_handle);
+               ret = 0;
+               goto end;
+       } else if (stream->prev_seq > stream->rotate_at_seq_num) {
+               DBG("Rotation after too much data has been written in tracefile "
+                               "for stream %" PRIu64 ", need to truncate before "
+                               "rotating", stream->stream_handle);
+               fprintf(stderr, "Rotation after too much data has been written in tracefile "
+                               "for stream %" PRIu64 ", need to truncate before "
+                               "rotating\n", stream->stream_handle);
+               /* TODO */
+       } else {
+               DBG("Stream %" PRIu64 " ready for rotation", stream->stream_handle);
+               fprintf(stderr, "Stream %" PRIu64 " ready for rotation\n", stream->stream_handle);
+       }
+
+       /* Perform the stream rotation. */
+       ret = utils_rotate_stream_file(stream->path_name,
+                       stream->channel_name, stream->tracefile_size,
+                       stream->tracefile_count, -1,
+                       -1, stream->stream_fd->fd,
+                       NULL, &stream->stream_fd->fd);
+       if (ret < 0) {
+               ERR("Rotating stream output file");
+               goto end;
+       }
+       stream->tracefile_size_current = 0;
+
+       /* Rotate also the index if the stream is not a metadata stream. */
+       if (!stream->is_metadata) {
+               ret = rotate_index_file(stream);
+               if (ret < 0) {
+                       ERR("Failed to rotate index file");
+                       goto end;
+               }
+       }
+
+       stream->rotate_at_seq_num = -1ULL;
+
+end:
+       return ret;
+}
+
 /*
  * relay_recv_metadata: receive the metadata for the session.
  */
@@ -1590,6 +1684,12 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
        DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
                metadata_stream->metadata_received);
 
+       ret = check_rotate_stream(metadata_stream);
+       if (ret < 0) {
+               ERR("Check rotate stream");
+               goto end_put;
+       }
+
 end_put:
        pthread_mutex_unlock(&metadata_stream->lock);
        stream_put(metadata_stream);
@@ -2122,36 +2222,6 @@ end_no_session:
        return ret;
 }
 
-static
-int rotate_index_file(struct relay_stream *stream)
-{
-       int ret;
-       uint32_t major, minor;
-
-       /* Put ref on previous index_file. */
-       if (stream->index_file) {
-               lttng_index_file_put(stream->index_file);
-               stream->index_file = NULL;
-       }
-       major = stream->trace->session->major;
-       minor = stream->trace->session->minor;
-       stream->index_file = lttng_index_file_create(stream->path_name,
-                       stream->channel_name,
-                       -1, -1, stream->tracefile_size,
-                       tracefile_array_get_file_index_head(stream->tfa),
-                       lttng_to_index_major(major, minor),
-                       lttng_to_index_minor(major, minor));
-       if (!stream->index_file) {
-               ret = -1;
-               goto end;
-       }
-
-       ret = 0;
-
-end:
-       return ret;
-}
-
 /*
  * relay_rotate_stream: rotate a stream to a new tracefile for the session
  * rotation feature (not the tracefile rotation feature).
@@ -2208,8 +2278,10 @@ static int relay_rotate_session_stream(struct lttcomm_relayd_hdr *recv_hdr,
                goto end;
        }
 
-       fprintf(stderr, "Rotating stream %lu to %s/\n",
-                       be64toh(stream_info.stream_id), stream_info.new_pathname);
+       fprintf(stderr, "Stream %lu needs to rotate to %s (%lu vs %lu)/\n",
+                       be64toh(stream_info.stream_id), stream_info.new_pathname,
+                       stream->prev_seq, be64toh(stream_info.rotate_at_seq_num));
+       fprintf(stderr, "metadata: %d\n", stream->is_metadata);
 
        pthread_mutex_lock(&stream->lock);
 
@@ -2226,25 +2298,15 @@ static int relay_rotate_session_stream(struct lttcomm_relayd_hdr *recv_hdr,
                ERR("relay creating output directory");
                goto end;
        }
-       ret = utils_rotate_stream_file(stream->path_name,
-                       stream->channel_name, stream->tracefile_size,
-                       stream->tracefile_count, -1,
-                       -1, stream->stream_fd->fd,
-                       NULL, &stream->stream_fd->fd);
+       stream->rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num);
+       stream->chunk_id = be64toh(stream_info.new_chunk_id);
+       /* FIXME: YOU ARE HERE
+        * metadata stream does not have a valid seq_num, but is sent on the control connection */
+       ret = check_rotate_stream(stream);
        if (ret < 0) {
-               ERR("Rotating stream output file");
+               ERR("Check rotate stream");
                goto end_stream_unlock;
        }
-       stream->tracefile_size_current = 0;
-
-       /* Rotate also the index if the stream is not a metadata stream. */
-       if (!stream->is_metadata) {
-               ret = rotate_index_file(stream);
-               if (ret < 0) {
-                       ERR("Failed to rotate index file");
-                       goto end_stream_unlock;
-               }
-       }
 
 end_stream_unlock:
        pthread_mutex_unlock(&stream->lock);
@@ -2643,6 +2705,12 @@ static int relay_process_data(struct relay_connection *conn)
 
        stream->prev_seq = net_seq_num;
 
+       ret = check_rotate_stream(stream);
+       if (ret < 0) {
+               ERR("Check rotate stream");
+               goto end_stream_unlock;
+       }
+
 end_stream_unlock:
        close_requested = stream->close_requested;
        pthread_mutex_unlock(&stream->lock);
This page took 0.027351 seconds and 5 git commands to generate.