Consumer hold mutex for add stream
[lttng-tools.git] / src / common / consumer.c
index 24b1f1d9f04bea22eb79c77ebebc2b836bbcebca..f69f0081a9cd77a38798c3dd745cd842cd4f4329 100644 (file)
@@ -554,6 +554,7 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding consumer stream %d", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->lock);
        rcu_read_lock();
 
        /* Steal stream identifier to avoid having streams with the same key */
@@ -593,6 +594,7 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream,
        consumer_data.need_update = 1;
 
        rcu_read_unlock();
+       pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        return ret;
@@ -1879,6 +1881,7 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->lock);
 
        /*
         * From here, refcounts are updated so be _careful_ when returning an error
@@ -1920,6 +1923,7 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
 
+       pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&consumer_data.lock);
        return ret;
 }
@@ -2051,7 +2055,10 @@ restart:
                                         * since their might be data to consume.
                                         */
                                        lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
-                                       close(ctx->consumer_metadata_pipe[0]);
+                                       ret = close(ctx->consumer_metadata_pipe[0]);
+                                       if (ret < 0) {
+                                               PERROR("close metadata pipe");
+                                       }
                                        continue;
                                } else if (revents & LPOLLIN) {
                                        do {
@@ -2415,7 +2422,10 @@ end:
         * only tracked fd in the poll set. The thread will take care of closing
         * the read side.
         */
-       close(ctx->consumer_metadata_pipe[1]);
+       ret = close(ctx->consumer_metadata_pipe[1]);
+       if (ret < 0) {
+               PERROR("close data pipe");
+       }
 
        if (data_ht) {
                destroy_data_stream_ht(data_ht);
@@ -2635,7 +2645,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                }
 
                /* Close the created socket fd which is useless */
-               close(relayd->control_sock.fd);
+               ret = close(relayd->control_sock.fd);
+               if (ret < 0) {
+                       PERROR("close relayd control socket");
+               }
 
                /* Assign new file descriptor */
                relayd->control_sock.fd = fd;
@@ -2649,7 +2662,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                }
 
                /* Close the created socket fd which is useless */
-               close(relayd->data_sock.fd);
+               ret = close(relayd->data_sock.fd);
+               if (ret < 0) {
+                       PERROR("close relayd control socket");
+               }
 
                /* Assign new file descriptor */
                relayd->data_sock.fd = fd;
This page took 0.026485 seconds and 5 git commands to generate.