Fix: miscellaneous memory handling fixes
[lttng-tools.git] / src / common / consumer.c
index ffbecf7163ef394b4ffce3b8bf6874d3b5b4375f..c842de9358965c79e31f489b789906f3d1deddec 100644 (file)
@@ -250,6 +250,32 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
        return channel;
 }
 
+/*
+ * There is a possibility that the consumer does not have enough time between
+ * the close of the channel on the session daemon and the cleanup in here thus
+ * once we have a channel add with an existing key, we know for sure that this
+ * channel will eventually get cleaned up by all streams being closed.
+ *
+ * This function just nullifies the already existing channel key.
+ */
+static void steal_channel_key(uint64_t key)
+{
+       struct lttng_consumer_channel *channel;
+
+       rcu_read_lock();
+       channel = consumer_find_channel(key);
+       if (channel) {
+               channel->key = (uint64_t) -1ULL;
+               /*
+                * We don't want the lookup to match, but we still need to iterate on
+                * this channel when iterating over the hash table. Just change the
+                * node key.
+                */
+               channel->node.key = (uint64_t) -1ULL;
+       }
+       rcu_read_unlock();
+}
+
 static void free_channel_rcu(struct rcu_head *head)
 {
        struct lttng_ht_node_u64 *node =
@@ -979,43 +1005,35 @@ end:
 /*
  * Add a channel to the global list protected by a mutex.
  *
- * On success 0 is returned else a negative value.
+ * Always return 0 indicating success.
  */
 int consumer_add_channel(struct lttng_consumer_channel *channel,
                struct lttng_consumer_local_data *ctx)
 {
-       int ret = 0;
-       struct lttng_ht_node_u64 *node;
-       struct lttng_ht_iter iter;
-
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&channel->lock);
        pthread_mutex_lock(&channel->timer_lock);
-       rcu_read_lock();
 
-       lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
-       if (node != NULL) {
-               /* Channel already exist. Ignore the insertion */
-               ERR("Consumer add channel key %" PRIu64 " already exists!",
-                       channel->key);
-               ret = -EEXIST;
-               goto end;
-       }
+       /*
+        * This gives us a guarantee that the channel we are about to add to the
+        * channel hash table will be unique. See this function comment on the why
+        * we need to steel the channel key at this stage.
+        */
+       steal_channel_key(channel->key);
 
+       rcu_read_lock();
        lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
-
-end:
        rcu_read_unlock();
+
        pthread_mutex_unlock(&channel->timer_lock);
        pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
-       if (!ret && channel->wait_fd != -1 &&
-                       channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
+       if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
                notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
        }
-       return ret;
+
+       return 0;
 }
 
 /*
@@ -1683,7 +1701,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
                if (relayd == NULL) {
-                       ret = -EPIPE;
+                       written = -ret;
                        goto end;
                }
        }
@@ -1701,7 +1719,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
 
        /* Write metadata stream id before payload */
        if (relayd) {
-               int total_len = len;
+               unsigned long total_len = len;
 
                if (stream->metadata_flag) {
                        /*
@@ -1714,31 +1732,21 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                        padding);
                        if (ret < 0) {
                                written = ret;
-                               /* Socket operation failed. We consider the relayd dead */
-                               if (ret == -EBADF) {
-                                       WARN("Remote relayd disconnected. Stopping");
-                                       relayd_hang_up = 1;
-                                       goto write_error;
-                               }
-                               goto end;
+                               relayd_hang_up = 1;
+                               goto write_error;
                        }
 
                        total_len += sizeof(struct lttcomm_relayd_metadata_payload);
                }
 
                ret = write_relayd_stream_header(stream, total_len, padding, relayd);
-               if (ret >= 0) {
-                       /* Use the returned socket. */
-                       outfd = ret;
-               } else {
-                       /* Socket operation failed. We consider the relayd dead */
-                       if (ret == -EBADF) {
-                               WARN("Remote relayd disconnected. Stopping");
-                               relayd_hang_up = 1;
-                               goto write_error;
-                       }
-                       goto end;
+               if (ret < 0) {
+                       written = ret;
+                       relayd_hang_up = 1;
+                       goto write_error;
                }
+               /* Use the returned socket. */
+               outfd = ret;
        } else {
                /* No streaming, we have to set the len with the full padding */
                len += padding;
@@ -1755,6 +1763,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                        stream->out_fd, &(stream->tracefile_count_current),
                                        &stream->out_fd);
                        if (ret < 0) {
+                               written = ret;
                                ERR("Rotating output file");
                                goto end;
                        }
@@ -1766,6 +1775,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                                stream->chan->tracefile_size,
                                                stream->tracefile_count_current);
                                if (ret < 0) {
+                                       written = ret;
                                        goto end;
                                }
                                stream->index_fd = ret;
@@ -1788,28 +1798,24 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                DBG("splice chan to pipe, ret %zd", ret_splice);
                if (ret_splice < 0) {
                        ret = errno;
-                       if (written == 0) {
-                               written = ret_splice;
-                       }
+                       written = -ret;
                        PERROR("Error in relay splice");
                        goto splice_error;
                }
 
                /* Handle stream on the relayd if the output is on the network */
-               if (relayd) {
-                       if (stream->metadata_flag) {
-                               size_t metadata_payload_size =
-                                       sizeof(struct lttcomm_relayd_metadata_payload);
+               if (relayd && stream->metadata_flag) {
+                       size_t metadata_payload_size =
+                               sizeof(struct lttcomm_relayd_metadata_payload);
 
-                               /* Update counter to fit the spliced data */
-                               ret_splice += metadata_payload_size;
-                               len += metadata_payload_size;
-                               /*
-                                * We do this so the return value can match the len passed as
-                                * argument to this function.
-                                */
-                               written -= metadata_payload_size;
-                       }
+                       /* Update counter to fit the spliced data */
+                       ret_splice += metadata_payload_size;
+                       len += metadata_payload_size;
+                       /*
+                        * We do this so the return value can match the len passed as
+                        * argument to this function.
+                        */
+                       written -= metadata_payload_size;
                }
 
                /* Splice data out */
@@ -1818,24 +1824,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                DBG("Consumer splice pipe to file, ret %zd", ret_splice);
                if (ret_splice < 0) {
                        ret = errno;
-                       if (written == 0) {
-                               written = ret_splice;
-                       }
-                       /* Socket operation failed. We consider the relayd dead */
-                       if (errno == EBADF || errno == EPIPE || errno == ESPIPE) {
-                               WARN("Remote relayd disconnected. Stopping");
-                               relayd_hang_up = 1;
-                               goto write_error;
-                       }
-                       PERROR("Error in file splice");
-                       goto splice_error;
+                       written = -ret;
+                       relayd_hang_up = 1;
+                       goto write_error;
                } else if (ret_splice > len) {
                        /*
                         * We don't expect this code path to be executed but you never know
                         * so this is an extra protection agains a buggy splice().
                         */
-                       written += ret_splice;
                        ret = errno;
+                       written += ret_splice;
                        PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice,
                                        len);
                        goto splice_error;
@@ -3615,6 +3613,7 @@ int consumer_send_status_msg(int sock, int ret_code)
 {
        struct lttcomm_consumer_status_msg msg;
 
+       memset(&msg, 0, sizeof(msg));
        msg.ret_code = ret_code;
 
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
@@ -3632,6 +3631,7 @@ int consumer_send_status_channel(int sock,
 
        assert(sock >= 0);
 
+       memset(&msg, 0, sizeof(msg));
        if (!channel) {
                msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
        } else {
This page took 0.028554 seconds and 5 git commands to generate.