#define _LGPL_SOURCE
#include <common/common.h>
-#include <common/utils.h>
#include <common/defaults.h>
+#include <common/fs-handle.h>
#include <common/sessiond-comm/relayd.h>
-#include <urcu/rculist.h>
+#include <common/utils.h>
#include <sys/stat.h>
+#include <urcu/rculist.h>
#include "lttng-relayd.h"
#include "index.h"
static void stream_complete_rotation(struct relay_stream *stream)
{
DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
+ if (stream->ongoing_rotation.value.next_trace_chunk) {
+ tracefile_array_reset(stream->tfa);
+ tracefile_array_commit_seq(stream->tfa,
+ stream->index_received_seqcount);
+ }
lttng_trace_chunk_put(stream->trace_chunk);
stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
struct relay_stream *stream,
struct lttng_trace_chunk *trace_chunk,
bool force_unlink,
- struct stream_fd **out_stream_fd)
+ struct fs_handle **out_file)
{
- int ret, fd;
+ int ret;
char stream_path[LTTNG_PATH_MAX];
enum lttng_trace_chunk_status status;
const int flags = O_RDWR | O_CREAT | O_TRUNC;
}
}
- status = lttng_trace_chunk_open_file(
- trace_chunk, stream_path, flags, mode, &fd);
+ status = lttng_trace_chunk_open_fs_handle(trace_chunk, stream_path,
+ flags, mode, out_file, false);
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to open stream file \"%s\"", stream->channel_name);
ret = -1;
goto end;
}
-
- *out_stream_fd = stream_fd_create(fd);
- if (!*out_stream_fd) {
- if (close(ret)) {
- PERROR("Error closing stream file descriptor %d", ret);
- }
- ret = -1;
- goto end;
- }
end:
return ret;
}
{
int ret = 0;
- DBG("Rotating stream %" PRIu64 " data file",
- stream->stream_handle);
+ DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64,
+ stream->stream_handle, stream->tracefile_size_current);
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
stream->tracefile_wrapped_around = false;
stream->tracefile_current_index = 0;
if (stream->ongoing_rotation.value.next_trace_chunk) {
- struct stream_fd *new_stream_fd = NULL;
enum lttng_trace_chunk_status chunk_status;
chunk_status = lttng_trace_chunk_create_subdirectory(
/* Rotate the data file. */
ret = stream_create_data_output_file_from_trace_chunk(stream,
stream->ongoing_rotation.value.next_trace_chunk,
- false, &new_stream_fd);
- stream->stream_fd = new_stream_fd;
+ false, &stream->file);
if (ret < 0) {
ERR("Failed to rotate stream data file");
goto end;
}
}
+ DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+ __func__, stream->stream_handle, stream->tracefile_size_current);
stream->tracefile_size_current = 0;
stream->pos_after_last_complete_data_index = 0;
stream->ongoing_rotation.value.data_rotated = true;
off_t lseek_ret, previous_stream_copy_origin;
uint64_t copy_bytes_left, misplaced_data_size;
bool acquired_reference;
- struct stream_fd *previous_stream_fd = NULL;
+ struct fs_handle *previous_stream_file = NULL;
struct lttng_trace_chunk *previous_chunk = NULL;
if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
* the orinal stream_fd will be used to copy the "extra" data
* to the new file.
*/
- assert(stream->stream_fd);
- previous_stream_fd = stream->stream_fd;
- stream->stream_fd = NULL;
+ assert(stream->file);
+ previous_stream_file = stream->file;
+ stream->file = NULL;
assert(!stream->is_metadata);
assert(stream->tracefile_size_current >
goto end;
}
- assert(stream->stream_fd);
+ assert(stream->file);
/*
* Seek the current tracefile to the position at which the rotation
* should have occurred.
*/
- lseek_ret = lseek(previous_stream_fd->fd, previous_stream_copy_origin,
- SEEK_SET);
+ lseek_ret = fs_handle_seek(previous_stream_file, previous_stream_copy_origin, SEEK_SET);
if (lseek_ret < 0) {
PERROR("Failed to seek to offset %" PRIu64
" while copying extra data received before a stream rotation",
const off_t copy_size_this_pass = min_t(
off_t, copy_bytes_left, sizeof(copy_buffer));
- io_ret = lttng_read(previous_stream_fd->fd, copy_buffer,
+ io_ret = fs_handle_read(previous_stream_file, copy_buffer,
copy_size_this_pass);
if (io_ret < (ssize_t) copy_size_this_pass) {
if (io_ret == -1) {
PERROR("Failed to read %" PRIu64
- " bytes from fd %i in %s(), returned %zi",
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
copy_size_this_pass,
- previous_stream_fd->fd,
- __FUNCTION__, io_ret);
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
} else {
ERR("Failed to read %" PRIu64
- " bytes from fd %i in %s(), returned %zi",
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
copy_size_this_pass,
- previous_stream_fd->fd,
- __FUNCTION__, io_ret);
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
}
ret = -1;
goto end;
}
- io_ret = lttng_write(stream->stream_fd->fd, copy_buffer,
- copy_size_this_pass);
+ io_ret = fs_handle_write(
+ stream->file, copy_buffer, copy_size_this_pass);
if (io_ret < (ssize_t) copy_size_this_pass) {
if (io_ret == -1) {
PERROR("Failed to write %" PRIu64
- " bytes from fd %i in %s(), returned %zi",
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
copy_size_this_pass,
- stream->stream_fd->fd,
- __FUNCTION__, io_ret);
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
} else {
ERR("Failed to write %" PRIu64
- " bytes from fd %i in %s(), returned %zi",
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
copy_size_this_pass,
- stream->stream_fd->fd,
- __FUNCTION__, io_ret);
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
}
ret = -1;
goto end;
}
/* Truncate the file to get rid of the excess data. */
- ret = ftruncate(previous_stream_fd->fd, previous_stream_copy_origin);
+ ret = fs_handle_truncate(
+ previous_stream_file, previous_stream_copy_origin);
if (ret) {
PERROR("Failed to truncate current stream file to offset %" PRIu64,
previous_stream_copy_origin);
ret = 0;
end:
lttng_trace_chunk_put(previous_chunk);
- stream_fd_put(previous_stream_fd);
return ret;
}
goto end;
}
+ DBG("%s: Stream %" PRIu64
+ " (rotate_at_index_packet_seq_num = %" PRIu64
+ ", rotate_at_prev_data_net_seq = %" PRIu64
+ ", prev_data_seq = %" PRIu64 ")",
+ __func__, stream->stream_handle,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->ongoing_rotation.value.prev_data_net_seq,
+ stream->prev_data_seq);
+
if (stream->prev_data_seq == -1ULL ||
stream->ongoing_rotation.value.prev_data_net_seq == -1ULL ||
stream->prev_data_seq <
ret = -1;
goto end;
}
- stream->index_file = lttng_index_file_create_from_trace_chunk(
+ status = lttng_index_file_create_from_trace_chunk(
chunk, stream->path_name,
stream->channel_name, stream->tracefile_size,
stream->tracefile_current_index,
lttng_to_index_major(major, minor),
- lttng_to_index_minor(major, minor), true);
- if (!stream->index_file) {
+ lttng_to_index_minor(major, minor), true,
+ &stream->index_file);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
goto end;
}
goto end;
}
+ DBG("%s: Stream %" PRIu64
+ " (rotate_at_packet_seq_num = %" PRIu64
+ ", received_packet_seq_num = "
+ "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
+ __func__, stream->stream_handle,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->received_packet_seq_num.value,
+ stream->received_packet_seq_num.is_set);
+
if (!stream->received_packet_seq_num.is_set ||
LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 <
stream->ongoing_rotation.value.packet_seq_num) {
stream->ongoing_rotation.value.packet_seq_num);
DBG("Rotating stream %" PRIu64 " index file",
stream->stream_handle);
- ret = create_index_file(stream,
- stream->ongoing_rotation.value.next_trace_chunk);
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
stream->ongoing_rotation.value.index_rotated = true;
/*
int ret = 0;
enum lttng_trace_chunk_status status;
bool acquired_reference;
- struct stream_fd *new_stream_fd = NULL;
status = lttng_trace_chunk_create_subdirectory(chunk,
stream->path_name);
assert(acquired_reference);
stream->trace_chunk = chunk;
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
- false, &new_stream_fd);
- stream->stream_fd = new_stream_fd;
+ false, &stream->file);
end:
return ret;
}
end:
if (ret) {
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
stream_put(stream);
stream = NULL;
stream_unpublish(stream);
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
if (stream->index_file) {
lttng_index_file_put(stream->index_file);
* A metadata stream has no index; consider it already rotated.
*/
stream->ongoing_rotation.value.index_rotated = true;
+ if (next_trace_chunk) {
+ /*
+ * The metadata will be received again in the new chunk.
+ */
+ stream->metadata_received = 0;
+ }
ret = stream_rotate_data_file(stream);
} else {
ret = try_rotate_stream_index(stream);
*/
/* Put stream fd before put chunk. */
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
if (stream->index_file) {
lttng_index_file_put(stream->index_file);
ASSERT_LOCKED(stream->lock);
- if (!stream->stream_fd || !stream->trace_chunk) {
+ if (!stream->file || !stream->trace_chunk) {
ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
stream->stream_handle, stream->channel_name);
ret = -1;
tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
stream->tracefile_current_index = new_file_index;
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
ret = stream_create_data_output_file_from_trace_chunk(stream,
- stream->trace_chunk, false, &stream->stream_fd);
+ stream->trace_chunk, false, &stream->file);
if (ret) {
ERR("Failed to perform trace file rotation of stream %" PRIu64,
stream->stream_handle);
* Reset current size because we just performed a stream
* rotation.
*/
+ DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+ __func__, stream->stream_handle, stream->tracefile_size_current);
stream->tracefile_size_current = 0;
*file_rotated = true;
} else {
memset(padding_buffer, 0,
min(sizeof(padding_buffer), padding_to_write));
- if (!stream->stream_fd || !stream->trace_chunk) {
+ if (!stream->file || !stream->trace_chunk) {
ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
stream->stream_handle, stream->channel_name);
ret = -1;
goto end;
}
if (packet) {
- write_ret = lttng_write(stream->stream_fd->fd,
- packet->data, packet->size);
+ write_ret = fs_handle_write(
+ stream->file, packet->data, packet->size);
if (write_ret != packet->size) {
PERROR("Failed to write to stream file of %sstream %" PRIu64,
stream->is_metadata ? "metadata " : "",
const size_t padding_to_write_this_pass =
min(padding_to_write, sizeof(padding_buffer));
- write_ret = lttng_write(stream->stream_fd->fd,
- padding_buffer, padding_to_write_this_pass);
+ write_ret = fs_handle_write(stream->file, padding_buffer,
+ padding_to_write_this_pass);
if (write_ret != padding_to_write_this_pass) {
PERROR("Failed to write padding to file of %sstream %" PRIu64,
stream->is_metadata ? "metadata " : "",
}
if (stream->is_metadata) {
- stream->metadata_received += packet ? packet->size : 0;
- stream->metadata_received += padding_len;
+ size_t recv_len;
+
+ recv_len = packet ? packet->size : 0;
+ recv_len += padding_len;
+ stream->metadata_received += recv_len;
+ if (recv_len) {
+ stream->no_new_metadata_notified = false;
+ }
}
DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
ret = relay_index_try_flush(index);
if (ret == 0) {
tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
- tracefile_array_commit_seq(stream->tfa);
+ tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
stream->index_received_seqcount++;
LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
be64toh(index->index_data.packet_seq_num));
ASSERT_LOCKED(stream->lock);
+ DBG("stream_add_index for stream %" PRIu64, stream->stream_handle);
+
/* Live beacon handling */
if (index_info->packet_size == 0) {
DBG("Received live beacon for stream %" PRIu64,
ret = relay_index_try_flush(index);
if (ret == 0) {
tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
- tracefile_array_commit_seq(stream->tfa);
+ tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
stream->index_received_seqcount++;
stream->pos_after_last_complete_data_index += index->total_size;
stream->prev_index_seq = index_info->net_seq_num;
{
ASSERT_LOCKED(stream->lock);
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ int ret;
+
+ ret = fs_handle_close(stream->file);
+ if (ret) {
+ ERR("Failed to close stream file handle: channel name = \"%s\", id = %" PRIu64,
+ stream->channel_name,
+ stream->stream_handle);
+ }
+ stream->file = NULL;
}
+ DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+ __func__, stream->stream_handle, stream->tracefile_size_current);
stream->tracefile_size_current = 0;
stream->prev_data_seq = 0;
stream->prev_index_seq = 0;
stream->pos_after_last_complete_data_index = 0;
return stream_create_data_output_file_from_trace_chunk(stream,
- stream->trace_chunk, true, &stream->stream_fd);
+ stream->trace_chunk, true, &stream->file);
}
void print_relay_streams(void)