Propagate error for clear command
[lttng-tools.git] / src / common / consumer / consumer.c
index 0c3dc35c2b3bb15b70e2018c85094f96d45487d1..d46fc30a9f0817c48032e9ad69e5e07ec2f01843 100644 (file)
@@ -3616,21 +3616,6 @@ int consumer_data_pending(uint64_t id)
        /* Ease our life a bit */
        ht = consumer_data.stream_list_ht;
 
-       relayd = find_relayd_by_session_id(id);
-       if (relayd) {
-               /* Send init command for data pending. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_begin_data_pending(&relayd->control_sock,
-                               relayd->relayd_session_id);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       /* Communication error thus the relayd so no data pending. */
-                       ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
-                       lttng_consumer_cleanup_relayd(relayd);
-                       goto data_not_pending;
-               }
-       }
-
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&id, lttng_ht_seed),
                        ht->match_fct, &id,
@@ -3653,9 +3638,27 @@ int consumer_data_pending(uint64_t id)
                        }
                }
 
-               /* Relayd check */
-               if (relayd) {
-                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+       relayd = find_relayd_by_session_id(id);
+       if (relayd) {
+               unsigned int is_data_inflight = 0;
+
+               /* Send init command for data pending. */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_begin_data_pending(&relayd->control_sock,
+                               relayd->relayd_session_id);
+               if (ret < 0) {
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                       /* Communication error thus the relayd so no data pending. */
+                       goto data_not_pending;
+               }
+
+               cds_lfht_for_each_entry_duplicate(ht->ht,
+                               ht->hash_fct(&id, lttng_ht_seed),
+                               ht->match_fct, &id,
+                               &iter.iter, stream, node_session_id.node) {
                        if (stream->metadata_flag) {
                                ret = relayd_quiescent_control(&relayd->control_sock,
                                                stream->relayd_stream_id);
@@ -3664,27 +3667,19 @@ int consumer_data_pending(uint64_t id)
                                                stream->relayd_stream_id,
                                                stream->next_net_seq_num - 1);
                        }
-                       if (ret < 0) {
+
+                       if (ret == 1) {
+                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                               goto data_pending;
+                       } else if (ret < 0) {
                                ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
                                lttng_consumer_cleanup_relayd(relayd);
                                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-                               pthread_mutex_unlock(&stream->lock);
                                goto data_not_pending;
                        }
-                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-                       if (ret == 1) {
-                               pthread_mutex_unlock(&stream->lock);
-                               goto data_pending;
-                       }
                }
-               pthread_mutex_unlock(&stream->lock);
-       }
-
-       if (relayd) {
-               unsigned int is_data_inflight = 0;
 
-               /* Send init command for data pending. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               /* Send end command for data pending. */
                ret = relayd_end_data_pending(&relayd->control_sock,
                                relayd->relayd_session_id, &is_data_inflight);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
@@ -3772,3 +3767,346 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
        }
        return start_pos;
 }
