consumerd: Implement clear stream/session commands
[lttng-tools.git] / src / common / consumer / consumer.c
index 3e3c304873bad94c005cfa5a611c46ccc4ce716e..7b8782b2e12e2193ebb4f5a8e9062db10ba6ca57 100644 (file)
@@ -3821,34 +3821,167 @@ end:
 }
 
 static
-int clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+int consumer_unlink_stream_files_rotation(struct lttng_consumer_stream *stream)
 {
+       uint64_t tracefile_size = stream->chan->tracefile_size;
+       uint64_t tracefile_count = stream->chan->tracefile_count;
+       uint64_t count;
+       int ret;
+
+       /*
+        * If the channel is configured to have an open-ended number of tracefiles,
+        * use the current tracefile count number as upper-bound.
+        */
+       if (!tracefile_count) {
+               tracefile_count = stream->tracefile_count_current + 1;
+       }
 
+       /*
+        * Try to unlink each file and each index for this stream. They may not exist,
+        * in which case ENOENT is fine.
+        */
+       for (count = 0; count < tracefile_count; count++) {
+               ret = utils_unlink_stream_file(stream->chan->pathname, stream->name,
+                               tracefile_size, count, stream->uid, stream->gid, NULL);
+               if (ret < 0 && errno != ENOENT) {
+                       return LTTCOMM_CONSUMERD_FATAL;
+               }
+               if (stream->index_file) {
+                       ret = lttng_index_file_unlink(stream->chan->pathname, stream->name,
+                                       stream->uid, stream->gid, tracefile_size, count);
+                       if (ret < 0 && errno != ENOENT) {
+                               return LTTCOMM_CONSUMERD_FATAL;
+                       }
+               }
+       }
+       return LTTCOMM_CONSUMERD_SUCCESS;
+}
+
+static
+int consumer_unlink_stream_files(struct lttng_consumer_stream *stream)
+{
+       uint64_t tracefile_size = stream->chan->tracefile_size;
        int ret;
-       struct lttng_consumer_stream *stream;
 
-       assert(!channel->monitor);
-       assert(channel->type != CONSUMER_CHANNEL_TYPE_METADATA);
+       /* No tracefile rotation, a single file to unlink and re-create. */
+       ret = utils_unlink_stream_file(stream->chan->pathname, stream->name,
+                       tracefile_size, 0, stream->uid, stream->gid, 0);
+       if (ret < 0 && errno != ENOENT) {
+               return LTTCOMM_CONSUMERD_FATAL;
+       }
+       return LTTCOMM_CONSUMERD_SUCCESS;
+}
+
+static
+int consumer_clear_stream_files(struct lttng_consumer_stream *stream)
+{
+       int ret;
+       uint64_t tracefile_size = stream->chan->tracefile_size;
+
+       /*
+        * If stream is sent over to a relay daemon, there are no local files
+        * to unlink.
+        */
+        if (stream->net_seq_idx != (uint64_t) -1ULL) {
+               return LTTCOMM_CONSUMERD_SUCCESS;
+       }
+
+       ret = close(stream->out_fd);
+       if (ret < 0) {
+               PERROR("Closing tracefile");
+               return LTTCOMM_CONSUMERD_FATAL;
+       }
+       stream->out_fd = -1;
+       stream->out_fd_offset = 0;
+       stream->tracefile_size_current = 0;
+
+       /*
+        * Re-creation of the index file takes care of clearing its
+        * content for non-tracefile-rotation streams.
+        * Rotation streams need to explicitly unlink each index file.
+        * We put the stream file, but keep the stream->index_file value
+        * as indication whether the stream has index (non-NULL) before
+        * overwriting it with an index creation.
+        */
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+       }
+
+       if (tracefile_size > 0) {
+               /* Tracefile rotation. */
+               ret = consumer_unlink_stream_files_rotation(stream);
+       } else {
+               ret = consumer_unlink_stream_files(stream);
+       }
+       if (ret != LTTCOMM_CONSUMERD_SUCCESS) {
+               return ret;
+       }
+
+       /* Create new files. */
+       ret = utils_create_stream_file(stream->chan->pathname, stream->name,
+                       tracefile_size, 0, stream->uid, stream->gid, 0);
+       if (ret < 0) {
+               return LTTCOMM_CONSUMERD_FATAL;
+       }
+       stream->out_fd = ret;
+
+       if (stream->index_file) {
+               stream->index_file = lttng_index_file_create(stream->chan->pathname,
+                               stream->name, stream->uid, stream->gid, tracefile_size,
+                               0, CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+               if (!stream->index_file) {
+                       return LTTCOMM_CONSUMERD_FATAL;
+               }
+       }
+
+       return LTTCOMM_CONSUMERD_SUCCESS;
+}
+
+static
+int consumer_clear_stream(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       ret = consumer_flush_buffer(stream, 1);
+       if (ret < 0) {
+               ERR("Failed to flush stream %" PRIu64 " during channel clear",
+                               stream->key);
+               ret = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+
+       ret = consumer_clear_buffer(stream);
+       if (ret < 0) {
+               ERR("Failed to clear stream %" PRIu64 " during channel clear",
+                               stream->key);
+               ret = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+
+       ret = consumer_clear_stream_files(stream);
+       if (ret != LTTCOMM_CONSUMERD_SUCCESS) {
+               ERR("Failed to clear stream %" PRIu64 " files during channel clear",
+                       stream->key);
+               goto error;
+       }
+       ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
+       return ret;
+}
+
+static
+int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+{
+       int ret;
+       struct lttng_consumer_stream *stream;
 
        rcu_read_lock();
        pthread_mutex_lock(&channel->lock);
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
                health_code_update();
                pthread_mutex_lock(&stream->lock);
-
-               ret = consumer_flush_buffer(stream, 1);
-               if (ret < 0) {
-                       ERR("Failed to flush stream %" PRIu64 " during channel clear",
-                                       stream->key);
-                       ret = LTTCOMM_CONSUMERD_FATAL;
-                       goto error_unlock;
-               }
-
-               ret = consumer_clear_buffer(stream);
-               if (ret < 0) {
-                       ERR("Failed to clear stream %" PRIu64 " during channel clear",
-                                       stream->key);
-                       ret = LTTCOMM_CONSUMERD_FATAL;
+               ret = consumer_clear_stream(stream);
+               if (ret) {
                        goto error_unlock;
                }
                pthread_mutex_unlock(&stream->lock);
@@ -3861,40 +3994,119 @@ error_unlock:
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&channel->lock);
        rcu_read_unlock();
+       if (ret)
+               goto error;
+       ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
        return ret;
+
 }
 
-int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
+static
+int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
 {
+       struct lttng_ht *ht;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht_iter iter;
        int ret;
 
-       DBG("Consumer clear channel %" PRIu64, channel->key);
+       ht = consumer_data.stream_per_chan_id_ht;
 
-       if (!channel->monitor) {
-               /* Snapshot mode */
-               if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
-                       /*
-                        * Nothing to do for the metadata channel/stream.
-                        * Snapshot mechanism already take care of the metadata
-                        * handling/generation.
-                        */
-                       goto end;
+       rcu_read_lock();
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key,
+                       &iter.iter, stream, node_channel_id.node) {
+               /*
+                * Protect against teardown with mutex.
+                */
+               pthread_mutex_lock(&stream->lock);
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
                }
-               ret = clear_unmonitored_channel(channel);
+               ret = consumer_clear_stream(stream);
                if (ret) {
-                       goto error;
+                       goto error_unlock;
                }
-       } else {
-               /* TODO:
-                * Normal channel and relayd bound clear operation not supported
-                * for now
+       next:
+               pthread_mutex_unlock(&stream->lock);
+       }
+       rcu_read_unlock();
+       return LTTCOMM_CONSUMERD_SUCCESS;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       rcu_read_unlock();
+       return ret;
+}
+
+int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
+{
+       int ret;
+
+       DBG("Consumer clear channel %" PRIu64, channel->key);
+
+       if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+               /*
+                * Nothing to do for the metadata channel/stream.
+                * Snapshot mechanism already take care of the metadata
+                * handling/generation, and monitored channels only need to
+                * have their data stream cleared..
                 */
-               ret = LTTCOMM_CONSUMERD_FATAL;
-               goto error;
+               ret = LTTCOMM_CONSUMERD_SUCCESS;
+               goto end;
        }
 
+       if (!channel->monitor) {
+               ret = consumer_clear_unmonitored_channel(channel);
+       } else {
+               ret = consumer_clear_monitored_channel(channel);
+       }
 end:
+       return ret;
+}
+
+int lttng_consumer_clear_session(uint64_t session_id)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+       int ret;
+
+       DBG("Consumer clear session %" PRIu64, session_id);
+
+       rcu_read_lock();
+
+       /* Find first stream match in data_ht. */
+       cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+               if (stream->chan->session_id == session_id) {
+                       struct consumer_relayd_sock_pair *relayd = NULL;
+
+                       if (stream->net_seq_idx == (uint64_t) -1ULL) {
+                               /*
+                                * Asking for a clear session on local session.
+                                * No relayd to contact, nothing to do.
+                                */
+                               break;  /* Stop after first match. */
+                       }
+                       relayd = consumer_find_relayd(stream->net_seq_idx);
+                       if (relayd == NULL) {
+                               /*
+                                * relayd is shutting down.
+                                */
+                               ret = LTTCOMM_CONSUMERD_SUCCESS;
+                               goto end;
+                       }
+                       ret = relayd_clear_session(&relayd->control_sock);
+                       if (ret < 0) {
+                               ret = LTTCOMM_CONSUMERD_FATAL;
+                               goto end;
+                       }
+                       break;  /* Stop after first match. */
+
+               }
+       }
        ret = LTTCOMM_CONSUMERD_SUCCESS;
-error:
+end:
+       rcu_read_unlock();
        return ret;
 }
This page took 0.028848 seconds and 5 git commands to generate.