X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fviewer-stream.c;h=5c19fb9747e4e124e393e872b15d042810a7aff5;hp=2f5d6f788f13ff8a3c35d9aec91ba0e15624a7cc;hb=b0d240a2e2204087ff1634f0bd265660c0582f33;hpb=6c1c0768320135c6936c371b09731851b508c023 diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c index 2f5d6f788..5c19fb974 100644 --- a/src/bin/lttng-relayd/viewer-stream.c +++ b/src/bin/lttng-relayd/viewer-stream.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,38 +17,44 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include #include +#include +#include +#include +#include +#include #include "lttng-relayd.h" #include "viewer-stream.h" -static void free_stream(struct relay_viewer_stream *stream) +static void viewer_stream_destroy(struct relay_viewer_stream *vstream) { - assert(stream); - - free(stream->path_name); - free(stream->channel_name); - free(stream); + free(vstream->path_name); + free(vstream->channel_name); + free(vstream); } -static void deferred_free_viewer_stream(struct rcu_head *head) +static void viewer_stream_destroy_rcu(struct rcu_head *head) { - struct relay_viewer_stream *stream = + struct relay_viewer_stream *vstream = caa_container_of(head, struct relay_viewer_stream, rcu_node); - free_stream(stream); + viewer_stream_destroy(vstream); } struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, - enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace) + struct lttng_trace_chunk *viewer_trace_chunk, + enum lttng_viewer_seek seek_t) { - struct relay_viewer_stream *vstream; + struct relay_viewer_stream *vstream = NULL; + const bool acquired_reference = lttng_trace_chunk_get( + viewer_trace_chunk); - assert(stream); - assert(ctf_trace); + if (!acquired_reference) { + goto error; + } vstream = zmalloc(sizeof(*vstream)); if (!vstream) { @@ -55,221 +62,346 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, goto error; } - vstream->session_id = stream->session_id; - vstream->stream_handle = stream->stream_handle; - vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); - vstream->channel_name = strndup(stream->channel_name, + vstream->stream_file.trace_chunk = viewer_trace_chunk; + viewer_trace_chunk = NULL; + vstream->path_name = lttng_strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); + if (vstream->path_name == NULL) { + PERROR("relay viewer path_name alloc"); + goto error; + } + vstream->channel_name = lttng_strndup(stream->channel_name, LTTNG_VIEWER_NAME_MAX); - vstream->tracefile_count = stream->tracefile_count; - vstream->metadata_flag = stream->metadata_flag; - vstream->tracefile_count_last = -1ULL; + if (vstream->channel_name == NULL) { + PERROR("relay viewer channel_name alloc"); + goto error; + } + + if (!stream_get(stream)) { + ERR("Cannot get stream"); + goto error; + } + vstream->stream = stream; + + pthread_mutex_lock(&stream->lock); + + if (stream->is_metadata && stream->trace->viewer_metadata_stream) { + ERR("Cannot attach viewer metadata stream to trace (busy)."); + goto error_unlock; + } switch (seek_t) { case LTTNG_VIEWER_SEEK_BEGINNING: - vstream->tracefile_count_current = stream->oldest_tracefile_id; + { + uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa); + + if (seq_tail == -1ULL) { + /* + * Tail may not be initialized yet. Nonetheless, we know + * we want to send the first index once it becomes + * available. + */ + seq_tail = 0; + } + vstream->current_tracefile_id = + tracefile_array_get_file_index_tail(stream->tfa); + vstream->index_sent_seqcount = seq_tail; break; + } case LTTNG_VIEWER_SEEK_LAST: - vstream->tracefile_count_current = stream->tracefile_count_current; + vstream->current_tracefile_id = + tracefile_array_get_read_file_index_head(stream->tfa); + /* + * We seek at the very end of each stream, awaiting for + * a future packet to eventually come in. + * + * We don't need to check the head position for -1ULL since the + * increment will set it to 0. + */ + vstream->index_sent_seqcount = + tracefile_array_get_seq_head(stream->tfa) + 1; break; default: - assert(0); - goto error; - } - - if (vstream->metadata_flag) { - ctf_trace->viewer_metadata_stream = vstream; + goto error_unlock; } - /* Globally visible after the add unique. */ - lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle); - lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n); - - vstream->index_read_fd = -1; - vstream->read_fd = -1; - /* - * This is to avoid a race between the initialization of this object and - * the close of the given stream. If the stream is unable to find this - * viewer stream when closing, this copy will at least take the latest - * value. We also need that for the seek_last. + * If we never received an index for the current stream, delay + * the opening of the index, otherwise open it right now. */ - vstream->total_index_received = stream->total_index_received; + if (stream->index_file == NULL) { + vstream->index_file = NULL; + } else { + const uint32_t connection_major = stream->trace->session->major; + const uint32_t connection_minor = stream->trace->session->minor; + enum lttng_trace_chunk_status chunk_status; + + chunk_status = lttng_index_file_create_from_trace_chunk_read_only( + vstream->stream_file.trace_chunk, + stream->path_name, + stream->channel_name, stream->tracefile_size, + vstream->current_tracefile_id, + lttng_to_index_major(connection_major, + connection_minor), + lttng_to_index_minor(connection_major, + connection_minor), + true, &vstream->index_file); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) { + vstream->index_file = NULL; + } else { + goto error_unlock; + } + } + } /* - * If we never received an index for the current stream, delay the opening - * of the index, otherwise open it right now. + * If we never received a data file for the current stream, delay the + * opening, otherwise open it right now. */ - if (vstream->tracefile_count_current == stream->tracefile_count_current - && vstream->total_index_received == 0) { - vstream->index_read_fd = -1; - } else { - int read_fd; + if (stream->stream_fd) { + int fd, ret; + char file_path[LTTNG_PATH_MAX]; + enum lttng_trace_chunk_status status; + + ret = utils_stream_file_path(stream->path_name, + stream->channel_name, stream->tracefile_size, + vstream->current_tracefile_id, NULL, file_path, + sizeof(file_path)); + if (ret < 0) { + goto error_unlock; + } - read_fd = index_open(vstream->path_name, vstream->channel_name, - vstream->tracefile_count, vstream->tracefile_count_current); - if (read_fd < 0) { - goto error; + status = lttng_trace_chunk_open_file( + vstream->stream_file.trace_chunk, + file_path, O_RDONLY, 0, &fd, true); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + goto error_unlock; + } + vstream->stream_file.fd = stream_fd_create(fd); + if (!vstream->stream_file.fd) { + if (close(fd)) { + PERROR("Failed to close viewer %sfile", + stream->is_metadata ? "metadata " : ""); + } + goto error_unlock; } - vstream->index_read_fd = read_fd; } - if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_read_fd >= 0) { + if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) { off_t lseek_ret; - lseek_ret = lseek(vstream->index_read_fd, - vstream->total_index_received * sizeof(struct ctf_packet_index), - SEEK_CUR); + lseek_ret = lseek(vstream->index_file->fd, 0, SEEK_END); if (lseek_ret < 0) { - goto error; + goto error_unlock; } - vstream->last_sent_index = vstream->total_index_received; } + if (stream->is_metadata) { + rcu_assign_pointer(stream->trace->viewer_metadata_stream, + vstream); + } + pthread_mutex_unlock(&stream->lock); + + /* Globally visible after the add unique. */ + lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle); + urcu_ref_init(&vstream->ref); + lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n); return vstream; +error_unlock: + pthread_mutex_unlock(&stream->lock); error: if (vstream) { - free_stream(vstream); + viewer_stream_destroy(vstream); + } + if (viewer_trace_chunk && acquired_reference) { + lttng_trace_chunk_put(viewer_trace_chunk); } return NULL; } -void viewer_stream_delete(struct relay_viewer_stream *stream) +static void viewer_stream_unpublish(struct relay_viewer_stream *vstream) { int ret; struct lttng_ht_iter iter; - iter.iter.node = &stream->stream_n.node; + iter.iter.node = &vstream->stream_n.node; ret = lttng_ht_del(viewer_streams_ht, &iter); assert(!ret); } -void viewer_stream_destroy(struct ctf_trace *ctf_trace, - struct relay_viewer_stream *stream) +static void viewer_stream_release(struct urcu_ref *ref) { - int ret; - - assert(stream); + struct relay_viewer_stream *vstream = caa_container_of(ref, + struct relay_viewer_stream, ref); - if (ctf_trace) { - ctf_trace_put_ref(ctf_trace); + if (vstream->stream->is_metadata) { + rcu_assign_pointer(vstream->stream->trace->viewer_metadata_stream, NULL); } - if (stream->read_fd >= 0) { - ret = close(stream->read_fd); - if (ret < 0) { - PERROR("close read_fd"); - } + viewer_stream_unpublish(vstream); + + if (vstream->stream_file.fd) { + stream_fd_put(vstream->stream_file.fd); + vstream->stream_file.fd = NULL; } - if (stream->index_read_fd >= 0) { - ret = close(stream->index_read_fd); - if (ret < 0) { - PERROR("close index_read_fd"); - } + if (vstream->index_file) { + lttng_index_file_put(vstream->index_file); + vstream->index_file = NULL; + } + if (vstream->stream) { + stream_put(vstream->stream); + vstream->stream = NULL; } + lttng_trace_chunk_put(vstream->stream_file.trace_chunk); + vstream->stream_file.trace_chunk = NULL; + call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu); +} - call_rcu(&stream->rcu_node, deferred_free_viewer_stream); +/* Must be called with RCU read-side lock held. */ +bool viewer_stream_get(struct relay_viewer_stream *vstream) +{ + return urcu_ref_get_unless_zero(&vstream->ref); } /* - * Find viewer stream by id. RCU read side lock MUST be acquired. + * Get viewer stream by id. * - * Return stream if found else NULL. + * Return viewer stream if found else NULL. */ -struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id) +struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id) { struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; - struct relay_viewer_stream *stream = NULL; + struct relay_viewer_stream *vstream = NULL; + rcu_read_lock(); lttng_ht_lookup(viewer_streams_ht, &id, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (!node) { DBG("Relay viewer stream %" PRIu64 " not found", id); goto end; } - stream = caa_container_of(node, struct relay_viewer_stream, stream_n); - + vstream = caa_container_of(node, struct relay_viewer_stream, stream_n); + if (!viewer_stream_get(vstream)) { + vstream = NULL; + } end: - return stream; + rcu_read_unlock(); + return vstream; +} + +void viewer_stream_put(struct relay_viewer_stream *vstream) +{ + rcu_read_lock(); + urcu_ref_put(&vstream->ref, viewer_stream_release); + rcu_read_unlock(); +} + +void viewer_stream_close_files(struct relay_viewer_stream *vstream) +{ + if (vstream->index_file) { + lttng_index_file_put(vstream->index_file); + vstream->index_file = NULL; + } + if (vstream->stream_file.fd) { + stream_fd_put(vstream->stream_file.fd); + vstream->stream_file.fd = NULL; + } +} + +void viewer_stream_sync_tracefile_array_tail(struct relay_viewer_stream *vstream) +{ + const struct relay_stream *stream = vstream->stream; + uint64_t seq_tail; + + vstream->current_tracefile_id = tracefile_array_get_file_index_tail(stream->tfa); + seq_tail = tracefile_array_get_seq_tail(stream->tfa); + if (seq_tail == -1ULL) { + seq_tail = 0; + } + vstream->index_sent_seqcount = seq_tail; } /* * Rotate a stream to the next tracefile. * - * Must be called with viewer_stream_rotation_lock held. - * Returns 0 on success, 1 on EOF, a negative value on error. + * Must be called with the rstream lock held. + * Returns 0 on success, 1 on EOF. */ -int viewer_stream_rotate(struct relay_viewer_stream *vstream, - struct relay_stream *stream) +int viewer_stream_rotate(struct relay_viewer_stream *vstream) { int ret; - uint64_t tracefile_id; - - assert(vstream); - assert(stream); + uint64_t new_id; + const struct relay_stream *stream = vstream->stream; - if (vstream->tracefile_count == 0) { - /* Ignore rotation, there is none to do. */ - ret = 0; + /* Detect the last tracefile to open. */ + if (stream->index_received_seqcount + == vstream->index_sent_seqcount + && stream->trace->session->connection_closed) { + ret = 1; goto end; } - tracefile_id = (vstream->tracefile_count_current + 1) % - vstream->tracefile_count; - - /* Detect the last tracefile to open. */ - if (vstream->tracefile_count_last != -1ULL && - vstream->tracefile_count_last == - vstream->tracefile_count_current) { - ret = 1; + if (stream->tracefile_count == 0) { + /* Ignore rotation, there is none to do. */ + ret = 0; goto end; } /* - * The writer and the reader are not working in the same tracefile, we can - * read up to EOF, we don't care about the total_index_received. + * Try to move to the next file. */ - if (stream->close_flag || (stream->tracefile_count_current != tracefile_id)) { - vstream->close_write_flag = 1; + new_id = (vstream->current_tracefile_id + 1) + % stream->tracefile_count; + if (tracefile_array_seq_in_file(stream->tfa, new_id, + vstream->index_sent_seqcount)) { + vstream->current_tracefile_id = new_id; } else { + uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa); + /* - * We are opening a file that is still open in write, make sure we - * limit our reading to the number of indexes received. + * This can only be reached on overwrite, which implies there + * has been data written at some point, which will have set the + * tail. */ - vstream->close_write_flag = 0; - if (stream->close_flag) { - vstream->total_index_received = stream->total_index_received; - } + assert(seq_tail != -1ULL); + /* + * We need to resync because we lag behind tail. + */ + vstream->current_tracefile_id = + tracefile_array_get_file_index_tail(stream->tfa); + vstream->index_sent_seqcount = seq_tail; } - vstream->tracefile_count_current = tracefile_id; + viewer_stream_close_files(vstream); + ret = 0; +end: + return ret; +} - ret = close(vstream->index_read_fd); - if (ret < 0) { - PERROR("close index file %d", vstream->index_read_fd); - } - vstream->index_read_fd = -1; +void print_viewer_streams(void) +{ + struct lttng_ht_iter iter; + struct relay_viewer_stream *vstream; - ret = close(vstream->read_fd); - if (ret < 0) { - PERROR("close tracefile %d", vstream->read_fd); + if (!viewer_streams_ht) { + return; } - vstream->read_fd = -1; - pthread_mutex_lock(&vstream->overwrite_lock); - vstream->abort_flag = 0; - pthread_mutex_unlock(&vstream->overwrite_lock); - - ret = index_open(vstream->path_name, vstream->channel_name, - vstream->tracefile_count, vstream->tracefile_count_current); - if (ret < 0) { - goto error; + rcu_read_lock(); + cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream, + stream_n.node) { + if (!viewer_stream_get(vstream)) { + continue; + } + DBG("vstream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 + " session %" PRIu64, + vstream, + vstream->ref.refcount, + vstream->stream->stream_handle, + vstream->stream->trace->id, + vstream->stream->trace->session->id); + viewer_stream_put(vstream); } - vstream->index_read_fd = ret; - - ret = 0; - -end: -error: - return ret; + rcu_read_unlock(); }