Fix: multiple consumer locking problems
authorDavid Goulet <dgoulet@efficios.com>
Wed, 15 Aug 2012 20:41:37 +0000 (16:41 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Wed, 15 Aug 2012 20:51:52 +0000 (16:51 -0400)
First, a lot of rcu_read_unlock() were missing the consumer command
handler which could make a rcu lock and return on error without
unlocking.

Fix goto error path in the consumer.

Fix a missing lock control socket mutex.

Fix memory leaks in a UST session where the consumers output object were
not freed during a destroy command.

Add a relayd sockets sent flag so we don't resend existing sockets to
the consumer during an enable_consumer command.

Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/main.c
src/bin/lttng-sessiond/trace-ust.c
src/common/consumer.c
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index 68a51bb6cb6efc2424b0b2bc7427cbec2925963b..c85529cfde94cf36fb714ac4b67137f72385e69f 100644 (file)
@@ -86,6 +86,9 @@ struct consumer_net {
 
        /* Data path for network streaming. */
        struct lttng_uri data;
+
+       /* Flag if network sockets were sent to the consumer. */
+       unsigned int relayd_socks_sent;
 };
 
 /*
@@ -98,8 +101,8 @@ struct consumer_output {
 
        /*
         * The net_seq_index is the index of the network stream on the consumer
-        * side. It's basically the relayd socket file descriptor value so the
-        * consumer can identify which streams goes with which socket.
+        * side. It tells the consumer which streams goes to which relayd with this
+        * index. The relayd sockets are index with it on the consumer side.
         */
        int net_seq_index;
 
index 1e9c0be9419f1676735e5b1fe93ec8e3ccc26169..92bf85bc6a615ba5aa3face13efe04a534189bb2 100644 (file)
@@ -448,8 +448,8 @@ static void teardown_ust_session(struct ltt_session *session)
         * lock. This means that there CAN NOT be stream(s) being sent to a
         * consumer since this action also requires the session lock at any time.
         *
-        * At this point, we are sure that not streams data will be lost after this
-        * command is issued.
+        * At this point, we are sure that no data will be lost after this command
+        * is issued.
         */
        if (usess->consumer && usess->consumer->type == CONSUMER_DST_NET) {
                cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, socket,
@@ -2097,6 +2097,15 @@ static int send_sockets_relayd_consumer(int domain,
 {
        int ret;
 
+       assert(session);
+       assert(consumer);
+
+       /* Don't resend the sockets to the consumer. */
+       if (consumer->dst.net.relayd_socks_sent) {
+               ret = LTTCOMM_OK;
+               goto error;
+       }
+
        /* Sending control relayd socket. */
        ret = send_socket_relayd_consumer(domain, session,
                        &consumer->dst.net.control, consumer, fd);
@@ -2111,6 +2120,9 @@ static int send_sockets_relayd_consumer(int domain,
                goto error;
        }
 
+       /* Flag that all relayd sockets were sent to the consumer. */
+       consumer->dst.net.relayd_socks_sent = 1;
+
 error:
        return ret;
 }
@@ -3647,7 +3659,9 @@ skip_consumer:
 session_error:
        session_destroy(session);
 error:
+       rcu_read_lock();
        consumer_destroy_output(consumer);
+       rcu_read_unlock();
 consumer_error:
        return ret;
 }
@@ -3949,7 +3963,6 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session,
        case LTTNG_DOMAIN_KERNEL:
                /* Code flow error if we don't have a kernel session here. */
                assert(ksess);
-               assert(ksess->consumer);
 
                /* Create consumer output if none exists */
                consumer = ksess->tmp_consumer;
@@ -3990,8 +4003,12 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session,
                        goto error;
                }
 
-               /* Don't send relayd socket if URI is NOT remote */
-               if (uris[i].dtype == LTTNG_DST_PATH) {
+               /*
+                * Don't send relayd socket if URI is NOT remote or if the relayd
+                * sockets for the session are already sent.
+                */
+               if (uris[i].dtype == LTTNG_DST_PATH ||
+                               consumer->dst.net.relayd_socks_sent) {
                        continue;
                }
 
@@ -4180,7 +4197,9 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session)
                 * session without this lock hence freeing the consumer output object
                 * is valid.
                 */
+               rcu_read_lock();
                consumer_destroy_output(ksess->consumer);
+               rcu_read_unlock();
                ksess->consumer = consumer;
                ksess->tmp_consumer = NULL;
 
@@ -4263,7 +4282,9 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session)
                 * session without this lock hence freeing the consumer output object
                 * is valid.
                 */
+               rcu_read_lock();
                consumer_destroy_output(usess->consumer);
