return ret;
}
+static
+int rotate_index_file(struct relay_stream *stream)
+{
+ int ret;
+ uint32_t major, minor;
+
+ /* Put ref on previous index_file. */
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+ major = stream->trace->session->major;
+ minor = stream->trace->session->minor;
+ stream->index_file = lttng_index_file_create(stream->path_name,
+ stream->channel_name,
+ -1, -1, stream->tracefile_size,
+ tracefile_array_get_file_index_head(stream->tfa),
+ lttng_to_index_major(major, minor),
+ lttng_to_index_minor(major, minor));
+ if (!stream->index_file) {
+ ret = -1;
+ goto end;
+ }
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
+/*
+ * Check if a stream should perform a rotation (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static
+int check_rotate_stream(struct relay_stream *stream)
+{
+ int ret;
+
+ /* No rotation expected */
+ if (stream->rotate_at_seq_num == -1ULL) {
+ ret = 0;
+ goto end;
+ }
+
+ if (stream->prev_seq < stream->rotate_at_seq_num) {
+ DBG("Stream %" PRIu64 " no yet ready for rotation",
+ stream->stream_handle);
+ fprintf(stderr, "Stream %" PRIu64 " no yet ready for rotation\n",
+ stream->stream_handle);
+ ret = 0;
+ goto end;
+ } else if (stream->prev_seq > stream->rotate_at_seq_num) {
+ DBG("Rotation after too much data has been written in tracefile "
+ "for stream %" PRIu64 ", need to truncate before "
+ "rotating", stream->stream_handle);
+ fprintf(stderr, "Rotation after too much data has been written in tracefile "
+ "for stream %" PRIu64 ", need to truncate before "
+ "rotating\n", stream->stream_handle);
+ /* TODO */
+ } else {
+ DBG("Stream %" PRIu64 " ready for rotation", stream->stream_handle);
+ fprintf(stderr, "Stream %" PRIu64 " ready for rotation\n", stream->stream_handle);
+ }
+
+ /* Perform the stream rotation. */
+ ret = utils_rotate_stream_file(stream->path_name,
+ stream->channel_name, stream->tracefile_size,
+ stream->tracefile_count, -1,
+ -1, stream->stream_fd->fd,
+ NULL, &stream->stream_fd->fd);
+ if (ret < 0) {
+ ERR("Rotating stream output file");
+ goto end;
+ }
+ stream->tracefile_size_current = 0;
+
+ /* Rotate also the index if the stream is not a metadata stream. */
+ if (!stream->is_metadata) {
+ ret = rotate_index_file(stream);
+ if (ret < 0) {
+ ERR("Failed to rotate index file");
+ goto end;
+ }
+ }
+
+ stream->rotate_at_seq_num = -1ULL;
+
+end:
+ return ret;
+}
+
/*
* relay_recv_metadata: receive the metadata for the session.
*/
DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
metadata_stream->metadata_received);
+ ret = check_rotate_stream(metadata_stream);
+ if (ret < 0) {
+ ERR("Check rotate stream");
+ goto end_put;
+ }
+
end_put:
pthread_mutex_unlock(&metadata_stream->lock);
stream_put(metadata_stream);
return ret;
}
-static
-int rotate_index_file(struct relay_stream *stream)
-{
- int ret;
- uint32_t major, minor;
-
- /* Put ref on previous index_file. */
- if (stream->index_file) {
- lttng_index_file_put(stream->index_file);
- stream->index_file = NULL;
- }
- major = stream->trace->session->major;
- minor = stream->trace->session->minor;
- stream->index_file = lttng_index_file_create(stream->path_name,
- stream->channel_name,
- -1, -1, stream->tracefile_size,
- tracefile_array_get_file_index_head(stream->tfa),
- lttng_to_index_major(major, minor),
- lttng_to_index_minor(major, minor));
- if (!stream->index_file) {
- ret = -1;
- goto end;
- }
-
- ret = 0;
-
-end:
- return ret;
-}
-
/*
* relay_rotate_stream: rotate a stream to a new tracefile for the session
* rotation feature (not the tracefile rotation feature).
goto end;
}
- fprintf(stderr, "Rotating stream %lu to %s/\n",
- be64toh(stream_info.stream_id), stream_info.new_pathname);
+ fprintf(stderr, "Stream %lu needs to rotate to %s (%lu vs %lu)/\n",
+ be64toh(stream_info.stream_id), stream_info.new_pathname,
+ stream->prev_seq, be64toh(stream_info.rotate_at_seq_num));
+ fprintf(stderr, "metadata: %d\n", stream->is_metadata);
pthread_mutex_lock(&stream->lock);
ERR("relay creating output directory");
goto end;
}
- ret = utils_rotate_stream_file(stream->path_name,
- stream->channel_name, stream->tracefile_size,
- stream->tracefile_count, -1,
- -1, stream->stream_fd->fd,
- NULL, &stream->stream_fd->fd);
+ stream->rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num);
+ stream->chunk_id = be64toh(stream_info.new_chunk_id);
+ /* FIXME: YOU ARE HERE
+ * metadata stream does not have a valid seq_num, but is sent on the control connection */
+ ret = check_rotate_stream(stream);
if (ret < 0) {
- ERR("Rotating stream output file");
+ ERR("Check rotate stream");
goto end_stream_unlock;
}
- stream->tracefile_size_current = 0;
-
- /* Rotate also the index if the stream is not a metadata stream. */
- if (!stream->is_metadata) {
- ret = rotate_index_file(stream);
- if (ret < 0) {
- ERR("Failed to rotate index file");
- goto end_stream_unlock;
- }
- }
end_stream_unlock:
pthread_mutex_unlock(&stream->lock);
stream->prev_seq = net_seq_num;
+ ret = check_rotate_stream(stream);
+ if (ret < 0) {
+ ERR("Check rotate stream");
+ goto end_stream_unlock;
+ }
+
end_stream_unlock:
close_requested = stream->close_requested;
pthread_mutex_unlock(&stream->lock);