X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fconnection.c;h=fce6c84d58c922f5d26491c62160ba49ab2577ef;hp=4dc41e0e4fe8fe7e693257cb5b00c8748ee61ab3;hb=36d2e35df61339e4394e84ad9790b984d259e0f0;hpb=58eb9381cd933f0644e6a5b89750dbd6d9b14570 diff --git a/src/bin/lttng-relayd/connection.c b/src/bin/lttng-relayd/connection.c index 4dc41e0e4..fce6c84d5 100644 --- a/src/bin/lttng-relayd/connection.c +++ b/src/bin/lttng-relayd/connection.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -17,95 +18,131 @@ */ #define _GNU_SOURCE +#define _LGPL_SOURCE #include +#include #include "connection.h" #include "stream.h" +#include "viewer-session.h" -static void rcu_free_connection(struct rcu_head *head) +bool connection_get(struct relay_connection *conn) { - struct relay_connection *conn = - caa_container_of(head, struct relay_connection, rcu_node); + bool has_ref = false; - lttcomm_destroy_sock(conn->sock); - free(conn); + pthread_mutex_lock(&conn->reflock); + if (conn->ref.refcount != 0) { + has_ref = true; + urcu_ref_get(&conn->ref); + } + pthread_mutex_unlock(&conn->reflock); + + return has_ref; } -struct relay_connection *connection_find_by_sock(struct lttng_ht *ht, int sock) +struct relay_connection *connection_get_by_sock(struct lttng_ht *relay_connections_ht, + int sock) { struct lttng_ht_node_ulong *node; struct lttng_ht_iter iter; struct relay_connection *conn = NULL; - assert(ht); assert(sock >= 0); - lttng_ht_lookup(ht, (void *)((unsigned long) sock), &iter); + rcu_read_lock(); + lttng_ht_lookup(relay_connections_ht, (void *)((unsigned long) sock), + &iter); node = lttng_ht_iter_get_node_ulong(&iter); if (!node) { DBG2("Relay connection by sock %d not found", sock); goto end; } conn = caa_container_of(node, struct relay_connection, sock_n); - + if (!connection_get(conn)) { + conn = NULL; + } end: + rcu_read_unlock(); return conn; } -void connection_delete(struct lttng_ht *ht, struct relay_connection *conn) +struct relay_connection *connection_create(struct lttcomm_sock *sock, + enum connection_type type) { - int ret; - struct lttng_ht_iter iter; - - assert(ht); - assert(conn); + struct relay_connection *conn; - iter.iter.node = &conn->sock_n.node; - ret = lttng_ht_del(ht, &iter); - assert(!ret); + conn = zmalloc(sizeof(*conn)); + if (!conn) { + PERROR("zmalloc relay connection"); + goto end; + } + pthread_mutex_init(&conn->reflock, NULL); + urcu_ref_init(&conn->ref); + conn->type = type; + conn->sock = sock; + lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd); +end: + return conn; } -void connection_destroy(struct relay_connection *conn) +static void rcu_free_connection(struct rcu_head *head) { - struct relay_stream *stream, *tmp_stream; - - assert(conn); + struct relay_connection *conn = + caa_container_of(head, struct relay_connection, rcu_node); - /* Clean up recv list of this connection if any. */ - cds_list_for_each_entry_safe(stream, tmp_stream, &conn->recv_head, - recv_list) { - cds_list_del(&stream->recv_list); + lttcomm_destroy_sock(conn->sock); + if (conn->viewer_session) { + viewer_session_destroy(conn->viewer_session); + conn->viewer_session = NULL; } + free(conn); +} +static void destroy_connection(struct relay_connection *conn) +{ call_rcu(&conn->rcu_node, rcu_free_connection); } -struct relay_connection *connection_create(void) +static void connection_release(struct urcu_ref *ref) { - struct relay_connection *conn; + struct relay_connection *conn = + caa_container_of(ref, struct relay_connection, ref); - conn = zmalloc(sizeof(*conn)); - if (!conn) { - PERROR("zmalloc relay connection"); - goto error; + if (conn->in_socket_ht) { + struct lttng_ht_iter iter; + int ret; + + iter.iter.node = &conn->sock_n.node; + ret = lttng_ht_del(conn->socket_ht, &iter); + assert(!ret); } -error: - return conn; + if (conn->session) { + if (session_close(conn->session)) { + ERR("session_close"); + } + conn->session = NULL; + } + if (conn->viewer_session) { + viewer_session_close(conn->viewer_session); + } + destroy_connection(conn); } -void connection_init(struct relay_connection *conn) +void connection_put(struct relay_connection *conn) { - assert(conn); - assert(conn->sock); - - CDS_INIT_LIST_HEAD(&conn->recv_head); - lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd); + rcu_read_lock(); + pthread_mutex_lock(&conn->reflock); + urcu_ref_put(&conn->ref, connection_release); + pthread_mutex_unlock(&conn->reflock); + rcu_read_unlock(); } -void connection_free(struct relay_connection *conn) +void connection_ht_add(struct lttng_ht *relay_connections_ht, + struct relay_connection *conn) { - assert(conn); - - free(conn); + assert(!conn->in_socket_ht); + lttng_ht_add_unique_ulong(relay_connections_ht, &conn->sock_n); + conn->in_socket_ht = 1; + conn->socket_ht = relay_connections_ht; }