+               rcu_read_unlock();
                usess->consumer = consumer;
                usess->tmp_consumer = NULL;
 
index d1e8b8dae117a4369d35681a0e6d7462b0f136c3..3e382b7a5fb738eb11f7fb5041bce65657067249 100644 (file)
@@ -578,6 +578,9 @@ void trace_ust_destroy_session(struct ltt_ust_session *session)
        destroy_domain_pid(session->domain_pid);
        destroy_domain_exec(session->domain_exec);
 
+       consumer_destroy_output(session->consumer);
+       consumer_destroy_output(session->tmp_consumer);
+
        free(session);
 
        rcu_read_unlock();
index deebd2e2b353cd1cc4365f5ede45e78203e9653e..c1dadddb3dd08854bffd8eef8cca3e9a76e82bed 100644 (file)
@@ -274,9 +274,12 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
                uatomic_dec(&relayd->refcount);
                assert(uatomic_read(&relayd->refcount) >= 0);
 
+               /* Closing streams requires to lock the control socket. */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_send_close_stream(&relayd->control_sock,
                                stream->relayd_stream_id,
                                stream->next_net_seq_num - 1);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
                        DBG("Unable to close stream on the relayd. Continuing");
                        /*
@@ -438,8 +441,10 @@ end:
 }
 
 /*
- * Add relayd socket to global consumer data hashtable.
+ * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
+ * be acquired before calling this.
  */
+
 int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        int ret = 0;
@@ -451,20 +456,15 @@ int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
                goto end;
        }
 
-       rcu_read_lock();
-
        lttng_ht_lookup(consumer_data.relayd_ht,
                        (void *)((unsigned long) relayd->net_seq_idx), &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
        if (node != NULL) {
-               rcu_read_unlock();
                /* Relayd already exist. Ignore the insertion */
                goto end;
        }
        lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
 
-       rcu_read_unlock();
-
 end:
        return ret;
 }
index 8c2bee33363614cbf58a4e5082aad84d9b8140ad..f5eb35c31e513e458a69c8d607c26764b916d577 100644 (file)
@@ -123,6 +123,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Poll on consumer socket. */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+                       rcu_read_unlock();
                        return -EINTR;
                }
 
@@ -214,6 +215,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* block */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+                       rcu_read_unlock();
                        return -EINTR;
                }
 
@@ -221,6 +223,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
                if (ret != sizeof(fd)) {
                        lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+                       rcu_read_unlock();
                        return ret;
                }
 
@@ -237,7 +240,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.stream.metadata_flag);
                if (new_stream == NULL) {
                        lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
-                       goto end;
+                       goto end_nosignal;
                }
 
                /* The stream is not metadata. Get relayd reference if exists. */
@@ -250,13 +253,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                        &new_stream->relayd_stream_id);
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret < 0) {
-                               goto end;
+                               goto end_nosignal;
                        }
                } else if (msg.u.stream.net_index != -1) {
                        ERR("Network sequence index %d unknown. Not adding stream.",
                                        msg.u.stream.net_index);
                        free(new_stream);
-                       goto end;
+                       goto end_nosignal;
                }
 
                if (ctx->on_recv_stream != NULL) {
@@ -264,7 +267,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        if (ret == 0) {
                                consumer_add_stream(new_stream);
                        } else if (ret < 0) {
-                               goto end;
+                               goto end_nosignal;
                        }
                } else {
                        consumer_add_stream(new_stream);
@@ -275,30 +278,43 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_UPDATE_STREAM:
        {
-               if (ctx->on_update_stream != NULL) {
-                       ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
-                       if (ret == 0) {
-                               consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
-                       } else if (ret < 0) {
-                               goto end;
-                       }
-               } else {
-                       consumer_change_stream_state(msg.u.stream.stream_key,
-                               msg.u.stream.state);
+               rcu_read_unlock();
+               return -ENOSYS;
+       }
+       case LTTNG_CONSUMER_DESTROY_RELAYD:
+       {
+               struct consumer_relayd_sock_pair *relayd;
+
+               DBG("Kernel consumer destroying relayd %zu",
+                               msg.u.destroy_relayd.net_seq_idx);
+
+               /* Get relayd reference if exists. */
+               relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx);
+               if (relayd == NULL) {
+                       ERR("Unable to find relayd %zu", msg.u.destroy_relayd.net_seq_idx);
+                       goto end_nosignal;
                }
-               break;
+
+               /* Set destroy flag for this object */
+               uatomic_set(&relayd->destroy_flag, 1);
+
+               /* Destroy the relayd if refcount is 0 else set the destroy flag. */
+               if (uatomic_read(&relayd->refcount) == 0) {
+                       consumer_destroy_relayd(relayd);
+               }
+               goto end_nosignal;
        }
        default:
-               break;
+               goto end_nosignal;
        }
