Refactor: lttng_ustctl_flush_buffer is a duplicate of lttng_ustconsumer_flush_buffer
[lttng-tools.git] / src / common / consumer / consumer.c
index 0cfbf5c7c03ef36bf1dc958601babd76e6f3d279..3d725528a3a8638b54db5d0196a92936c6a44ba6 100644 (file)
@@ -323,6 +323,7 @@ static void free_relayd_rcu(struct rcu_head *head)
        (void) relayd_close(&relayd->control_sock);
        (void) relayd_close(&relayd->data_sock);
 
+       pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
        free(relayd);
 }
 
@@ -464,14 +465,13 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
  * If a local data context is available, notify the threads that the streams'
  * state have changed.
  */
-static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
-               struct lttng_consumer_local_data *ctx)
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        uint64_t netidx;
 
        assert(relayd);
 
-       DBG("Cleaning up relayd sockets");
+       DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx);
 
        /* Save the net sequence index before destroying the object */
        netidx = relayd->net_seq_idx;
@@ -491,10 +491,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
         * memory barrier ordering the updates of the end point status from the
         * read of this status which happens AFTER receiving this notify.
         */
-       if (ctx) {
-               notify_thread_lttng_pipe(ctx->consumer_data_pipe);
-               notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
-       }
+       notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
+       notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
 }
 
 /*
@@ -812,6 +810,8 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
                                stream->trace_archive_id);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto end;
                }
 
@@ -853,6 +853,8 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
                ret = relayd_streams_sent(&relayd->control_sock);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto end;
                }
        } else {
@@ -1720,7 +1722,8 @@ write_error:
         * cleanup the relayd object and all associated streams.
         */
        if (relayd && relayd_hang_up) {
-               cleanup_relayd(relayd, ctx);
+               ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
        }
 
 end:
@@ -1946,7 +1949,8 @@ write_error:
         * cleanup the relayd object and all associated streams.
         */
        if (relayd && relayd_hang_up) {
-               cleanup_relayd(relayd, ctx);
+               ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
                /* Skip splice error so the consumer does not fail */
                goto end;
        }
@@ -2279,26 +2283,6 @@ static void validate_endpoint_status_metadata_stream(
        rcu_read_unlock();
 }
 
-static
-int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
-               uint64_t key)
-{
-       ssize_t ret;
-
-       do {
-               ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
-       } while (ret == -1 && errno == EINTR);
-       if (ret == -1) {
-               PERROR("Failed to write to the channel rotation pipe");
-       } else {
-               DBG("Sent channel rotation notification for channel key %"
-                               PRIu64, key);
-               ret = 0;
-       }
-
-       return (int) ret;
-}
-
 /*
  * Perform operations that need to be done after a stream has
  * rotated and released the stream lock.
@@ -2335,13 +2319,7 @@ int consumer_post_rotation(struct lttng_consumer_stream *stream,
                        abort();
        }
 
-       if (--stream->chan->nr_stream_rotate_pending == 0) {
-               DBG("Rotation of channel \"%s\" completed, notifying the session daemon",
-                               stream->chan->name);
-               ret = rotate_notify_sessiond(ctx, stream->chan->key);
-       }
        pthread_mutex_unlock(&stream->chan->lock);
-
        return ret;
 }
 
@@ -3590,7 +3568,6 @@ error:
 
                /* Assign new file descriptor */
                relayd->control_sock.sock.fd = fd;
-               fd = -1;        /* For error path */
                /* Assign version values. */
                relayd->control_sock.major = relayd_sock->major;
                relayd->control_sock.minor = relayd_sock->minor;
@@ -3618,7 +3595,6 @@ error:
 
                /* Assign new file descriptor */
                relayd->data_sock.sock.fd = fd;
-               fd = -1;        /* for eventual error paths */
                /* Assign version values. */
                relayd->data_sock.major = relayd_sock->major;
                relayd->data_sock.minor = relayd_sock->minor;
@@ -3632,6 +3608,11 @@ error:
        DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
                        sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
                        relayd->net_seq_idx, fd);
+       /*
+        * We gave the ownership of the fd to the relayd structure. Set the
+        * fd to -1 so we don't call close() on it in the error path below.
+        */
+       fd = -1;
 
        /* We successfully added the socket. Send status back. */
        ret = consumer_send_status_msg(sock, ret_code);
@@ -3645,6 +3626,7 @@ error:
         * Add relayd socket pair to consumer data hashtable. If object already
         * exists or on error, the function gracefully returns.
         */
+       relayd->ctx = ctx;
        add_relayd(relayd);
 
        /* All good! */
@@ -3668,34 +3650,6 @@ error_nosignal:
        }
 }
 
