call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
}
+/*
+ * Iterate over the relayd hash table and destroy each element. Finally,
+ * destroy the whole hash table.
+ */
+static void cleanup_relayd_ht(void)
+{
+ struct lttng_ht_iter iter;
+ struct consumer_relayd_sock_pair *relayd;
+
+ rcu_read_lock();
+
+ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
+ node.node) {
+ destroy_relayd(relayd);
+ }
+
+ lttng_ht_destroy(consumer_data.relayd_ht);
+
+ rcu_read_unlock();
+}
+
/*
* Update the end point status of all streams having the given network sequence
* index (relayd index).
int ret;
struct lttng_ht_iter iter;
+ DBG("Consumer delete channel key %d", channel->key);
+
pthread_mutex_lock(&consumer_data.lock);
switch (consumer_data.type) {
}
/*
- * Close all the tracefiles and stream fds, should be called when all instances
- * are destroyed.
+ * Close all the tracefiles and stream fds and MUST be called when all
+ * instances are destroyed i.e. when all threads were joined and are ended.
*/
void lttng_consumer_cleanup(void)
{
rcu_read_unlock();
lttng_ht_destroy(consumer_data.channel_ht);
+
+ cleanup_relayd_ht();
+
+ /*
+ * This HT contains streams that are freed by either the metadata thread or
+ * the data thread so we do *nothing* on the hash table and simply destroy
+ * it.
+ */
+ lttng_ht_destroy(consumer_data.stream_list_ht);
}
/*
*/
static void destroy_data_stream_ht(struct lttng_ht *ht)
{
- int ret;
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
-
- call_rcu(&stream->node.head, consumer_free_stream);
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_stream(stream, ht);
}
rcu_read_unlock();
*/
static void destroy_stream_ht(struct lttng_ht *ht)
{
- int ret;
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
-
- call_rcu(&stream->node.head, consumer_free_stream);
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_metadata_stream(stream, ht);
}
rcu_read_unlock();
rcu_register_thread();
+ metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ if (!metadata_ht) {
+ /* ENOMEM at this point. Better to bail out. */
+ goto error;
+ }
+
DBG("Thread metadata poll started");
/* Size is set to 1 for the consumer_metadata pipe */
DBG("Metadata poll thread exiting");
lttng_poll_clean(&events);
- if (metadata_ht) {
- destroy_stream_ht(metadata_ht);
- }
+ destroy_stream_ht(metadata_ht);
rcu_unregister_thread();
return NULL;
data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (data_ht == NULL) {
+ /* ENOMEM at this point. Better to bail out. */
goto end;
}
PERROR("close data pipe");
}
- if (data_ht) {
- destroy_data_stream_ht(data_ht);
- }
+ destroy_data_stream_ht(data_ht);
rcu_unregister_thread();
return NULL;
*/
void *consumer_thread_sessiond_poll(void *data)
{
- int sock, client_socket, ret;
+ int sock = -1, client_socket, ret;
/*
* structure to poll for incoming data on communication socket avoids
* making blocking sockets.
goto end;
}
+ /* This socket is not useful anymore. */
+ ret = close(client_socket);
+ if (ret < 0) {
+ PERROR("close client_socket");
+ }
+ client_socket = -1;
+
/* update the polling structure to poll on the established socket */
consumer_sockpoll[1].fd = sock;
consumer_sockpoll[1].events = POLLIN | POLLPRI;
*/
notify_thread_pipe(ctx->consumer_data_pipe[1]);
+ /* Cleaning up possibly open sockets. */
+ if (sock >= 0) {
+ ret = close(sock);
+ if (ret < 0) {
+ PERROR("close sock sessiond poll");
+ }
+ }
+ if (client_socket >= 0) {
+ ret = close(sock);
+ if (ret < 0) {
+ PERROR("close client_socket sessiond poll");
+ }
+ }
+
rcu_unregister_thread();
return NULL;
}
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
-
- metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- assert(metadata_ht);
- data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- assert(data_ht);
}
/*
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
{
- int fd, ret = -1;
+ int fd = -1, ret = -1;
struct consumer_relayd_sock_pair *relayd;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
if (ret != sizeof(fd)) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
ret = -1;
+ fd = -1; /* Just in case it gets set with an invalid value. */
goto error;
}
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
ret = lttcomm_create_sock(&relayd->control_sock);
- if (ret < 0) {
- goto error;
+ /* Immediately try to close the created socket if valid. */
+ if (relayd->control_sock.fd >= 0) {
+ if (close(relayd->control_sock.fd)) {
+ PERROR("close relayd control socket");
+ }
}
-
- /* Close the created socket fd which is useless */
- ret = close(relayd->control_sock.fd);
+ /* Handle create_sock error. */
if (ret < 0) {
- PERROR("close relayd control socket");
+ goto error;
}
/* Assign new file descriptor */
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
ret = lttcomm_create_sock(&relayd->data_sock);
- if (ret < 0) {
- goto error;
+ /* Immediately try to close the created socket if valid. */
+ if (relayd->data_sock.fd >= 0) {
+ if (close(relayd->data_sock.fd)) {
+ PERROR("close relayd data socket");
+ }
}
-
- /* Close the created socket fd which is useless */
- ret = close(relayd->data_sock.fd);
+ /* Handle create_sock error. */
if (ret < 0) {
- PERROR("close relayd control socket");
+ goto error;
}
/* Assign new file descriptor */
add_relayd(relayd);
/* All good! */
- ret = 0;
+ return 0;
error:
+ /* Close received socket if valid. */
+ if (fd >= 0) {
+ if (close(fd)) {
+ PERROR("close received socket");
+ }
+ }
return ret;
}