goto end_unlock;
}
- ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
- 0, 0, -1, -1, stream->stream_fd->fd, NULL,
- &stream->stream_fd->fd);
+ ret = stream_fd_rotate(stream->stream_fd,
+ stream->path_name, stream->channel_name, 0, 0, NULL);
if (ret < 0) {
ERR("Failed to rotate metadata file %s of channel %s",
stream->path_name, stream->channel_name);
int ret;
/* Perform the stream rotation. */
- ret = utils_rotate_stream_file(stream->path_name,
+ ret = stream_fd_rotate(stream->stream_fd, stream->path_name,
stream->channel_name, stream->tracefile_size,
- stream->tracefile_count, -1,
- -1, stream->stream_fd->fd,
- NULL, &stream->stream_fd->fd);
+ stream->tracefile_count, NULL);
if (ret < 0) {
ERR("Rotating stream output file");
goto end;
static
int rotate_truncate_stream(struct relay_stream *stream)
{
- int ret, new_fd;
+ int ret, old_fd = -1, new_fd = -1;
off_t lseek_ret;
uint64_t diff, pos = 0;
char buf[FILE_COPY_BUFFER_SIZE];
+ struct stream_fd *new_stream_fd = NULL;
assert(!stream->is_metadata);
stream->pos_after_last_complete_data_index;
/* Create the new tracefile. */
- new_fd = utils_create_stream_file(stream->path_name,
- stream->channel_name,
- stream->tracefile_size, stream->tracefile_count,
- /* uid */ -1, /* gid */ -1, /* suffix */ NULL);
- if (new_fd < 0) {
+ new_stream_fd = stream_fd_create(stream->path_name, stream->channel_name,
+ stream->tracefile_size, stream->tracefile_count, NULL);
+ if (new_stream_fd < 0) {
ERR("Failed to create new stream file at path %s for channel %s",
stream->path_name, stream->channel_name);
ret = -1;
goto end;
}
+ old_fd = stream_fd_get_fd(stream->stream_fd);
+ if (old_fd < 0) {
+ ret = -1;
+ goto end;
+ }
+
+ new_fd = stream_fd_get_fd(new_stream_fd);
+ if (new_fd < 0) {
+ ret = -1;
+ goto end;
+ }
+
/*
* Rewind the current tracefile to the position at which the rotation
* should have occured.
*/
- lseek_ret = lseek(stream->stream_fd->fd,
- stream->pos_after_last_complete_data_index, SEEK_SET);
+ lseek_ret = lseek(old_fd, stream->pos_after_last_complete_data_index,
+ SEEK_SET);
if (lseek_ret < 0) {
PERROR("seek truncate stream");
ret = -1;
count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left;
assert(count <= SIZE_MAX);
- io_ret = lttng_read(stream->stream_fd->fd, buf, count);
+ io_ret = lttng_read(old_fd, buf, count);
if (io_ret < (ssize_t) count) {
char error_string[256];
snprintf(error_string, sizeof(error_string),
- "Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
- count, stream->stream_fd->fd, io_ret);
+ "Failed to read %" PRIu64 " bytes from in rotate_truncate_stream(), returned %zi",
+ count, io_ret);
if (io_ret == -1) {
PERROR("%s", error_string);
} else {
pos += count;
}
+ stream_fd_put_fd(new_stream_fd);
+ new_fd = -1;
+
/* Truncate the file to get rid of the excess data. */
- ret = ftruncate(stream->stream_fd->fd,
- stream->pos_after_last_complete_data_index);
+ ret = ftruncate(old_fd, stream->pos_after_last_complete_data_index);
if (ret) {
PERROR("ftruncate");
goto end;
}
- ret = close(stream->stream_fd->fd);
- if (ret < 0) {
- PERROR("Closing tracefile");
- goto end;
- }
+ stream_fd_put_fd(stream->stream_fd);
+ old_fd = -1;
+
+ /*
+ * Swap the old stream_fd with the new, releasing the stream's
+ * reference.
+ */
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = new_stream_fd;
ret = create_rotate_index_file(stream);
if (ret < 0) {
goto end;
}
- stream->stream_fd->fd = new_fd;
stream->tracefile_size_current = diff;
stream->pos_after_last_complete_data_index = 0;
stream->rotate_at_seq_num = -1ULL;
ret = 0;
end:
+ if (old_fd != -1) {
+ stream_fd_put_fd(stream->stream_fd);
+ }
+ if (new_fd != -1) {
+ stream_fd_put_fd(new_stream_fd);
+ }
+
return ret;
}
struct lttcomm_relayd_metadata_payload metadata_payload_header;
struct relay_stream *metadata_stream;
uint64_t metadata_payload_size;
+ int metadata_fd = -1;
if (!session) {
ERR("Metadata sent before version check");
pthread_mutex_lock(&metadata_stream->lock);
- size_ret = lttng_write(metadata_stream->stream_fd->fd,
+ metadata_fd = stream_fd_get_fd(metadata_stream->stream_fd);
+ if (metadata_fd < 0) {
+ goto end_put;
+ }
+ size_ret = lttng_write(metadata_fd,
payload->data + sizeof(metadata_payload_header),
metadata_payload_size);
if (size_ret < metadata_payload_size) {
ERR("Relay error writing metadata on file");
ret = -1;
- goto end_put;
+ goto end_put_fd;
}
- size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+ size_ret = write_padding_to_file(metadata_fd,
metadata_payload_header.padding_size);
if (size_ret < (int64_t) metadata_payload_header.padding_size) {
ret = -1;
- goto end_put;
+ goto end_put_fd;
}
metadata_stream->metadata_received +=
ret = try_rotate_stream(metadata_stream);
if (ret < 0) {
- goto end_put;
+ goto end_put_fd;
}
+end_put_fd:
+ stream_fd_put_fd(metadata_stream->stream_fd);
end_put:
pthread_mutex_unlock(&metadata_stream->lock);
stream_put(metadata_stream);
/* new_id is updated by utils_rotate_stream_file. */
new_id = old_id;
- ret = utils_rotate_stream_file(stream->path_name,
+ ret = stream_fd_rotate(stream->stream_fd, stream->path_name,
stream->channel_name, stream->tracefile_size,
- stream->tracefile_count, -1,
- -1, stream->stream_fd->fd,
- &new_id, &stream->stream_fd->fd);
+ stream->tracefile_count, &new_id);
if (ret < 0) {
ERR("Failed to rotate stream output file");
status = RELAY_CONNECTION_STATUS_ERROR;
bool new_stream = false, close_requested = false, index_flushed = false;
uint64_t left_to_receive = state->left_to_receive;
struct relay_session *session;
+ int stream_fd = -1;
DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
state->header.stream_id, state->header.net_seq_num,
}
}
+ stream_fd = stream_fd_get_fd(stream->stream_fd);
+ if (stream_fd < 0) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
+
/*
* The size of the "chunk" received on any iteration is bounded by:
* - the data left to receive,
PERROR("Socket %d error", conn->sock->fd);
status = RELAY_CONNECTION_STATUS_ERROR;
}
- goto end_stream_unlock;
+ goto end_put_fd;
} else if (ret == 0) {
/* No more data ready to be consumed on socket. */
DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
recv_size = ret;
/* Write data to stream output fd. */
- write_ret = lttng_write(stream->stream_fd->fd, data_buffer,
+ write_ret = lttng_write(stream_fd, data_buffer,
recv_size);
if (write_ret < (ssize_t) recv_size) {
ERR("Relay error writing data to file");
status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_stream_unlock;
+ goto end_put_fd;
}
left_to_receive -= recv_size;
DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
state->header.stream_id, state->received,
state->left_to_receive);
- goto end_stream_unlock;
+ goto end_put_fd;
}
- ret = write_padding_to_file(stream->stream_fd->fd,
+ ret = write_padding_to_file(stream_fd,
state->header.padding_size);
if ((int64_t) ret < (int64_t) state->header.padding_size) {
ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_stream_unlock;
+ goto end_put_fd;
}
stream->stream_handle,
state->header.net_seq_num, ret);
status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_stream_unlock;
+ goto end_put_fd;
}
}
ret = try_rotate_stream(stream);
if (ret < 0) {
status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_stream_unlock;
+ goto end_put_fd;
}
-
+end_put_fd:
+ (void) stream_fd_put_fd(stream->stream_fd);
end_stream_unlock:
close_requested = stream->close_requested;
pthread_mutex_unlock(&stream->lock);