+
+static
+int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
+{
+       int ret = 0;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               ret = kernctl_buffer_flush(stream->wait_fd);
+               if (ret < 0) {
+                       ERR("Failed to flush kernel stream");
+                       goto end;
+               }
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_flush_buffer(stream, producer_active);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+end:
+       return ret;
+}
+
+static
+int consumer_clear_buffer(struct lttng_consumer_stream *stream)
+{
+       int ret = 0;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               ret = kernctl_buffer_clear(stream->wait_fd);
+               if (ret < 0) {
+                       ERR("Failed to flush kernel stream");
+                       goto end;
+               }
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_clear_buffer(stream);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+end:
+       return ret;
+}
+
+static
+int consumer_unlink_stream_files_rotation(struct lttng_consumer_stream *stream)
+{
+       uint64_t tracefile_size = stream->chan->tracefile_size;
+       uint64_t tracefile_count = stream->chan->tracefile_count;
+       uint64_t count;
+       int ret;
+
+       /*
+        * If the channel is configured to have an open-ended number of tracefiles,
+        * use the current tracefile count number as upper-bound.
+        */
+       if (!tracefile_count) {
+               tracefile_count = stream->tracefile_count_current + 1;
+       }
+
+       /*
+        * Try to unlink each file and each index for this stream. They may not exist,
+        * in which case ENOENT is fine.
+        */
+       for (count = 0; count < tracefile_count; count++) {
+               ret = utils_unlink_stream_file(stream->chan->pathname, stream->name,
+                               tracefile_size, count, stream->uid, stream->gid, NULL);
+               if (ret < 0 && errno != ENOENT) {
+                       return LTTCOMM_CONSUMERD_FATAL;
+               }
+               if (stream->index_file) {
+                       ret = lttng_index_file_unlink(stream->chan->pathname, stream->name,
+                                       stream->uid, stream->gid, tracefile_size, count);
+                       if (ret < 0 && errno != ENOENT) {
+                               return LTTCOMM_CONSUMERD_FATAL;
+                       }
+               }
+       }
+       return LTTCOMM_CONSUMERD_SUCCESS;
+}
+
+static
+int consumer_unlink_stream_files(struct lttng_consumer_stream *stream)
+{
+       uint64_t tracefile_size = stream->chan->tracefile_size;
+       int ret;
+
+       /* No tracefile rotation, a single file to unlink and re-create. */
+       ret = utils_unlink_stream_file(stream->chan->pathname, stream->name,
+                       tracefile_size, 0, stream->uid, stream->gid, 0);
+       if (ret < 0 && errno != ENOENT) {
+               return LTTCOMM_CONSUMERD_FATAL;
+       }
+       return LTTCOMM_CONSUMERD_SUCCESS;
+}
+
+static
+int consumer_clear_stream_files(struct lttng_consumer_stream *stream)
+{
+       int ret;
+       uint64_t tracefile_size = stream->chan->tracefile_size;
+
+       /*
+        * If stream is sent over to a relay daemon, there are no local files
+        * to unlink.
+        */
+        if (stream->net_seq_idx != (uint64_t) -1ULL) {
+               return LTTCOMM_CONSUMERD_SUCCESS;
+       }
+
+       ret = close(stream->out_fd);
+       if (ret < 0) {
+               PERROR("Closing tracefile");
+               return LTTCOMM_CONSUMERD_FATAL;
+       }
+       stream->out_fd = -1;
+       stream->out_fd_offset = 0;
+       stream->tracefile_size_current = 0;
+
+       /*
+        * Re-creation of the index file takes care of clearing its
+        * content for non-tracefile-rotation streams.
+        * Rotation streams need to explicitly unlink each index file.
+        * We put the stream file, but keep the stream->index_file value
+        * as indication whether the stream has index (non-NULL) before
+        * overwriting it with an index creation.
+        */
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+       }
+
+       if (tracefile_size > 0) {
+               /* Tracefile rotation. */
+               ret = consumer_unlink_stream_files_rotation(stream);
+       } else {
+               ret = consumer_unlink_stream_files(stream);
+       }
+       if (ret != LTTCOMM_CONSUMERD_SUCCESS) {
+               return ret;
+       }
+
+       /* Create new files. */
+       ret = utils_create_stream_file(stream->chan->pathname, stream->name,
+                       tracefile_size, 0, stream->uid, stream->gid, 0);
+       if (ret < 0) {
+               return LTTCOMM_CONSUMERD_FATAL;
+       }
+       stream->out_fd = ret;
+
+       if (stream->index_file) {
+               stream->index_file = lttng_index_file_create(stream->chan->pathname,
+                               stream->name, stream->uid, stream->gid, tracefile_size,
+                               0, CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+               if (!stream->index_file) {
+                       return LTTCOMM_CONSUMERD_FATAL;
+               }
+       }
+
+       return LTTCOMM_CONSUMERD_SUCCESS;
+}
+
+static
+int consumer_clear_stream(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       ret = consumer_flush_buffer(stream, 1);
+       if (ret < 0) {
+               ERR("Failed to flush stream %" PRIu64 " during channel clear",
+                               stream->key);
+               ret = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+
+       ret = consumer_clear_buffer(stream);
+       if (ret < 0) {
+               ERR("Failed to clear stream %" PRIu64 " during channel clear",
+                               stream->key);
+               ret = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+
+       ret = consumer_clear_stream_files(stream);
+       if (ret != LTTCOMM_CONSUMERD_SUCCESS) {
+               ERR("Failed to clear stream %" PRIu64 " files during channel clear",
+                       stream->key);
+               goto error;
+       }
+       ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
+       return ret;
+}
+
+static
+int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+{
+       int ret;
+       struct lttng_consumer_stream *stream;
+
+       rcu_read_lock();
+       pthread_mutex_lock(&channel->lock);
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               health_code_update();
+               pthread_mutex_lock(&stream->lock);
+               ret = consumer_clear_stream(stream);
+               if (ret) {
+                       goto error_unlock;
+               }
+               pthread_mutex_unlock(&stream->lock);
+       }
+       pthread_mutex_unlock(&channel->lock);
+       rcu_read_unlock();
+       return 0;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&channel->lock);
+       rcu_read_unlock();
+       if (ret)
+               goto error;
+       ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
+       return ret;
+
+}
+
+static
+int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
+{
+       struct lttng_ht *ht;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht_iter iter;
+       int ret;
+
+       ht = consumer_data.stream_per_chan_id_ht;
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key,
+                       &iter.iter, stream, node_channel_id.node) {
+               /*
+                * Protect against teardown with mutex.
+                */
+               pthread_mutex_lock(&stream->lock);
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
+               }
+               ret = consumer_clear_stream(stream);
+               if (ret) {
+                       goto error_unlock;
+               }
+       next:
+               pthread_mutex_unlock(&stream->lock);
+       }
+       rcu_read_unlock();
+       return LTTCOMM_CONSUMERD_SUCCESS;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       rcu_read_unlock();
+       return ret;
+}
+
+int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
+{
+       int ret;
+
+       DBG("Consumer clear channel %" PRIu64, channel->key);
+
+       if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+               /*
+                * Nothing to do for the metadata channel/stream.
+                * Snapshot mechanism already take care of the metadata
+                * handling/generation, and monitored channels only need to
+                * have their data stream cleared..
+                */
+               ret = LTTCOMM_CONSUMERD_SUCCESS;
+               goto end;
+       }
+
+       if (!channel->monitor) {
+               ret = consumer_clear_unmonitored_channel(channel);
+       } else {
+               ret = consumer_clear_monitored_channel(channel);
+       }
+end:
+       return ret;
+}
+
+int lttng_consumer_clear_session(uint64_t session_id)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+       int ret;
+
+       DBG("Consumer clear session %" PRIu64, session_id);
+
+       rcu_read_lock();
+
+       /* Find first stream match in data_ht. */
+       cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+               if (stream->chan->session_id == session_id) {
+                       struct consumer_relayd_sock_pair *relayd = NULL;
+
+                       if (stream->net_seq_idx == (uint64_t) -1ULL) {
+                               /*
+                                * Asking for a clear session on local session.
+                                * No relayd to contact, nothing to do.
+                                */
+                               break;  /* Stop after first match. */
+                       }
+                       relayd = consumer_find_relayd(stream->net_seq_idx);
+                       if (relayd == NULL) {
+                               /*
+                                * relayd is shutting down.
+                                */
+                               ret = LTTCOMM_CONSUMERD_SUCCESS;
+                               goto end;
+                       }
+                       ret = relayd_clear_session(&relayd->control_sock);
+                       if (ret < 0) {
+                               ret = -ret;
+                               goto end;
+                       }
+                       break;  /* Stop after first match. */
+
+               }
+       }
+       ret = LTTCOMM_CONSUMERD_SUCCESS;
+end:
+       rcu_read_unlock();
+       return ret;
+}
This page took 0.02952 seconds and 5 git commands to generate.