Add a thread utility class and thread list
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Mon, 26 Nov 2018 20:09:10 +0000 (15:09 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 4 Dec 2018 23:12:25 +0000 (18:12 -0500)
As part of the re-work of the order of the teardown of the sessiond's
threads, this utility is introduced to track running threads and unify
the mechanisms through which they are launched and shutdown.

This makes it easier to reason about (and track) the order in which
threads are launched and shutdown.

The lttng_thread class allows threads to be implemented by
defining the following methods:
  - An entry point ("main" function),
  - A shutdown method,
  - A clean-up method.

Since the sessiond's threads use a variety of techniques to initiate
their teardown (through an explicit command in a queue, using the
global "quit" pipe, or through a futex + variable), it is more
practical to let them define a shutdown method which notifies the
thread to shutdown than to impose a standard mechanism.

A clean-up method is meant to work around situations where the
ownership of data structures shared between the main thread and
a worker thread can be ambiguous (mostly in error paths).
The clean-up method is invoked when the last reference to a thread
is released.

While some threads need to be shutdown in a particular order, most of
them can be shutdown in bulk. The lttng_thread utility maintains a
global thread list which allows for a generic path through which
threads can be shutdown using the lttng_thread_list_shutdown_orphans()
function.

The lttng_thread_shutdown() method, in return, allows the user
(most likely the main thread) to explicitly teardown threads which
must be shutdown in a specific order before issuing the bulk
lttng_thread_list_shutdown_orphans() call.

Note that lttng_thread objects are reference counted. The thread
list holds a reference to each thread until it is shutdown. Hence,
it is safe to hold a reference to a thread, invoke its shutdown
method, and then invoke lttng_thread_list_shutdown_orphans().

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-sessiond/Makefile.am
src/bin/lttng-sessiond/thread.c [new file with mode: 0644]
src/bin/lttng-sessiond/thread.h [new file with mode: 0644]

index cfcdc5e86188dc34702af07973059b3dee8b41cc..6330cdccc55afc82d3e7ae20129f77d128f1adda 100644 (file)
@@ -43,7 +43,8 @@ lttng_sessiond_SOURCES = utils.c utils.h \
                        ready.c \
                        globals.c \
                        thread-utils.c \
-                       process-utils.c
+                       process-utils.c \
+                       thread.c thread.h
 
 if HAVE_LIBLTTNG_UST_CTL
 lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \
diff --git a/src/bin/lttng-sessiond/thread.c b/src/bin/lttng-sessiond/thread.c
new file mode 100644 (file)
index 0000000..7e16b69
--- /dev/null
@@ -0,0 +1,212 @@
+/*
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "thread.h"
+#include <urcu/list.h>
+#include <urcu/ref.h>
+#include <pthread.h>
+#include <common/macros.h>
+#include <common/error.h>
+#include <common/defaults.h>
+
+static struct thread_list {
+       struct cds_list_head head;
+       pthread_mutex_t lock;
+} thread_list = {
+       .head = CDS_LIST_HEAD_INIT(thread_list.head),
+       .lock = PTHREAD_MUTEX_INITIALIZER,
+};
+
+struct lttng_thread {
+       struct urcu_ref ref;
+       struct cds_list_head node;
+       pthread_t thread;
+       const char *name;
+       /* Main thread function */
+       lttng_thread_entry_point entry;
+       /*
+        * Thread-specific shutdown method. Allows threads to implement their
+        * own shutdown mechanism as some of them use a structured message
+        * passed through a command queue and some rely on a dedicated "quit"
+        * pipe.
+        */
+       lttng_thread_shutdown_cb shutdown;
+       lttng_thread_cleanup_cb cleanup;
+       /* Thread implementation-specific data. */
+       void *data;
+};
+
+static
+void lttng_thread_destroy(struct lttng_thread *thread)
+{
+       if (thread->cleanup) {
+               thread->cleanup(thread->data);
+       }
+       free(thread);
+}
+
+static
+void lttng_thread_release(struct urcu_ref *ref)
+{
+       lttng_thread_destroy(container_of(ref, struct lttng_thread, ref));
+}
+
+static
+void *launch_thread(void *data)
+{
+       void *ret;
+       struct lttng_thread *thread = (struct lttng_thread *) data;
+
+       DBG("Launching \"%s\" thread", thread->name);
+       ret = thread->entry(thread->data);
+       DBG("Thread \"%s\" has returned", thread->name);
+       return ret;
+}
+
+struct lttng_thread *lttng_thread_create(const char *name,
+               lttng_thread_entry_point entry,
+               lttng_thread_shutdown_cb shutdown,
+               lttng_thread_cleanup_cb cleanup,
+               void *thread_data)
+{
+       int ret;
+       struct lttng_thread *thread;
+
+       thread = zmalloc(sizeof(*thread));
+       if (!thread) {
+               goto error;
+       }
+
+       urcu_ref_init(&thread->ref);
+       CDS_INIT_LIST_HEAD(&thread->node);
+       /*
+        * Thread names are assumed to be statically allocated strings.
+        * It is unnecessary to copy this attribute.
+        */
+       thread->name = name;
+       thread->entry = entry;
+       thread->shutdown = shutdown;
+       thread->cleanup = cleanup;
+       thread->data = thread_data;
+
+       pthread_mutex_lock(&thread_list.lock);
+       /*
+        * Add the thread at the head of the list to shutdown threads in the
+        * opposite order of their creation. A reference is taken for the
+        * thread list which will be released on shutdown of the thread.
+        */
+       cds_list_add(&thread->node, &thread_list.head);
+       (void) lttng_thread_get(thread);
+
+       ret = pthread_create(&thread->thread, default_pthread_attr(),
+                       launch_thread, thread);
+       if (ret) {
+               PERROR("Failed to create \"%s\" thread", thread->name);
+               goto error_pthread_create;
+       }
+
+       pthread_mutex_unlock(&thread_list.lock);
+       return thread;
+
+error_pthread_create:
+       cds_list_del(&thread->node);
+       /* Release list reference. */
+       lttng_thread_put(thread);
+       pthread_mutex_unlock(&thread_list.lock);
+error:
+       /* Release initial reference. */
+       lttng_thread_put(thread);
+       return NULL;
+}
+
+bool lttng_thread_get(struct lttng_thread *thread)
+{
+       return urcu_ref_get_unless_zero(&thread->ref);
+}
+
+void lttng_thread_put(struct lttng_thread *thread)
+{
+       assert(thread->ref.refcount);
+       urcu_ref_put(&thread->ref, lttng_thread_release);
+}
+
+const char *lttng_thread_get_name(const struct lttng_thread *thread)
+{
+       return thread->name;
+}
+
+static
+bool _lttng_thread_shutdown(struct lttng_thread *thread)
+{
+       int ret;
+       void *status;
+       bool result = true;
+
+       DBG("Shutting down \"%s\" thread", thread->name);
+       if (thread->shutdown) {
+               result = thread->shutdown(thread->data);
+               if (!result) {
+                       result = false;
+                       goto end;
+               }
+       }
+
+       ret = pthread_join(thread->thread, &status);
+       if (ret) {
+               PERROR("Failed to join \"%s\" thread", thread->name);
+               result = false;
+       }
+       /* Release the list's reference to the thread. */
+       cds_list_del(&thread->node);
+       lttng_thread_put(thread);
+end:
+       return result;
+}
+
+bool lttng_thread_shutdown(struct lttng_thread *thread)
+{
+       bool result;
+
+       pthread_mutex_lock(&thread_list.lock);
+       result = _lttng_thread_shutdown(thread);
+       pthread_mutex_unlock(&thread_list.lock);
+       return result;
+}
+
+void lttng_thread_list_shutdown_orphans(void)
+{
+       struct lttng_thread *thread, *tmp;
+
+       pthread_mutex_lock(&thread_list.lock);
+       cds_list_for_each_entry_safe(thread, tmp, &thread_list.head, node) {
+               bool result;
+               const long ref = uatomic_read(&thread->ref.refcount);
+
+               if (ref != 1) {
+                       /*
+                        * Other external references to the thread exist, skip.
+                        */
+                       continue;
+               }
+
+               result = _lttng_thread_shutdown(thread);
+               if (!result) {
+                       ERR("Failed to shutdown thread \"%s\"", thread->name);
+               }
+       }
+       pthread_mutex_unlock(&thread_list.lock);
+}
diff --git a/src/bin/lttng-sessiond/thread.h b/src/bin/lttng-sessiond/thread.h
new file mode 100644 (file)
index 0000000..3b85d0b
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <stdbool.h>
+
+#ifndef THREAD_H
+#define THREAD_H
+
+struct lttng_thread;
+
+/* Main function of the new thread. */
+typedef void *(*lttng_thread_entry_point)(void *);
+
+/* Callback invoked to initiate the shutdown a thread. */
+typedef bool (*lttng_thread_shutdown_cb)(void *);
+
+/*
+ * Callback invoked to clean-up the thread data.
+ * Invoked when the thread is destroyed to ensure there is no
+ * race between a use by the "thread shutdown callback" and
+ * a use by the thread itself.
+ */
+typedef void (*lttng_thread_cleanup_cb)(void *);
+
+/*
+ * Returns a reference to the newly-created thread.
+ * The shutdown and cleanup callbacks are optional.
+ */
+struct lttng_thread *lttng_thread_create(const char *name,
+               lttng_thread_entry_point entry,
+               lttng_thread_shutdown_cb shutdown,
+               lttng_thread_cleanup_cb cleanup,
+               void *thread_data);
+
+bool lttng_thread_get(struct lttng_thread *thread);
+void lttng_thread_put(struct lttng_thread *thread);
+
+const char *lttng_thread_get_name(const struct lttng_thread *thread);
+
+/*
+ * Explicitly shutdown a thread. This function returns once the
+ * thread has returned and been joined.
+ *
+ * It is invalid to call this function more than once on a thread.
+ *
+ * Returns true on success, false on error.
+ */
+bool lttng_thread_shutdown(struct lttng_thread *thread);
+
+/*
+ * Shutdown all orphaned threads (threads to which no external reference
+ * exist).
+ *
+ * Returns once all orphaned threads have been joined.
+ */
+void lttng_thread_list_shutdown_orphans(void);
+
+#endif /* THREAD_H */
This page took 0.04186 seconds and 5 git commands to generate.