-end:
+
        /*
-        * Wake-up the other end by writing a null byte in the pipe
-        * (non-blocking). Important note: Because writing into the
-        * pipe is non-blocking (and therefore we allow dropping wakeup
-        * data, as long as there is wakeup data present in the pipe
-        * buffer to wake up the other end), the other end should
-        * perform the following sequence for waiting:
+        * Wake-up the other end by writing a null byte in the pipe (non-blocking).
+        * Important note: Because writing into the pipe is non-blocking (and
+        * therefore we allow dropping wakeup data, as long as there is wakeup data
+        * present in the pipe buffer to wake up the other end), the other end
+        * should perform the following sequence for waiting:
+        *
         * 1) empty the pipe (reads).
         * 2) perform update operation.
         * 3) wait on the pipe (poll).
index c92d59d027c0e28c3914094257df5b22d97e59de..486ca26342e57077f4eca2b33e18720932d26686 100644 (file)
@@ -108,7 +108,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                return -ENOENT;
        }
 
-       /* relayd need RCU read-side lock */
+       /* relayd needs RCU read-side lock */
        rcu_read_lock();
 
        switch (msg.cmd_type) {
@@ -133,6 +133,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Poll on consumer socket. */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+                       rcu_read_unlock();
                        return -EINTR;
                }
 
@@ -200,11 +201,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* block */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+                       rcu_read_unlock();
                        return -EINTR;
                }
                ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
                if (ret != sizeof(fds)) {
                        lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+                       rcu_read_unlock();
                        return ret;
                }
 
@@ -241,11 +244,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* block */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+                       rcu_read_unlock();
                        return -EINTR;
                }
                ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
                if (ret != sizeof(fds)) {
                        lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+                       rcu_read_unlock();
                        return ret;
                }
 
@@ -267,7 +272,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.stream.metadata_flag);
                if (new_stream == NULL) {
                        lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
-                       goto end;
+                       goto end_nosignal;
                }
 
                /* The stream is not metadata. Get relayd reference if exists. */
@@ -280,13 +285,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                        &new_stream->relayd_stream_id);
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret < 0) {
-                               goto end;
+                               goto end_nosignal;
                        }
                } else if (msg.u.stream.net_index != -1) {
                        ERR("Network sequence index %d unknown. Not adding stream.",
                                        msg.u.stream.net_index);
                        free(new_stream);
-                       goto end;
+                       goto end_nosignal;
                }
 
                if (ctx->on_recv_stream != NULL) {
@@ -294,7 +299,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        if (ret == 0) {
                                consumer_add_stream(new_stream);
                        } else if (ret < 0) {
-                               goto end;
+                               goto end_nosignal;
                        }
                } else {
                        consumer_add_stream(new_stream);
@@ -315,8 +320,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                /* Get relayd reference if exists. */
                relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx);
                if (relayd == NULL) {
-                       ERR("Unable to find relayd %zu",
-                                       msg.u.destroy_relayd.net_seq_idx);
+                       ERR("Unable to find relayd %zu", msg.u.destroy_relayd.net_seq_idx);
+                       goto end_nosignal;
                }
 
                /* Set destroy flag for this object */
@@ -326,10 +331,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (uatomic_read(&relayd->refcount) == 0) {
                        consumer_destroy_relayd(relayd);
                }
-               break;
+               goto end_nosignal;
        }
        case LTTNG_CONSUMER_UPDATE_STREAM:
        {
+               rcu_read_unlock();
                return -ENOSYS;
 #if 0
                if (ctx->on_update_stream != NULL) {
@@ -343,20 +349,20 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        consumer_change_stream_state(msg.u.stream.stream_key,
                                msg.u.stream.state);
                }
-#endif
                break;
+#endif
        }
        default:
                break;
        }
-end:
+
        /*
-        * Wake-up the other end by writing a null byte in the pipe
-        * (non-blocking). Important note: Because writing into the
-        * pipe is non-blocking (and therefore we allow dropping wakeup
-        * data, as long as there is wakeup data present in the pipe
-        * buffer to wake up the other end), the other end should
-        * perform the following sequence for waiting:
+        * Wake-up the other end by writing a null byte in the pipe (non-blocking).
+        * Important note: Because writing into the pipe is non-blocking (and
+        * therefore we allow dropping wakeup data, as long as there is wakeup data
+        * present in the pipe buffer to wake up the other end), the other end
+        * should perform the following sequence for waiting:
+        *
         * 1) empty the pipe (reads).
         * 2) perform update operation.
         * 3) wait on the pipe (poll).
This page took 0.036177 seconds and 5 git commands to generate.