const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
ASSERT_LOCKED(stream->lock);
- assert(stream->trace_chunk);
ret = utils_stream_file_path(stream->path_name, stream->channel_name,
stream->tracefile_size, stream->tracefile_current_index,
stream->closed = true;
/* Relay indexes are only used by the "consumer/sessiond" end. */
relay_index_close_all(stream);
+
+ /*
+ * If we are closed by an application exiting (per-pid buffers),
+ * we need to put our reference on the stream trace chunk right
+ * away, because otherwise still holding the reference on the
+ * trace chunk could allow a viewer stream (which holds a reference
+ * to the stream) to postpone destroy waiting for the chunk to cease
+ * to exist endlessly until the viewer is detached.
+ */
+
+ /* Put stream fd before put chunk. */
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
pthread_mutex_unlock(&stream->lock);
DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
stream_put(stream);
int ret = 0;
ASSERT_LOCKED(stream->lock);
+
+ if (!stream->stream_fd || !stream->trace_chunk) {
+ ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+ stream->stream_handle, stream->channel_name);
+ ret = -1;
+ goto end;
+ }
+
if (caa_likely(stream->tracefile_size == 0)) {
/* No size limit set; nothing to check. */
goto end;
stream->stream_handle,
stream->tracefile_size_current, packet_size,
stream->tracefile_current_index, new_file_index);
- tracefile_array_file_rotate(stream->tfa);
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
stream->tracefile_current_index = new_file_index;
if (stream->stream_fd) {
memset(padding_buffer, 0,
min(sizeof(padding_buffer), padding_to_write));
+ if (!stream->stream_fd || !stream->trace_chunk) {
+ ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+ stream->stream_handle, stream->channel_name);
+ ret = -1;
+ goto end;
+ }
if (packet) {
write_ret = lttng_write(stream->stream_fd->fd,
packet->data, packet->size);
ret = relay_index_try_flush(index);
if (ret == 0) {
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
tracefile_array_commit_seq(stream->tfa);
stream->index_received_seqcount++;
*flushed = true;
}
ret = relay_index_try_flush(index);
if (ret == 0) {
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
tracefile_array_commit_seq(stream->tfa);
stream->index_received_seqcount++;
stream->pos_after_last_complete_data_index += index->total_size;