X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.c;h=b033dda94d1f39baf70d52ff50a43a7dc6ed797c;hb=f465e9787f5b70d4a8f3761c60b8d6e4161e9628;hp=335a1cf5f79de497f3002beb3ae0a2a983da355f;hpb=ce3f3ba3aee62c0a317b448c2f19578ab7f057e4;p=deliverable%2Flttng-tools.git diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 335a1cf5f..b033dda94 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -123,13 +123,8 @@ struct relay_stream *stream_create(struct ctf_trace *trace, * No need to use run_as API here because whatever we receive, * the relayd uses its own credentials for the stream files. */ - ret = utils_create_stream_file(stream->path_name, stream->channel_name, - stream->tracefile_size, 0, -1, -1, NULL); - if (ret < 0) { - ERR("Create output file"); - goto end; - } - stream->stream_fd = stream_fd_create(ret); + stream->stream_fd = stream_fd_create(stream->path_name, + stream->channel_name, stream->tracefile_size, 0, NULL); if (!stream->stream_fd) { if (close(ret)) { PERROR("Error closing file %d", ret); @@ -306,9 +301,9 @@ static void stream_release(struct urcu_ref *ref) stream_fd_put(stream->stream_fd); stream->stream_fd = NULL; } - if (stream->index_fd) { - stream_fd_put(stream->index_fd); - stream->index_fd = NULL; + if (stream->index_file) { + relay_index_file_put(stream->index_file); + stream->index_file = NULL; } if (stream->trace) { ctf_trace_put(stream->trace); @@ -345,7 +340,15 @@ void stream_put(struct relay_stream *stream) void try_stream_close(struct relay_stream *stream) { + bool session_aborted; + struct relay_session *session = stream->trace->session; + DBG("Trying to close stream %" PRIu64, stream->stream_handle); + + pthread_mutex_lock(&session->lock); + session_aborted = session->aborted; + pthread_mutex_unlock(&session->lock); + pthread_mutex_lock(&stream->lock); /* * Can be called concurently by connection close and reception of last @@ -376,6 +379,7 @@ void try_stream_close(struct relay_stream *stream) * a packet. Since those are sent in that order, we take * care of consumerd crashes. */ + DBG("relay_index_close_partial_fd"); relay_index_close_partial_fd(stream); /* * Use the highest net_seq_num we currently have pending @@ -383,11 +387,13 @@ void try_stream_close(struct relay_stream *stream) * at -1ULL if we cannot find any index. */ stream->last_net_seq_num = relay_index_find_last(stream); + DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num); /* Fall-through into the next check. */ } if (stream->last_net_seq_num != -1ULL && - ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) { + ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0 + && !session_aborted) { /* * Don't close since we still have data pending. This * handles cases where an explicit close command has