X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fctf-trace.c;h=27009efc38270bd0e46adc6b046f59134c8b5b12;hp=adfbac3f0b557d7c008e1590b7231eb6318e6105;hb=890d8fe47755c3bad936389cf48ffa141cff41c9;hpb=d3e2ba59faddb31870e2ce29b6a881f7ad5ad883 diff --git a/src/bin/lttng-relayd/ctf-trace.c b/src/bin/lttng-relayd/ctf-trace.c index adfbac3f0..27009efc3 100644 --- a/src/bin/lttng-relayd/ctf-trace.c +++ b/src/bin/lttng-relayd/ctf-trace.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,108 +17,197 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include +#include #include "ctf-trace.h" +#include "lttng-relayd.h" +#include "stream.h" static uint64_t last_relay_ctf_trace_id; +static pthread_mutex_t last_relay_ctf_trace_id_lock = PTHREAD_MUTEX_INITIALIZER; + +static void rcu_destroy_ctf_trace(struct rcu_head *rcu_head) +{ + struct ctf_trace *trace = + caa_container_of(rcu_head, struct ctf_trace, rcu_node); + + free(trace); +} /* - * Try to destroy a ctf_trace object meaning that the refcount is decremented - * and checked if down to 0 which will free it. + * Destroy a ctf trace and all stream contained in it. + * + * MUST be called with the RCU read side lock. */ -void ctf_trace_try_destroy(struct ctf_trace *obj) +void ctf_trace_destroy(struct ctf_trace *trace) +{ + /* + * Getting to this point, every stream referenced by that trace + * have put back their ref since the've been closed by the + * control side. + */ + assert(cds_list_empty(&trace->stream_list)); + session_put(trace->session); + trace->session = NULL; + call_rcu(&trace->rcu_node, rcu_destroy_ctf_trace); +} + +void ctf_trace_release(struct urcu_ref *ref) { - unsigned long ret_ref; + struct ctf_trace *trace = + caa_container_of(ref, struct ctf_trace, ref); + int ret; + struct lttng_ht_iter iter; - if (!obj) { - return; - } + iter.iter.node = &trace->node.node; + ret = lttng_ht_del(trace->session->ctf_traces_ht, &iter); + assert(!ret); + ctf_trace_destroy(trace); +} - ret_ref = uatomic_add_return(&obj->refcount, -1); - assert(ret_ref >= 0); - if (ret_ref == 0) { - DBG("Freeing ctf_trace %" PRIu64, obj->id); - free(obj); +/* + * Should be called with RCU read-side lock held. + */ +bool ctf_trace_get(struct ctf_trace *trace) +{ + bool has_ref = false; + + /* Confirm that the trace refcount has not reached 0. */ + pthread_mutex_lock(&trace->reflock); + if (trace->ref.refcount != 0) { + has_ref = true; + urcu_ref_get(&trace->ref); } + pthread_mutex_unlock(&trace->reflock); + + return has_ref; } /* - * Create and return an allocated ctf_trace object. NULL on error. + * Create and return an allocated ctf_trace. NULL on error. + * There is no "open" and "close" for a ctf_trace, but rather just a + * create and refcounting. Whenever all the streams belonging to a trace + * put their reference, its refcount drops to 0. */ -struct ctf_trace *ctf_trace_create(void) +static struct ctf_trace *ctf_trace_create(struct relay_session *session, + char *path_name) { - struct ctf_trace *obj; + struct ctf_trace *trace; - obj = zmalloc(sizeof(*obj)); - if (!obj) { + trace = zmalloc(sizeof(*trace)); + if (!trace) { PERROR("ctf_trace alloc"); goto error; } - obj->id = ++last_relay_ctf_trace_id; - DBG("Created ctf_trace %" PRIu64, obj->id); + if (!session_get(session)) { + ERR("Cannot get session"); + free(trace); + trace = NULL; + goto error; + } + trace->session = session; + + CDS_INIT_LIST_HEAD(&trace->stream_list); + + pthread_mutex_lock(&last_relay_ctf_trace_id_lock); + trace->id = ++last_relay_ctf_trace_id; + pthread_mutex_unlock(&last_relay_ctf_trace_id_lock); + + lttng_ht_node_init_str(&trace->node, path_name); + trace->session = session; + urcu_ref_init(&trace->ref); + pthread_mutex_init(&trace->lock, NULL); + pthread_mutex_init(&trace->reflock, NULL); + pthread_mutex_init(&trace->stream_list_lock, NULL); + lttng_ht_add_str(session->ctf_traces_ht, &trace->node); + + DBG("Created ctf_trace %" PRIu64 " with path: %s", trace->id, path_name); error: - return obj; + return trace; } /* - * Check if we can assign the ctf_trace id and metadata stream to one or all - * the streams with the same path_name (our unique ID for ctf traces). - * - * The given stream MUST be new and NOT visible (in any hash table). + * Return a ctf_trace if found by id in the given hash table else NULL. + * Hold a reference on the ctf_trace, and must be paired with + * ctf_trace_put(). */ -void ctf_trace_assign(struct lttng_ht *ht, struct relay_stream *stream) +struct ctf_trace *ctf_trace_get_by_path_or_create(struct relay_session *session, + char *path_name) { + struct lttng_ht_node_str *node; struct lttng_ht_iter iter; - struct relay_stream *tmp_stream; + struct ctf_trace *trace = NULL; - assert(ht); - assert(stream); + rcu_read_lock(); + lttng_ht_lookup(session->ctf_traces_ht, (void *) path_name, &iter); + node = lttng_ht_iter_get_node_str(&iter); + if (!node) { + DBG("CTF Trace path %s not found", path_name); + goto end; + } + trace = caa_container_of(node, struct ctf_trace, node); + if (!ctf_trace_get(trace)) { + trace = NULL; + } +end: + rcu_read_unlock(); + if (!trace) { + /* Try to create */ + trace = ctf_trace_create(session, path_name); + } + return trace; +} + +void ctf_trace_put(struct ctf_trace *trace) +{ + rcu_read_lock(); + pthread_mutex_lock(&trace->reflock); + urcu_ref_put(&trace->ref, ctf_trace_release); + pthread_mutex_unlock(&trace->reflock); + rcu_read_unlock(); +} + +int ctf_trace_close(struct ctf_trace *trace) +{ + struct relay_stream *stream; rcu_read_lock(); - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct((void *) stream->path_name, lttng_ht_seed), - ht->match_fct, (void *) stream->path_name, - &iter.iter, tmp_stream, ctf_trace_node.node) { - if (stream->metadata_flag) { - /* - * The new stream is the metadata stream for this trace, - * assign the ctf_trace pointer to all the streams in - * this bucket. - */ - pthread_mutex_lock(&tmp_stream->lock); - tmp_stream->ctf_trace = stream->ctf_trace; - uatomic_inc(&tmp_stream->ctf_trace->refcount); - pthread_mutex_unlock(&tmp_stream->lock); - DBG("Assigned ctf_trace %" PRIu64 " to stream %" PRIu64, - tmp_stream->ctf_trace->id, tmp_stream->stream_handle); - } else if (tmp_stream->ctf_trace) { - /* - * The ctf_trace already exists for this bucket, - * just assign the pointer to the new stream and exit. - */ - stream->ctf_trace = tmp_stream->ctf_trace; - uatomic_inc(&stream->ctf_trace->refcount); - DBG("Assigned ctf_trace %" PRIu64 " to stream %" PRIu64, - tmp_stream->ctf_trace->id, tmp_stream->stream_handle); - goto end; - } else { - /* - * We don't know yet the ctf_trace ID (no metadata has been added), - * so leave it there until the metadata stream arrives. - */ - goto end; - } + cds_list_for_each_entry_rcu(stream, &trace->stream_list, + stream_node) { + /* + * Close stream since the connection owning the trace is being + * torn down. + */ + try_stream_close(stream); } + rcu_read_unlock(); + /* + * Since all references to the trace are held by its streams, we + * don't need to do any self-ref put. + */ + return 0; +} +struct relay_viewer_stream *ctf_trace_get_viewer_metadata_stream(struct ctf_trace *trace) +{ + struct relay_viewer_stream *vstream; + + rcu_read_lock(); + vstream = rcu_dereference(trace->viewer_metadata_stream); + if (!vstream) { + goto end; + } + if (!viewer_stream_get(vstream)) { + vstream = NULL; + } end: rcu_read_unlock(); - return; + return vstream; } -