X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.c;h=1e51547fa2bc928924cf79bbce9fbbd8b2add379;hp=4238d3b53ff217e05613e5ed7aab0aef72f11921;hb=8bb66c3cd60938352927ee865759433387324250;hpb=f7c3ffd79ddcece895eb0de616001d549aced5fc diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 4238d3b53..1e51547fa 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -20,11 +20,12 @@ #define _LGPL_SOURCE #include -#include #include +#include #include -#include +#include #include +#include #include "lttng-relayd.h" #include "index.h" @@ -86,9 +87,9 @@ static int stream_create_data_output_file_from_trace_chunk( struct relay_stream *stream, struct lttng_trace_chunk *trace_chunk, bool force_unlink, - struct stream_fd **out_stream_fd) + struct fs_handle **out_file) { - int ret, fd; + int ret; char stream_path[LTTNG_PATH_MAX]; enum lttng_trace_chunk_status status; const int flags = O_RDWR | O_CREAT | O_TRUNC; @@ -127,22 +128,13 @@ static int stream_create_data_output_file_from_trace_chunk( } } - status = lttng_trace_chunk_open_file( - trace_chunk, stream_path, flags, mode, &fd, false); + status = lttng_trace_chunk_open_fs_handle(trace_chunk, stream_path, + flags, mode, out_file, false); if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { ERR("Failed to open stream file \"%s\"", stream->channel_name); ret = -1; goto end; } - - *out_stream_fd = stream_fd_create(fd); - if (!*out_stream_fd) { - if (close(ret)) { - PERROR("Error closing stream file descriptor %d", ret); - } - ret = -1; - goto end; - } end: return ret; } @@ -154,16 +146,15 @@ static int stream_rotate_data_file(struct relay_stream *stream) DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64, stream->stream_handle, stream->tracefile_size_current); - if (stream->stream_fd) { - stream_fd_put(stream->stream_fd); - stream->stream_fd = NULL; + if (stream->file) { + fs_handle_close(stream->file); + stream->file = NULL; } stream->tracefile_wrapped_around = false; stream->tracefile_current_index = 0; if (stream->ongoing_rotation.value.next_trace_chunk) { - struct stream_fd *new_stream_fd = NULL; enum lttng_trace_chunk_status chunk_status; chunk_status = lttng_trace_chunk_create_subdirectory( @@ -177,8 +168,7 @@ static int stream_rotate_data_file(struct relay_stream *stream) /* Rotate the data file. */ ret = stream_create_data_output_file_from_trace_chunk(stream, stream->ongoing_rotation.value.next_trace_chunk, - false, &new_stream_fd); - stream->stream_fd = new_stream_fd; + false, &stream->file); if (ret < 0) { ERR("Failed to rotate stream data file"); goto end; @@ -213,7 +203,7 @@ static int rotate_truncate_stream(struct relay_stream *stream) off_t lseek_ret, previous_stream_copy_origin; uint64_t copy_bytes_left, misplaced_data_size; bool acquired_reference; - struct stream_fd *previous_stream_fd = NULL; + struct fs_handle *previous_stream_file = NULL; struct lttng_trace_chunk *previous_chunk = NULL; if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) { @@ -244,9 +234,9 @@ static int rotate_truncate_stream(struct relay_stream *stream) * the orinal stream_fd will be used to copy the "extra" data * to the new file. */ - assert(stream->stream_fd); - previous_stream_fd = stream->stream_fd; - stream->stream_fd = NULL; + assert(stream->file); + previous_stream_file = stream->file; + stream->file = NULL; assert(!stream->is_metadata); assert(stream->tracefile_size_current > @@ -261,13 +251,12 @@ static int rotate_truncate_stream(struct relay_stream *stream) goto end; } - assert(stream->stream_fd); + assert(stream->file); /* * Seek the current tracefile to the position at which the rotation * should have occurred. */ - lseek_ret = lseek(previous_stream_fd->fd, previous_stream_copy_origin, - SEEK_SET); + lseek_ret = fs_handle_seek(previous_stream_file, previous_stream_copy_origin, SEEK_SET); if (lseek_ret < 0) { PERROR("Failed to seek to offset %" PRIu64 " while copying extra data received before a stream rotation", @@ -283,41 +272,41 @@ static int rotate_truncate_stream(struct relay_stream *stream) const off_t copy_size_this_pass = min_t( off_t, copy_bytes_left, sizeof(copy_buffer)); - io_ret = lttng_read(previous_stream_fd->fd, copy_buffer, + io_ret = fs_handle_read(previous_stream_file, copy_buffer, copy_size_this_pass); if (io_ret < (ssize_t) copy_size_this_pass) { if (io_ret == -1) { PERROR("Failed to read %" PRIu64 - " bytes from fd %i in %s(), returned %zi", + " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64, copy_size_this_pass, - previous_stream_fd->fd, - __FUNCTION__, io_ret); + __FUNCTION__, io_ret, + stream->stream_handle); } else { ERR("Failed to read %" PRIu64 - " bytes from fd %i in %s(), returned %zi", + " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64, copy_size_this_pass, - previous_stream_fd->fd, - __FUNCTION__, io_ret); + __FUNCTION__, io_ret, + stream->stream_handle); } ret = -1; goto end; } - io_ret = lttng_write(stream->stream_fd->fd, copy_buffer, - copy_size_this_pass); + io_ret = fs_handle_write( + stream->file, copy_buffer, copy_size_this_pass); if (io_ret < (ssize_t) copy_size_this_pass) { if (io_ret == -1) { PERROR("Failed to write %" PRIu64 - " bytes from fd %i in %s(), returned %zi", + " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64, copy_size_this_pass, - stream->stream_fd->fd, - __FUNCTION__, io_ret); + __FUNCTION__, io_ret, + stream->stream_handle); } else { ERR("Failed to write %" PRIu64 - " bytes from fd %i in %s(), returned %zi", + " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64, copy_size_this_pass, - stream->stream_fd->fd, - __FUNCTION__, io_ret); + __FUNCTION__, io_ret, + stream->stream_handle); } ret = -1; goto end; @@ -326,7 +315,8 @@ static int rotate_truncate_stream(struct relay_stream *stream) } /* Truncate the file to get rid of the excess data. */ - ret = ftruncate(previous_stream_fd->fd, previous_stream_copy_origin); + ret = fs_handle_truncate( + previous_stream_file, previous_stream_copy_origin); if (ret) { PERROR("Failed to truncate current stream file to offset %" PRIu64, previous_stream_copy_origin); @@ -349,7 +339,6 @@ static int rotate_truncate_stream(struct relay_stream *stream) ret = 0; end: lttng_trace_chunk_put(previous_chunk); - stream_fd_put(previous_stream_fd); return ret; } @@ -561,7 +550,6 @@ static int stream_set_trace_chunk(struct relay_stream *stream, int ret = 0; enum lttng_trace_chunk_status status; bool acquired_reference; - struct stream_fd *new_stream_fd = NULL; status = lttng_trace_chunk_create_subdirectory(chunk, stream->path_name); @@ -575,13 +563,12 @@ static int stream_set_trace_chunk(struct relay_stream *stream, assert(acquired_reference); stream->trace_chunk = chunk; - if (stream->stream_fd) { - stream_fd_put(stream->stream_fd); - stream->stream_fd = NULL; + if (stream->file) { + fs_handle_close(stream->file); + stream->file = NULL; } ret = stream_create_data_output_file_from_trace_chunk(stream, chunk, - false, &new_stream_fd); - stream->stream_fd = new_stream_fd; + false, &stream->file); end: return ret; } @@ -684,9 +671,9 @@ struct relay_stream *stream_create(struct ctf_trace *trace, end: if (ret) { - if (stream->stream_fd) { - stream_fd_put(stream->stream_fd); - stream->stream_fd = NULL; + if (stream->file) { + fs_handle_close(stream->file); + stream->file = NULL; } stream_put(stream); stream = NULL; @@ -809,9 +796,9 @@ static void stream_release(struct urcu_ref *ref) stream_unpublish(stream); - if (stream->stream_fd) { - stream_fd_put(stream->stream_fd); - stream->stream_fd = NULL; + if (stream->file) { + fs_handle_close(stream->file); + stream->file = NULL; } if (stream->index_file) { lttng_index_file_put(stream->index_file); @@ -991,9 +978,9 @@ void try_stream_close(struct relay_stream *stream) */ /* Put stream fd before put chunk. */ - if (stream->stream_fd) { - stream_fd_put(stream->stream_fd); - stream->stream_fd = NULL; + if (stream->file) { + fs_handle_close(stream->file); + stream->file = NULL; } if (stream->index_file) { lttng_index_file_put(stream->index_file); @@ -1013,7 +1000,7 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size, ASSERT_LOCKED(stream->lock); - if (!stream->stream_fd || !stream->trace_chunk) { + if (!stream->file || !stream->trace_chunk) { ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s", stream->stream_handle, stream->channel_name); ret = -1; @@ -1047,12 +1034,12 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size, tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE); stream->tracefile_current_index = new_file_index; - if (stream->stream_fd) { - stream_fd_put(stream->stream_fd); - stream->stream_fd = NULL; + if (stream->file) { + fs_handle_close(stream->file); + stream->file = NULL; } ret = stream_create_data_output_file_from_trace_chunk(stream, - stream->trace_chunk, false, &stream->stream_fd); + stream->trace_chunk, false, &stream->file); if (ret) { ERR("Failed to perform trace file rotation of stream %" PRIu64, stream->stream_handle); @@ -1087,15 +1074,15 @@ int stream_write(struct relay_stream *stream, memset(padding_buffer, 0, min(sizeof(padding_buffer), padding_to_write)); - if (!stream->stream_fd || !stream->trace_chunk) { + if (!stream->file || !stream->trace_chunk) { ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s", stream->stream_handle, stream->channel_name); ret = -1; goto end; } if (packet) { - write_ret = lttng_write(stream->stream_fd->fd, - packet->data, packet->size); + write_ret = fs_handle_write( + stream->file, packet->data, packet->size); if (write_ret != packet->size) { PERROR("Failed to write to stream file of %sstream %" PRIu64, stream->is_metadata ? "metadata " : "", @@ -1109,8 +1096,8 @@ int stream_write(struct relay_stream *stream, const size_t padding_to_write_this_pass = min(padding_to_write, sizeof(padding_buffer)); - write_ret = lttng_write(stream->stream_fd->fd, - padding_buffer, padding_to_write_this_pass); + write_ret = fs_handle_write(stream->file, padding_buffer, + padding_to_write_this_pass); if (write_ret != padding_to_write_this_pass) { PERROR("Failed to write padding to file of %sstream %" PRIu64, stream->is_metadata ? "metadata " : "", @@ -1351,9 +1338,16 @@ int stream_reset_file(struct relay_stream *stream) { ASSERT_LOCKED(stream->lock); - if (stream->stream_fd) { - stream_fd_put(stream->stream_fd); - stream->stream_fd = NULL; + if (stream->file) { + int ret; + + ret = fs_handle_close(stream->file); + if (ret) { + ERR("Failed to close stream file handle: channel name = \"%s\", id = %" PRIu64, + stream->channel_name, + stream->stream_handle); + } + stream->file = NULL; } DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64, @@ -1366,7 +1360,7 @@ int stream_reset_file(struct relay_stream *stream) stream->pos_after_last_complete_data_index = 0; return stream_create_data_output_file_from_trace_chunk(stream, - stream->trace_chunk, true, &stream->stream_fd); + stream->trace_chunk, true, &stream->file); } void print_relay_streams(void)