X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=f57cfffff57262cbb04ef048fb761e75019eb805;hb=f50f23d9f80ed9fae7fe5c49aee65e813e0031c8;hp=c561b9bd2c935e5e275972aa8c8b9965b950671b;hpb=04bb2b6422d74dd96498fbdda5fce5cc42cb2f4a;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index c561b9bd2..f57cfffff 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -234,6 +234,27 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) call_rcu(&relayd->node.head, consumer_rcu_free_relayd); } +/* + * Iterate over the relayd hash table and destroy each element. Finally, + * destroy the whole hash table. + */ +static void cleanup_relayd_ht(void) +{ + struct lttng_ht_iter iter; + struct consumer_relayd_sock_pair *relayd; + + rcu_read_lock(); + + cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd, + node.node) { + destroy_relayd(relayd); + } + + lttng_ht_destroy(consumer_data.relayd_ht); + + rcu_read_unlock(); +} + /* * Update the end point status of all streams having the given network sequence * index (relayd index). @@ -1003,8 +1024,8 @@ int lttng_consumer_send_error( } /* - * Close all the tracefiles and stream fds, should be called when all instances - * are destroyed. + * Close all the tracefiles and stream fds and MUST be called when all + * instances are destroyed i.e. when all threads were joined and are ended. */ void lttng_consumer_cleanup(void) { @@ -1023,6 +1044,15 @@ void lttng_consumer_cleanup(void) rcu_read_unlock(); lttng_ht_destroy(consumer_data.channel_ht); + + cleanup_relayd_ht(); + + /* + * This HT contains streams that are freed by either the metadata thread or + * the data thread so we do *nothing* on the hash table and simply destroy + * it. + */ + lttng_ht_destroy(consumer_data.stream_list_ht); } /* @@ -2649,10 +2679,18 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock) { int fd = -1, ret = -1; + enum lttng_error_code ret_code = LTTNG_OK; struct consumer_relayd_sock_pair *relayd; DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); + /* First send a status message before receiving the fds. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto error; + } + /* Get relayd reference if exists. */ relayd = consumer_find_relayd(net_seq_idx); if (relayd == NULL) { @@ -2679,6 +2717,13 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, goto error; } + /* We have the fds without error. Send status back. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto error; + } + /* Copy socket information and received FD */ switch (sock_type) { case LTTNG_STREAM_CONTROL: @@ -2883,3 +2928,17 @@ data_not_pending: rcu_read_unlock(); return 1; } + +/* + * Send a ret code status message to the sessiond daemon. + * + * Return the sendmsg() return value. + */ +int consumer_send_status_msg(int sock, int ret_code) +{ + struct lttcomm_consumer_status_msg msg; + + msg.ret_code = ret_code; + + return lttcomm_send_unix_sock(sock, &msg, sizeof(msg)); +}