Fix: channel and stream leak in consumerd
[lttng-tools.git] / src / common / consumer.c
index 990db9e2690eff92ec683b8dd1df1aef65542ab0..116441171e5aa73d198c2bbd90175da0d9fb03fa 100644 (file)
@@ -102,6 +102,7 @@ static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
 
        msg.action = action;
        msg.chan = chan;
+       msg.key = key;
        do {
                ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
        } while (ret < 0 && errno == EINTR);
@@ -286,6 +287,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
 {
        int ret;
        struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream, *stmp;
 
        DBG("Consumer delete channel key %" PRIu64, channel->key);
 
@@ -296,6 +298,13 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
+               /* Delete streams that might have been left in the stream list. */
+               cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+                               send_node) {
+                       cds_list_del(&stream->send_node);
+                       lttng_ustconsumer_del_stream(stream);
+                       free(stream);
+               }
                lttng_ustconsumer_del_channel(channel);
                break;
        default:
@@ -889,6 +898,8 @@ end:
 
 /*
  * Add a channel to the global list protected by a mutex.
+ *
+ * On success 0 is returned else a negative value.
  */
 int consumer_add_channel(struct lttng_consumer_channel *channel,
                struct lttng_consumer_local_data *ctx)
@@ -906,7 +917,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
                /* Channel already exist. Ignore the insertion */
                ERR("Consumer add channel key %" PRIu64 " already exists!",
                        channel->key);
-               ret = -1;
+               ret = -EEXIST;
                goto end;
        }
 
@@ -1959,6 +1970,13 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        }
 
 end:
+       /*
+        * Nullify the stream reference so it is not used after deletion. The
+        * consumer data lock MUST be acquired before being able to check for a
+        * NULL pointer value.
+        */
+       stream->chan->metadata_stream = NULL;
+
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
@@ -2011,9 +2029,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
                uatomic_inc(&relayd->refcount);
        }
 
-       /* Update channel refcount once added without error(s). */
-       uatomic_inc(&stream->chan->refcount);
-
        /*
         * When nb_init_stream_left reaches 0, we don't need to trigger any action
         * in terms of destroying the associated channel, because the action that
@@ -2344,7 +2359,7 @@ void *consumer_thread_data_poll(void *data)
 
                        /* allocate for all fds + 1 for the consumer_data_pipe */
                        local_stream = zmalloc((consumer_data.stream_count + 1) *
-                                       sizeof(struct lttng_consumer_stream));
+                                       sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
@@ -2711,24 +2726,51 @@ restart:
 
                                                lttng_ht_node_init_u64(&chan->wait_fd_node,
                                                        chan->wait_fd);
+                                               rcu_read_lock();
                                                lttng_ht_add_unique_u64(channel_ht,
                                                                &chan->wait_fd_node);
+                                               rcu_read_unlock();
                                                /* Add channel to the global poll events list */
                                                lttng_poll_add(&events, chan->wait_fd,
                                                                LPOLLIN | LPOLLPRI);
                                                break;
                                        case CONSUMER_CHANNEL_DEL:
                                        {
+                                               struct lttng_consumer_stream *stream, *stmp;
+
+                                               rcu_read_lock();
                                                chan = consumer_find_channel(key);
                                                if (!chan) {
+                                                       rcu_read_unlock();
                                                        ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
                                                        break;
                                                }
                                                lttng_poll_del(&events, chan->wait_fd);
+                                               iter.iter.node = &chan->wait_fd_node.node;
                                                ret = lttng_ht_del(channel_ht, &iter);
                                                assert(ret == 0);
                                                consumer_close_channel_streams(chan);
 
+                                               switch (consumer_data.type) {
+                                               case LTTNG_CONSUMER_KERNEL:
+                                                       break;
+                                               case LTTNG_CONSUMER32_UST:
+                                               case LTTNG_CONSUMER64_UST:
+                                                       /* Delete streams that might have been left in the stream list. */
+                                                       cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
+                                                                       send_node) {
+                                                               cds_list_del(&stream->send_node);
+                                                               lttng_ustconsumer_del_stream(stream);
+                                                               uatomic_sub(&stream->chan->refcount, 1);
+                                                               assert(&chan->refcount);
+                                                               free(stream);
+                                                       }
+                                                       break;
+                                               default:
+                                                       ERR("Unknown consumer_data type");
+                                                       assert(0);
+                                               }
+
                                                /*
                                                 * Release our own refcount. Force channel deletion even if
                                                 * streams were not initialized.
@@ -2736,6 +2778,7 @@ restart:
                                                if (!uatomic_sub_return(&chan->refcount, 1)) {
                                                        consumer_del_channel(chan);
                                                }
+                                               rcu_read_unlock();
                                                goto restart;
                                        }
                                        case CONSUMER_CHANNEL_QUIT:
@@ -2774,6 +2817,7 @@ restart:
                                lttng_poll_del(&events, chan->wait_fd);
                                ret = lttng_ht_del(channel_ht, &iter);
                                assert(ret == 0);
+                               assert(cds_list_empty(&chan->streams.head));
                                consumer_close_channel_streams(chan);
 
                                /* Release our own refcount */
@@ -2862,12 +2906,6 @@ void *consumer_thread_sessiond_poll(void *data)
                goto end;
        }
 
-       ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
-       if (ret < 0) {
-               PERROR("fcntl O_NONBLOCK");
-               goto end;
-       }
-
        /* prepare the FDs to poll : to client socket and the should_quit pipe */
        consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
        consumer_sockpoll[0].events = POLLIN | POLLPRI;
@@ -2885,11 +2923,6 @@ void *consumer_thread_sessiond_poll(void *data)
                WARN("On accept");
                goto end;
        }
-       ret = fcntl(sock, F_SETFL, O_NONBLOCK);
-       if (ret < 0) {
-               PERROR("fcntl O_NONBLOCK");
-               goto end;
-       }
 
        /*
         * Setup metadata socket which is the second socket connection on the
@@ -2968,7 +3001,7 @@ end:
                }
        }
        if (client_socket >= 0) {
-               ret = close(sock);
+               ret = close(client_socket);
                if (ret < 0) {
                        PERROR("close client_socket sessiond poll");
                }
This page took 0.026292 seconds and 5 git commands to generate.