-/*
- * Try to lock the stream mutex.
- *
- * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
- */
-static int stream_try_lock(struct lttng_consumer_stream *stream)
-{
-       int ret;
-
-       assert(stream);
-
-       /*
-        * Try to lock the stream mutex. On failure, we know that the stream is
-        * being used else where hence there is data still being extracted.
-        */
-       ret = pthread_mutex_trylock(&stream->lock);
-       if (ret) {
-               /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
-               ret = 0;
-               goto end;
-       }
-
-       ret = 1;
-
-end:
-       return ret;
-}
-
 /*
  * Search for a relayd associated to the session id and return the reference.
  *
@@ -3762,28 +3716,11 @@ int consumer_data_pending(uint64_t id)
        /* Ease our life a bit */
        ht = consumer_data.stream_list_ht;
 
-       relayd = find_relayd_by_session_id(id);
-       if (relayd) {
-               /* Send init command for data pending. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_begin_data_pending(&relayd->control_sock,
-                               relayd->relayd_session_id);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       /* Communication error thus the relayd so no data pending. */
-                       goto data_not_pending;
-               }
-       }
-
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&id, lttng_ht_seed),
                        ht->match_fct, &id,
                        &iter.iter, stream, node_session_id.node) {
-               /* If this call fails, the stream is being used hence data pending. */
-               ret = stream_try_lock(stream);
-               if (!ret) {
-                       goto data_pending;
-               }
+               pthread_mutex_lock(&stream->lock);
 
                /*
                 * A removed node from the hash table indicates that the stream has
@@ -3801,9 +3738,27 @@ int consumer_data_pending(uint64_t id)
                        }
                }
 
-               /* Relayd check */
-               if (relayd) {
-                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+       relayd = find_relayd_by_session_id(id);
+       if (relayd) {
+               unsigned int is_data_inflight = 0;
+
+               /* Send init command for data pending. */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_begin_data_pending(&relayd->control_sock,
+                               relayd->relayd_session_id);
+               if (ret < 0) {
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                       /* Communication error with the relayd, so no data pending. */
+                       goto data_not_pending;
+               }
+
+               cds_lfht_for_each_entry_duplicate(ht->ht,
+                               ht->hash_fct(&id, lttng_ht_seed),
+                               ht->match_fct, &id,
+                               &iter.iter, stream, node_session_id.node) {
                        if (stream->metadata_flag) {
                                ret = relayd_quiescent_control(&relayd->control_sock,
                                                stream->relayd_stream_id);
@@ -3812,24 +3767,25 @@ int consumer_data_pending(uint64_t id)
                                                stream->relayd_stream_id,
                                                stream->next_net_seq_num - 1);
                        }
-                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+
                        if (ret == 1) {
-                               pthread_mutex_unlock(&stream->lock);
+                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                                goto data_pending;
+                       } else if (ret < 0) {
+                               ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                               lttng_consumer_cleanup_relayd(relayd);
+                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                               goto data_not_pending;
                        }
                }
-               pthread_mutex_unlock(&stream->lock);
-       }
 
-       if (relayd) {
-               unsigned int is_data_inflight = 0;
-
-               /* Send init command for data pending. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               /* Send end command for data pending. */
                ret = relayd_end_data_pending(&relayd->control_sock,
                                relayd->relayd_session_id, &is_data_inflight);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto data_not_pending;
                }
                if (is_data_inflight) {
@@ -3927,7 +3883,7 @@ int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_act
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               lttng_ustctl_flush_buffer(stream, producer_active);
+               lttng_ustconsumer_flush_buffer(stream, producer_active);
                break;
        default:
                ERR("Unknown consumer_data type");
@@ -3943,15 +3899,16 @@ end:
  * is already at the rotate position (produced == consumed), we flag it as
  * ready for rotation. The rotation of ready streams occurs after we have
  * replied to the session daemon that we have finished sampling the positions.
+ * Must be called with RCU read-side lock held to ensure existence of channel.
  *
  * Returns 0 on success, < 0 on error
  */
-int lttng_consumer_rotate_channel(uint64_t key, const char *path,
-               uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id,
+int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
+               uint64_t key, const char *path, uint64_t relayd_id,
+               uint32_t metadata, uint64_t new_chunk_id,
                struct lttng_consumer_local_data *ctx)
 {
        int ret;
-       struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
@@ -3960,13 +3917,6 @@ int lttng_consumer_rotate_channel(uint64_t key, const char *path,
 
        rcu_read_lock();
 
-       channel = consumer_find_channel(key);
-       if (!channel) {
-               ERR("No channel found for key %" PRIu64, key);
-               ret = -1;
-               goto end;
-       }
-
        pthread_mutex_lock(&channel->lock);
        channel->current_chunk_id = new_chunk_id;
 
@@ -4031,7 +3981,6 @@ int lttng_consumer_rotate_channel(uint64_t key, const char *path,
                if (consumed_pos == stream->rotate_position) {
                        stream->rotate_ready = true;
                }
-               channel->nr_stream_rotate_pending++;
 
                ret = consumer_flush_buffer(stream, 1);
                if (ret < 0) {
@@ -4200,6 +4149,10 @@ int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
                        stream->chan->current_chunk_id,
                        stream->last_sequence_number);
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       if (ret < 0) {
+               ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+       }
        if (ret) {
                ERR("Rotate relay stream");
        }
@@ -4226,8 +4179,9 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
        } else {
                ret = rotate_local_stream(ctx, stream);
        }
+       stream->trace_archive_id++;
        if (ret < 0) {
-               ERR("Rotate stream");
+               ERR("Failed to rotate stream, ret = %i", ret);
                goto error;
        }
 
@@ -4275,14 +4229,15 @@ error:
  * This is especially important for low throughput streams that have already
  * been consumed, we cannot wait for their next packet to perform the
  * rotation.
+ * Must be called with the RCU read-side lock held to ensure existence of the
+ * channel.
  *
  * Returns 0 on success, < 0 on error
  */
-int lttng_consumer_rotate_ready_streams(uint64_t key,
-               struct lttng_consumer_local_data *ctx)
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
+               uint64_t key, struct lttng_consumer_local_data *ctx)
 {
        int ret;
-       struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
@@ -4291,13 +4246,6 @@ int lttng_consumer_rotate_ready_streams(uint64_t key,
 
        DBG("Consumer rotate ready streams in channel %" PRIu64, key);
 
-       channel = consumer_find_channel(key);
-       if (!channel) {
-               ERR("No channel found for key %" PRIu64, key);
-               ret = -1;
-               goto end;
-       }
-
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&channel->key, lttng_ht_seed),
                        ht->match_fct, &channel->key, &iter.iter,
@@ -4373,6 +4321,10 @@ int rotate_rename_relay(const char *old_path, const char *new_path,
 
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
        ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path);
+       if (ret < 0) {
+               ERR("Relayd rotate rename failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+       }
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
 end:
        return ret;
@@ -4388,7 +4340,64 @@ int lttng_consumer_rotate_rename(const char *old_path, const char *new_path,
        }
 }
 
-int lttng_consumer_rotate_pending_relay(uint64_t session_id,
+/* Tell whether a rotation is still pending for `stream`: true only if the stream belongs to `session_id` and its trace_archive_id is <= `chunk_id`. Stream lock must be acquired by the caller. */
+static
+bool check_stream_rotation_pending(const struct lttng_consumer_stream *stream,
+               uint64_t session_id, uint64_t chunk_id)
+{
+       bool pending = false;
+
+       if (stream->session_id != session_id) {
+               /* Stream of another session: it is never reported as pending here. */
+               goto end;
+       }
+
+       /*
+        * If the stream's trace_archive_id belongs to the chunk being rotated
+        * (or an even older one), it means that the consumer has not consumed
+        * all the buffers that belong to the chunk being rotated. Therefore,
+        * the rotation is considered as ongoing/pending.
+        */
+       pending = stream->trace_archive_id <= chunk_id;
+end:
+       return pending;
+}
+
+/* Return 1 if any metadata or data stream of `session_id` still has a rotation pending for `chunk_id`, 0 otherwise. RCU read lock must be acquired by the caller. */
+int lttng_consumer_check_rotation_pending_local(uint64_t session_id,
+               uint64_t chunk_id)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+       bool rotation_pending = false;
+
+       /* Start with the metadata streams; stop at the first pending one. */
+       cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+               pthread_mutex_lock(&stream->lock);
+               rotation_pending = check_stream_rotation_pending(stream,
+                               session_id, chunk_id);
+               pthread_mutex_unlock(&stream->lock);
+               if (rotation_pending) {
+                       goto end;
+               }
+       }
+
+       /* ... followed by the data streams, with the same early exit. */
+       cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+               pthread_mutex_lock(&stream->lock);
+               rotation_pending = check_stream_rotation_pending(stream,
+                               session_id, chunk_id);
+               pthread_mutex_unlock(&stream->lock);
+               if (rotation_pending) {
+                       goto end;
+               }
+       }
+
+end:
+       return !!rotation_pending;
+}
+
+int lttng_consumer_check_rotation_pending_relay(uint64_t session_id,
                uint64_t relayd_id, uint64_t chunk_id)
 {
        int ret;
@@ -4396,13 +4405,17 @@ int lttng_consumer_rotate_pending_relay(uint64_t session_id,
 
        relayd = consumer_find_relayd(relayd_id);
        if (!relayd) {
-               ERR("Failed to find relayd");
+               ERR("Failed to find relayd id %" PRIu64, relayd_id);
                ret = -1;
                goto end;
        }
 
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
        ret = relayd_rotate_pending(&relayd->control_sock, chunk_id);
+       if (ret < 0) {
+               ERR("Relayd rotate pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+       }
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
 
 end:
@@ -4440,6 +4453,10 @@ int mkdir_relay(const char *path, uint64_t relayd_id)
 
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
        ret = relayd_mkdir(&relayd->control_sock, path);
+       if (ret < 0) {
+               ERR("Relayd mkdir failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+       }
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
 
 end:
This page took 0.032594 seconds and 5 git commands to generate.