#define _LGPL_SOURCE
#include <common/common.h>
-#include <common/utils.h>
#include <common/defaults.h>
+#include <common/fs-handle.h>
#include <common/sessiond-comm/relayd.h>
-#include <urcu/rculist.h>
+#include <common/utils.h>
#include <sys/stat.h>
+#include <urcu/rculist.h>
#include "lttng-relayd.h"
#include "index.h"
static void stream_complete_rotation(struct relay_stream *stream)
{
DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
+ if (stream->ongoing_rotation.value.next_trace_chunk) {
+ tracefile_array_reset(stream->tfa);
+ tracefile_array_commit_seq(stream->tfa,
+ stream->index_received_seqcount);
+ }
lttng_trace_chunk_put(stream->trace_chunk);
stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
}
-/*
- * If too much data has been written in a tracefile before we received the
- * rotation command, we have to move the excess data to the new tracefile and
- * perform the rotation. This can happen because the control and data
- * connections are separate, the indexes as well as the commands arrive from
- * the control connection and we have no control over the order so we could be
- * in a situation where too much data has been received on the data connection
- * before the rotation command on the control connection arrives.
- */
-static int rotate_truncate_stream(struct relay_stream *stream)
-{
- int ret, new_fd;
- off_t lseek_ret;
- uint64_t diff, pos = 0;
- char buf[FILE_IO_STACK_BUFFER_SIZE];
-
- assert(!stream->is_metadata);
-
- assert(stream->tracefile_size_current >
- stream->pos_after_last_complete_data_index);
- diff = stream->tracefile_size_current -
- stream->pos_after_last_complete_data_index;
-
- /* Create the new tracefile. */
- new_fd = utils_create_stream_file(stream->path_name,
- stream->channel_name,
- stream->tracefile_size, stream->tracefile_count,
- /* uid */ -1, /* gid */ -1, /* suffix */ NULL);
- if (new_fd < 0) {
- ERR("Failed to create new stream file at path %s for channel %s",
- stream->path_name, stream->channel_name);
- ret = -1;
- goto end;
- }
-
- /*
- * Rewind the current tracefile to the position at which the rotation
- * should have occurred.
- */
- lseek_ret = lseek(stream->stream_fd->fd,
- stream->pos_after_last_complete_data_index, SEEK_SET);
- if (lseek_ret < 0) {
- PERROR("seek truncate stream");
- ret = -1;
- goto end;
- }
-
- /* Move data from the old file to the new file. */
- while (pos < diff) {
- uint64_t count, bytes_left;
- ssize_t io_ret;
-
- bytes_left = diff - pos;
- count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left;
- assert(count <= SIZE_MAX);
-
- io_ret = lttng_read(stream->stream_fd->fd, buf, count);
- if (io_ret < (ssize_t) count) {
- char error_string[256];
-
- snprintf(error_string, sizeof(error_string),
- "Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
- count, stream->stream_fd->fd, io_ret);
- if (io_ret == -1) {
- PERROR("%s", error_string);
- } else {
- ERR("%s", error_string);
- }
- ret = -1;
- goto end;
- }
-
- io_ret = lttng_write(new_fd, buf, count);
- if (io_ret < (ssize_t) count) {
- char error_string[256];
-
- snprintf(error_string, sizeof(error_string),
- "Failed to write %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
- count, new_fd, io_ret);
- if (io_ret == -1) {
- PERROR("%s", error_string);
- } else {
- ERR("%s", error_string);
- }
- ret = -1;
- goto end;
- }
-
- pos += count;
- }
-
- /* Truncate the file to get rid of the excess data. */
- ret = ftruncate(stream->stream_fd->fd,
- stream->pos_after_last_complete_data_index);
- if (ret) {
- PERROR("ftruncate");
- goto end;
- }
-
- ret = close(stream->stream_fd->fd);
- if (ret < 0) {
- PERROR("Closing tracefile");
- goto end;
- }
-
- /*
- * Update the offset and FD of all the eventual indexes created by the
- * data connection before the rotation command arrived.
- */
- ret = relay_index_switch_all_files(stream);
- if (ret < 0) {
- ERR("Failed to rotate index file");
- goto end;
- }
-
- stream->stream_fd->fd = new_fd;
- stream->tracefile_size_current = diff;
- stream->pos_after_last_complete_data_index = 0;
- stream_complete_rotation(stream);
-
- ret = 0;
-
-end:
- return ret;
-}
-
static int stream_create_data_output_file_from_trace_chunk(
struct relay_stream *stream,
struct lttng_trace_chunk *trace_chunk,
bool force_unlink,
- struct stream_fd **out_stream_fd)
+ struct fs_handle **out_file)
{
- int ret, fd;
+ int ret;
char stream_path[LTTNG_PATH_MAX];
enum lttng_trace_chunk_status status;
const int flags = O_RDWR | O_CREAT | O_TRUNC;
const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
ASSERT_LOCKED(stream->lock);
- assert(stream->trace_chunk);
ret = utils_stream_file_path(stream->path_name, stream->channel_name,
stream->tracefile_size, stream->tracefile_current_index,
}
}
- status = lttng_trace_chunk_open_file(
- trace_chunk, stream_path, flags, mode, &fd);
+ status = lttng_trace_chunk_open_fs_handle(trace_chunk, stream_path,
+ flags, mode, out_file, false);
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to open stream file \"%s\"", stream->channel_name);
ret = -1;
goto end;
}
-
- *out_stream_fd = stream_fd_create(fd);
- if (!*out_stream_fd) {
- if (close(ret)) {
- PERROR("Error closing stream file descriptor %d", ret);
- }
- ret = -1;
- goto end;
- }
end:
return ret;
}
{
int ret = 0;
- DBG("Rotating stream %" PRIu64 " data file",
- stream->stream_handle);
+ DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64,
+ stream->stream_handle, stream->tracefile_size_current);
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
stream->tracefile_wrapped_around = false;
stream->tracefile_current_index = 0;
if (stream->ongoing_rotation.value.next_trace_chunk) {
- struct stream_fd *new_stream_fd = NULL;
enum lttng_trace_chunk_status chunk_status;
chunk_status = lttng_trace_chunk_create_subdirectory(
/* Rotate the data file. */
ret = stream_create_data_output_file_from_trace_chunk(stream,
stream->ongoing_rotation.value.next_trace_chunk,
- false, &new_stream_fd);
- stream->stream_fd = new_stream_fd;
+ false, &stream->file);
if (ret < 0) {
ERR("Failed to rotate stream data file");
goto end;
}
}
+ DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+ __func__, stream->stream_handle, stream->tracefile_size_current);
stream->tracefile_size_current = 0;
stream->pos_after_last_complete_data_index = 0;
stream->ongoing_rotation.value.data_rotated = true;
return ret;
}
+/*
+ * If too much data has been written in a tracefile before we received the
+ * rotation command, we have to move the excess data to the new tracefile and
+ * perform the rotation. This can happen because the control and data
+ * connections are separate, the indexes as well as the commands arrive from
+ * the control connection and we have no control over the order so we could be
+ * in a situation where too much data has been received on the data connection
+ * before the rotation command on the control connection arrives.
+ */
+static int rotate_truncate_stream(struct relay_stream *stream)
+{
+ int ret;
+ off_t lseek_ret, previous_stream_copy_origin;
+ uint64_t copy_bytes_left, misplaced_data_size;
+ bool acquired_reference;
+ struct fs_handle *previous_stream_file = NULL;
+ struct lttng_trace_chunk *previous_chunk = NULL;
+
+ if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
+ ERR("Protocol error encoutered in %s(): stream rotation "
+ "sequence number is before the current sequence number "
+ "and the next trace chunk is unset. Honoring this "
+ "rotation command would result in data loss",
+ __FUNCTION__);
+ ret = -1;
+ goto end;
+ }
+
+ ASSERT_LOCKED(stream->lock);
+ /*
+ * Acquire a reference to the current trace chunk to ensure
+ * it is not reclaimed when `stream_rotate_data_file` is called.
+ * Failing to do so would violate the contract of the trace
+ * chunk API as an active file descriptor would outlive the
+ * trace chunk.
+ */
+ acquired_reference = lttng_trace_chunk_get(stream->trace_chunk);
+ assert(acquired_reference);
+ previous_chunk = stream->trace_chunk;
+
+ /*
+ * Steal the stream's reference to its stream_fd. A new
+ * stream_fd will be created when the rotation completes and
+ * the orinal stream_fd will be used to copy the "extra" data
+ * to the new file.
+ */
+ assert(stream->file);
+ previous_stream_file = stream->file;
+ stream->file = NULL;
+
+ assert(!stream->is_metadata);
+ assert(stream->tracefile_size_current >
+ stream->pos_after_last_complete_data_index);
+ misplaced_data_size = stream->tracefile_size_current -
+ stream->pos_after_last_complete_data_index;
+ copy_bytes_left = misplaced_data_size;
+ previous_stream_copy_origin = stream->pos_after_last_complete_data_index;
+
+ ret = stream_rotate_data_file(stream);
+ if (ret) {
+ goto end;
+ }
+
+ assert(stream->file);
+ /*
+ * Seek the current tracefile to the position at which the rotation
+ * should have occurred.
+ */
+ lseek_ret = fs_handle_seek(previous_stream_file, previous_stream_copy_origin, SEEK_SET);
+ if (lseek_ret < 0) {
+ PERROR("Failed to seek to offset %" PRIu64
+ " while copying extra data received before a stream rotation",
+ (uint64_t) previous_stream_copy_origin);
+ ret = -1;
+ goto end;
+ }
+
+ /* Move data from the old file to the new file. */
+ while (copy_bytes_left) {
+ ssize_t io_ret;
+ char copy_buffer[FILE_IO_STACK_BUFFER_SIZE];
+ const off_t copy_size_this_pass = min_t(
+ off_t, copy_bytes_left, sizeof(copy_buffer));
+
+ io_ret = fs_handle_read(previous_stream_file, copy_buffer,
+ copy_size_this_pass);
+ if (io_ret < (ssize_t) copy_size_this_pass) {
+ if (io_ret == -1) {
+ PERROR("Failed to read %" PRIu64
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
+ copy_size_this_pass,
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
+ } else {
+ ERR("Failed to read %" PRIu64
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
+ copy_size_this_pass,
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
+ }
+ ret = -1;
+ goto end;
+ }
+
+ io_ret = fs_handle_write(
+ stream->file, copy_buffer, copy_size_this_pass);
+ if (io_ret < (ssize_t) copy_size_this_pass) {
+ if (io_ret == -1) {
+ PERROR("Failed to write %" PRIu64
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
+ copy_size_this_pass,
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
+ } else {
+ ERR("Failed to write %" PRIu64
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
+ copy_size_this_pass,
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
+ }
+ ret = -1;
+ goto end;
+ }
+ copy_bytes_left -= copy_size_this_pass;
+ }
+
+ /* Truncate the file to get rid of the excess data. */
+ ret = fs_handle_truncate(
+ previous_stream_file, previous_stream_copy_origin);
+ if (ret) {
+ PERROR("Failed to truncate current stream file to offset %" PRIu64,
+ previous_stream_copy_origin);
+ goto end;
+ }
+
+ /*
+ * Update the offset and FD of all the eventual indexes created by the
+ * data connection before the rotation command arrived.
+ */
+ ret = relay_index_switch_all_files(stream);
+ if (ret < 0) {
+ ERR("Failed to rotate index file");
+ goto end;
+ }
+
+ stream->tracefile_size_current = misplaced_data_size;
+ /* Index and data contents are back in sync. */
+ stream->pos_after_last_complete_data_index = 0;
+ ret = 0;
+end:
+ lttng_trace_chunk_put(previous_chunk);
+ return ret;
+}
+
/*
* Check if a stream's data file (as opposed to index) should be rotated
* (for session rotation).
goto end;
}
+ DBG("%s: Stream %" PRIu64
+ " (rotate_at_index_packet_seq_num = %" PRIu64
+ ", rotate_at_prev_data_net_seq = %" PRIu64
+ ", prev_data_seq = %" PRIu64 ")",
+ __func__, stream->stream_handle,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->ongoing_rotation.value.prev_data_net_seq,
+ stream->prev_data_seq);
+
if (stream->prev_data_seq == -1ULL ||
- stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) {
+ stream->ongoing_rotation.value.prev_data_net_seq == -1ULL ||
+ stream->prev_data_seq <
+ stream->ongoing_rotation.value.prev_data_net_seq) {
/*
* The next packet that will be written is not part of the next
* chunk yet.
*/
- DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64
+ DBG("Stream %" PRIu64 " data not yet ready for rotation "
+ "(rotate_at_index_packet_seq_num = %" PRIu64
+ ", rotate_at_prev_data_net_seq = %" PRIu64
", prev_data_seq = %" PRIu64 ")",
stream->stream_handle,
- stream->ongoing_rotation.value.seq_num,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->ongoing_rotation.value.prev_data_net_seq,
stream->prev_data_seq);
goto end;
- } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) {
+ } else if (stream->prev_data_seq > stream->ongoing_rotation.value.prev_data_net_seq) {
/*
* prev_data_seq is checked here since indexes and rotation
* commands are serialized with respect to each other.
int ret;
uint32_t major, minor;
char *index_subpath = NULL;
+ enum lttng_trace_chunk_status status;
ASSERT_LOCKED(stream->lock);
goto end;
}
- ret = lttng_trace_chunk_create_subdirectory(chunk,
+ status = lttng_trace_chunk_create_subdirectory(chunk,
index_subpath);
free(index_subpath);
- if (ret) {
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
goto end;
}
- stream->index_file = lttng_index_file_create_from_trace_chunk(
+ status = lttng_index_file_create_from_trace_chunk(
chunk, stream->path_name,
stream->channel_name, stream->tracefile_size,
stream->tracefile_current_index,
lttng_to_index_major(major, minor),
- lttng_to_index_minor(major, minor), true);
- if (!stream->index_file) {
+ lttng_to_index_minor(major, minor), true,
+ &stream->index_file);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
goto end;
}
goto end;
}
- if (stream->prev_index_seq == -1ULL ||
- stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) {
- DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+ DBG("%s: Stream %" PRIu64
+ " (rotate_at_packet_seq_num = %" PRIu64
+ ", received_packet_seq_num = "
+ "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
+ __func__, stream->stream_handle,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->received_packet_seq_num.value,
+ stream->received_packet_seq_num.is_set);
+
+ if (!stream->received_packet_seq_num.is_set ||
+ LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 <
+ stream->ongoing_rotation.value.packet_seq_num) {
+ DBG("Stream %" PRIu64 " index not yet ready for rotation "
+ "(rotate_at_packet_seq_num = %" PRIu64
+ ", received_packet_seq_num = "
+ "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
stream->stream_handle,
- stream->ongoing_rotation.value.seq_num,
- stream->prev_index_seq);
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->received_packet_seq_num.value,
+ stream->received_packet_seq_num.is_set);
goto end;
} else {
- /* The next index belongs to the new trace chunk; rotate. */
- assert(stream->prev_index_seq + 1 ==
- stream->ongoing_rotation.value.seq_num);
+ /*
+ * The next index belongs to the new trace chunk; rotate.
+ * In overwrite mode, the packet seq num may jump over the
+ * rotation position.
+ */
+ assert(LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 >=
+ stream->ongoing_rotation.value.packet_seq_num);
DBG("Rotating stream %" PRIu64 " index file",
stream->stream_handle);
- ret = create_index_file(stream,
- stream->ongoing_rotation.value.next_trace_chunk);
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
stream->ongoing_rotation.value.index_rotated = true;
+ /*
+ * Set the rotation pivot position for the data, now that we have the
+ * net_seq_num matching the packet_seq_num index pivot position.
+ */
+ stream->ongoing_rotation.value.prev_data_net_seq =
+ stream->prev_index_seq;
if (stream->ongoing_rotation.value.data_rotated &&
stream->ongoing_rotation.value.index_rotated) {
/* Rotation completed; reset its state. */
int ret = 0;
enum lttng_trace_chunk_status status;
bool acquired_reference;
- struct stream_fd *new_stream_fd = NULL;
status = lttng_trace_chunk_create_subdirectory(chunk,
stream->path_name);
assert(acquired_reference);
stream->trace_chunk = chunk;
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
- false, &new_stream_fd);
- stream->stream_fd = new_stream_fd;
+ false, &stream->file);
end:
return ret;
}
end:
if (ret) {
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
stream_put(stream);
stream = NULL;
}
- lttng_trace_chunk_put(current_trace_chunk);
+ if (acquired_reference) {
+ lttng_trace_chunk_put(current_trace_chunk);
+ }
return stream;
error_no_alloc:
stream_unpublish(stream);
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
if (stream->index_file) {
lttng_index_file_put(stream->index_file);
{
int ret = 0;
const struct relay_stream_rotation rotation = {
- .seq_num = rotation_sequence_number,
+ .data_rotated = false,
+ .index_rotated = false,
+ .packet_seq_num = rotation_sequence_number,
+ .prev_data_net_seq = -1ULL,
.next_trace_chunk = next_trace_chunk,
};
}
LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
- DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64,
+ DBG("Setting pending rotation: stream_id = %" PRIu64
+ ", rotate_at_packet_seq_num = %" PRIu64,
stream->stream_handle, rotation_sequence_number);
if (stream->is_metadata) {
/*
* A metadata stream has no index; consider it already rotated.
*/
stream->ongoing_rotation.value.index_rotated = true;
+ if (next_trace_chunk) {
+ /*
+ * The metadata will be received again in the new chunk.
+ */
+ stream->metadata_received = 0;
+ }
ret = stream_rotate_data_file(stream);
} else {
- ret = try_rotate_stream_data(stream);
+ ret = try_rotate_stream_index(stream);
if (ret < 0) {
goto end;
}
- ret = try_rotate_stream_index(stream);
+ ret = try_rotate_stream_data(stream);
if (ret < 0) {
goto end;
}
stream->closed = true;
/* Relay indexes are only used by the "consumer/sessiond" end. */
relay_index_close_all(stream);
+
+ /*
+ * If we are closed by an application exiting (per-pid buffers),
+ * we need to put our reference on the stream trace chunk right
+ * away, because otherwise still holding the reference on the
+ * trace chunk could allow a viewer stream (which holds a reference
+ * to the stream) to postpone destroy waiting for the chunk to cease
+ * to exist endlessly until the viewer is detached.
+ */
+
+ /* Put stream fd before put chunk. */
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
+ }
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
pthread_mutex_unlock(&stream->lock);
DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
stream_put(stream);
int ret = 0;
ASSERT_LOCKED(stream->lock);
+
+ if (!stream->file || !stream->trace_chunk) {
+ ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+ stream->stream_handle, stream->channel_name);
+ ret = -1;
+ goto end;
+ }
+
if (caa_likely(stream->tracefile_size == 0)) {
/* No size limit set; nothing to check. */
goto end;
}
DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
", current_file_size = %" PRIu64
- ", packet_size = %" PRIu64 ", current_file_index = %" PRIu64
+ ", packet_size = %zu, current_file_index = %" PRIu64
" new_file_index = %" PRIu64,
stream->stream_handle,
stream->tracefile_size_current, packet_size,
stream->tracefile_current_index, new_file_index);
- tracefile_array_file_rotate(stream->tfa);
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
stream->tracefile_current_index = new_file_index;
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
ret = stream_create_data_output_file_from_trace_chunk(stream,
- stream->trace_chunk, false, &stream->stream_fd);
+ stream->trace_chunk, false, &stream->file);
if (ret) {
ERR("Failed to perform trace file rotation of stream %" PRIu64,
stream->stream_handle);
* Reset current size because we just performed a stream
* rotation.
*/
+ DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+ __func__, stream->stream_handle, stream->tracefile_size_current);
stream->tracefile_size_current = 0;
*file_rotated = true;
} else {
memset(padding_buffer, 0,
min(sizeof(padding_buffer), padding_to_write));
+ if (!stream->file || !stream->trace_chunk) {
+ ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+ stream->stream_handle, stream->channel_name);
+ ret = -1;
+ goto end;
+ }
if (packet) {
- write_ret = lttng_write(stream->stream_fd->fd,
- packet->data, packet->size);
+ write_ret = fs_handle_write(
+ stream->file, packet->data, packet->size);
if (write_ret != packet->size) {
PERROR("Failed to write to stream file of %sstream %" PRIu64,
stream->is_metadata ? "metadata " : "",
const size_t padding_to_write_this_pass =
min(padding_to_write, sizeof(padding_buffer));
- write_ret = lttng_write(stream->stream_fd->fd,
- padding_buffer, padding_to_write_this_pass);
+ write_ret = fs_handle_write(stream->file, padding_buffer,
+ padding_to_write_this_pass);
if (write_ret != padding_to_write_this_pass) {
PERROR("Failed to write padding to file of %sstream %" PRIu64,
stream->is_metadata ? "metadata " : "",
}
if (stream->is_metadata) {
- stream->metadata_received += packet->size + padding_len;
+ size_t recv_len;
+
+ recv_len = packet ? packet->size : 0;
+ recv_len += padding_len;
+ stream->metadata_received += recv_len;
+ if (recv_len) {
+ stream->no_new_metadata_notified = false;
+ }
}
- DBG("Wrote to %sstream %" PRIu64 ": data_length = %" PRIu64 ", padding_length = %" PRIu64,
+ DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
stream->is_metadata ? "metadata " : "",
stream->stream_handle,
- packet ? packet->size : 0, padding_len);
+ packet ? packet->size : (size_t) 0, padding_len);
end:
return ret;
}
uint64_t data_offset;
struct relay_index *index;
+ assert(stream->trace_chunk);
ASSERT_LOCKED(stream->lock);
/* Get data offset because we are about to update the index. */
data_offset = htobe64(stream->tracefile_size_current);
ret = relay_index_try_flush(index);
if (ret == 0) {
- tracefile_array_commit_seq(stream->tfa);
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
+ tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
stream->index_received_seqcount++;
+ LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
+ be64toh(index->index_data.packet_seq_num));
*flushed = true;
} else if (ret > 0) {
index->total_size = total_size;
stream->prev_data_seq = sequence_number;
ret = try_rotate_stream_data(stream);
- if (ret < 0) {
- goto end;
- }
+
end:
return ret;
}
ASSERT_LOCKED(stream->lock);
+ DBG("stream_add_index for stream %" PRIu64, stream->stream_handle);
+
/* Live beacon handling */
if (index_info->packet_size == 0) {
DBG("Received live beacon for stream %" PRIu64,
}
ret = relay_index_try_flush(index);
if (ret == 0) {
- tracefile_array_commit_seq(stream->tfa);
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
+ tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
stream->index_received_seqcount++;
stream->pos_after_last_complete_data_index += index->total_size;
stream->prev_index_seq = index_info->net_seq_num;
+ LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
+ index_info->packet_seq_num);
ret = try_rotate_stream_index(stream);
if (ret < 0) {
goto end;
}
+ ret = try_rotate_stream_data(stream);
+ if (ret < 0) {
+ goto end;
+ }
} else if (ret > 0) {
/* no flush. */
ret = 0;
{
ASSERT_LOCKED(stream->lock);
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ int ret;
+
+ ret = fs_handle_close(stream->file);
+ if (ret) {
+ ERR("Failed to close stream file handle: channel name = \"%s\", id = %" PRIu64,
+ stream->channel_name,
+ stream->stream_handle);
+ }
+ stream->file = NULL;
}
+ DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+ __func__, stream->stream_handle, stream->tracefile_size_current);
stream->tracefile_size_current = 0;
stream->prev_data_seq = 0;
stream->prev_index_seq = 0;
stream->pos_after_last_complete_data_index = 0;
return stream_create_data_output_file_from_trace_chunk(stream,
- stream->trace_chunk, true, &stream->stream_fd);
+ stream->trace_chunk, true, &stream->file);
}
void print_relay_streams(void)