Make stream hash tables global to the consumer
authorDavid Goulet <dgoulet@efficios.com>
Thu, 11 Oct 2012 20:48:57 +0000 (16:48 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 19 Oct 2012 16:49:01 +0000 (12:49 -0400)
The data stream hash table is now global to the consumer and used in the
data thread. The consumer_data stream_ht is no longer used to track the
data streams but instead will be used (and possibly renamed) by the
session daemon poll thread to keep track of streams on a per session id
basis for the upcoming feature that check traced data availability.

For now, in order to avoid mind bugging problems to access the streams,
both hash table are now defined globally (metadata and data). However,
stream update are still done in a single thread. Don't count on this to
be guaranteed in the next commits.

Signed-off-by: David Goulet <dgoulet@efficios.com>
src/common/consumer.c
src/common/consumer.h

index be78e256f489661eda948fc8b143a25b69c23a0c..0b2f07391009f4a317b8f7c876443addad39d660 100644 (file)
@@ -58,6 +58,17 @@ int consumer_poll_timeout = -1;
  */
 volatile int consumer_quit = 0;
 
+/*
+ * The following two hash tables are visible by all threads which are separated
+ * in different source files.
+ *
+ * Global hash table containing respectively metadata and data streams. The
+ * stream element in this ht should only be updated by the metadata poll thread
+ * for the metadata and the data poll thread for the data.
+ */
+struct lttng_ht *metadata_ht = NULL;
+struct lttng_ht *data_ht = NULL;
+
 /*
  * Find a stream. The consumer_data.lock must be locked during this
  * call.
@@ -433,19 +444,24 @@ end:
 /*
  * Add a stream to the global list protected by a mutex.
  */
-int consumer_add_stream(struct lttng_consumer_stream *stream)
+static int consumer_add_stream(struct lttng_consumer_stream *stream,
+               struct lttng_ht *ht)
 {
        int ret = 0;
        struct consumer_relayd_sock_pair *relayd;
 
        assert(stream);
+       assert(ht);
 
        DBG3("Adding consumer stream %d", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
        rcu_read_lock();
 
-       lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
+       /* Steal stream identifier to avoid having streams with the same key */
+       consumer_steal_stream_key(stream->key, ht);
+
+       lttng_ht_add_unique_ulong(ht, &stream->node);
 
        /* Check and cleanup relayd */
        relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -783,9 +799,9 @@ end:
  *
  * Returns the number of fds in the structures.
  */
-int consumer_update_poll_array(
+static int consumer_update_poll_array(
                struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
-               struct lttng_consumer_stream **local_stream)
+               struct lttng_consumer_stream **local_stream, struct lttng_ht *ht)
 {
        int i = 0;
        struct lttng_ht_iter iter;
@@ -793,8 +809,7 @@ int consumer_update_poll_array(
 
        DBG("Updating poll fd array");
        rcu_read_lock();
-       cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
-                       node.node) {
+       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
                if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
                        continue;
                }
@@ -1518,6 +1533,33 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
 }
 
+/*
+ * Iterate over all streams of the hashtable and free them properly.
+ *
+ * WARNING: *MUST* be used with data stream only.
+ */
+static void destroy_data_stream_ht(struct lttng_ht *ht)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+
+       if (ht == NULL) {
+               return;
+       }
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+               ret = lttng_ht_del(ht, &iter);
+               assert(!ret);
+
+               call_rcu(&stream->node.head, consumer_free_stream);
+       }
+       rcu_read_unlock();
+
+       lttng_ht_destroy(ht);
+}
+
 /*
  * Iterate over all streams of the hashtable and free them properly.
  *
@@ -1708,6 +1750,9 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
                uatomic_dec(&stream->chan->nb_init_streams);
        }
 
+       /* Steal stream identifier to avoid having streams with the same key */
+       consumer_steal_stream_key(stream->key, ht);
+
        lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
        rcu_read_unlock();
 
@@ -1726,7 +1771,6 @@ void *consumer_thread_metadata_poll(void *data)
        struct lttng_consumer_stream *stream = NULL;
        struct lttng_ht_iter iter;
        struct lttng_ht_node_ulong *node;
-       struct lttng_ht *metadata_ht = NULL;
        struct lttng_poll_event events;
        struct lttng_consumer_local_data *ctx = data;
        ssize_t len;
@@ -1735,11 +1779,6 @@ void *consumer_thread_metadata_poll(void *data)
 
        DBG("Thread metadata poll started");
 
-       metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
-       if (metadata_ht == NULL) {
-               goto end;
-       }
-
        /* Size is set to 1 for the consumer_metadata pipe */
        ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
        if (ret < 0) {
@@ -1918,6 +1957,11 @@ void *consumer_thread_data_poll(void *data)
 
        rcu_register_thread();
 
+       data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       if (data_ht == NULL) {
+               goto end;
+       }
+
        local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
 
        while (1) {
@@ -1955,7 +1999,8 @@ void *consumer_thread_data_poll(void *data)
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
-                       ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
+                       ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
+                                       data_ht);
                        if (ret < 0) {
                                ERR("Error in allocating pollfd or local_outfds");
                                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
@@ -2015,7 +2060,7 @@ void *consumer_thread_data_poll(void *data)
                                continue;
                        }
 
-                       ret = consumer_add_stream(new_stream);
+                       ret = consumer_add_stream(new_stream, data_ht);
                        if (ret) {
                                ERR("Consumer add stream %d failed. Continuing",
                                                new_stream->key);
@@ -2088,22 +2133,19 @@ void *consumer_thread_data_poll(void *data)
                        if ((pollfd[i].revents & POLLHUP)) {
                                DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
-                                       consumer_del_stream(local_stream[i],
-                                                       consumer_data.stream_ht);
+                                       consumer_del_stream(local_stream[i], data_ht);
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
-                                       consumer_del_stream(local_stream[i],
-                                                       consumer_data.stream_ht);
+                                       consumer_del_stream(local_stream[i], data_ht);
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
-                                       consumer_del_stream(local_stream[i],
-                                                       consumer_data.stream_ht);
+                                       consumer_del_stream(local_stream[i], data_ht);
                                        num_hup++;
                                }
                        }
@@ -2131,6 +2173,10 @@ end:
         */
        close(ctx->consumer_metadata_pipe[1]);
 
+       if (data_ht) {
+               destroy_data_stream_ht(data_ht);
+       }
+
        rcu_unregister_thread();
        return NULL;
 }
@@ -2299,6 +2345,11 @@ void lttng_consumer_init(void)
        consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+
+       metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       assert(metadata_ht);
+       data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       assert(data_ht);
 }
 
 /*
index 8e5891aef60da7dfa3267ee18006605c6005f5c3..6bce96d96946abf263b9bad0bff0d6ff98d8dc49 100644 (file)
@@ -275,6 +275,10 @@ struct lttng_consumer_global_data {
        struct lttng_ht *relayd_ht;
 };
 
+/* Defined in consumer.c and coupled with explanations */
+extern struct lttng_ht *metadata_ht;
+extern struct lttng_ht *data_ht;
+
 /*
  * Init consumer data structures.
  */
@@ -324,10 +328,6 @@ extern void lttng_consumer_sync_trace_file(
  */
 extern int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll);
 
-extern int consumer_update_poll_array(
-               struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
-               struct lttng_consumer_stream **local_consumer_streams);
-
 extern struct lttng_consumer_stream *consumer_allocate_stream(
                int channel_key, int stream_key,
                int shm_fd, int wait_fd,
@@ -340,7 +340,6 @@ extern struct lttng_consumer_stream *consumer_allocate_stream(
                int net_index,
                int metadata_flag,
                int *alloc_ret);
-extern int consumer_add_stream(struct lttng_consumer_stream *stream);
 extern void consumer_del_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht);
 extern void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
This page took 0.031118 seconds and 5 git commands to generate.