consumerd: Implement clear stream/session commands
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 3 Apr 2019 20:32:21 +0000 (16:32 -0400)
committerJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Wed, 10 Apr 2019 23:36:37 +0000 (19:36 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c

index 3e3c304873bad94c005cfa5a611c46ccc4ce716e..7b8782b2e12e2193ebb4f5a8e9062db10ba6ca57 100644 (file)
@@ -3821,34 +3821,167 @@ end:
 }
 
 static
-int clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+int consumer_unlink_stream_files_rotation(struct lttng_consumer_stream *stream)
 {
+       uint64_t tracefile_size = stream->chan->tracefile_size;
+       uint64_t tracefile_count = stream->chan->tracefile_count;
+       uint64_t count;
+       int ret;
+
+       /*
+        * If the channel is configured to have an open-ended number of tracefiles,
+        * use the current tracefile count number as upper-bound.
+        */
+       if (!tracefile_count) {
+               tracefile_count = stream->tracefile_count_current + 1;
+       }
 
+       /*
+        * Try to unlink each file and each index for this stream. They may not exist,
+        * in which case ENOENT is fine.
+        */
+       for (count = 0; count < tracefile_count; count++) {
+               ret = utils_unlink_stream_file(stream->chan->pathname, stream->name,
+                               tracefile_size, count, stream->uid, stream->gid, NULL);
+               if (ret < 0 && errno != ENOENT) {
+                       return LTTCOMM_CONSUMERD_FATAL;
+               }
+               if (stream->index_file) {
+                       ret = lttng_index_file_unlink(stream->chan->pathname, stream->name,
+                                       stream->uid, stream->gid, tracefile_size, count);
+                       if (ret < 0 && errno != ENOENT) {
+                               return LTTCOMM_CONSUMERD_FATAL;
+                       }
+               }
+       }
+       return LTTCOMM_CONSUMERD_SUCCESS;
+}
+
+static
+int consumer_unlink_stream_files(struct lttng_consumer_stream *stream)
+{
+       uint64_t tracefile_size = stream->chan->tracefile_size;
        int ret;
-       struct lttng_consumer_stream *stream;
 
-       assert(!channel->monitor);
-       assert(channel->type != CONSUMER_CHANNEL_TYPE_METADATA);
+       /* No tracefile rotation, a single file to unlink and re-create. */
+       ret = utils_unlink_stream_file(stream->chan->pathname, stream->name,
+                       tracefile_size, 0, stream->uid, stream->gid, 0);
+       if (ret < 0 && errno != ENOENT) {
+               return LTTCOMM_CONSUMERD_FATAL;
+       }
+       return LTTCOMM_CONSUMERD_SUCCESS;
+}
+
+static
+int consumer_clear_stream_files(struct lttng_consumer_stream *stream)
+{
+       int ret;
+       uint64_t tracefile_size = stream->chan->tracefile_size;
+
+       /*
+        * If stream is sent over to a relay daemon, there are no local files
+        * to unlink.
+        */
+        if (stream->net_seq_idx != (uint64_t) -1ULL) {
+               return LTTCOMM_CONSUMERD_SUCCESS;
+       }
+
+       ret = close(stream->out_fd);
+       if (ret < 0) {
+               PERROR("Closing tracefile");
+               return LTTCOMM_CONSUMERD_FATAL;
+       }
+       stream->out_fd = -1;
+       stream->out_fd_offset = 0;
+       stream->tracefile_size_current = 0;
+
+       /*
+        * Re-creation of the index file takes care of clearing its
+        * content for non-tracefile-rotation streams.
+        * Rotation streams need to explicitly unlink each index file.
+        * We put the stream file, but keep the stream->index_file value
+        * as indication whether the stream has index (non-NULL) before
+        * overwriting it with an index creation.
+        */
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+       }
+
+       if (tracefile_size > 0) {
+               /* Tracefile rotation. */
+               ret = consumer_unlink_stream_files_rotation(stream);
+       } else {
+               ret = consumer_unlink_stream_files(stream);
+       }
+       if (ret != LTTCOMM_CONSUMERD_SUCCESS) {
+               return ret;
+       }
+
+       /* Create new files. */
+       ret = utils_create_stream_file(stream->chan->pathname, stream->name,
+                       tracefile_size, 0, stream->uid, stream->gid, 0);
+       if (ret < 0) {
+               return LTTCOMM_CONSUMERD_FATAL;
+       }
+       stream->out_fd = ret;
+
+       if (stream->index_file) {
+               stream->index_file = lttng_index_file_create(stream->chan->pathname,
+                               stream->name, stream->uid, stream->gid, tracefile_size,
+                               0, CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+               if (!stream->index_file) {
+                       return LTTCOMM_CONSUMERD_FATAL;
+               }
+       }
+
+       return LTTCOMM_CONSUMERD_SUCCESS;
+}
+
+static
+int consumer_clear_stream(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       ret = consumer_flush_buffer(stream, 1);
+       if (ret < 0) {
+               ERR("Failed to flush stream %" PRIu64 " during channel clear",
+                               stream->key);
+               ret = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+
+       ret = consumer_clear_buffer(stream);
+       if (ret < 0) {
+               ERR("Failed to clear stream %" PRIu64 " during channel clear",
+                               stream->key);
+               ret = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+
+       ret = consumer_clear_stream_files(stream);
+       if (ret != LTTCOMM_CONSUMERD_SUCCESS) {
+               ERR("Failed to clear stream %" PRIu64 " files during channel clear",
+                       stream->key);
+               goto error;
+       }
+       ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
+       return ret;
+}
+
+static
+int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+{
+       int ret;
+       struct lttng_consumer_stream *stream;
 
        rcu_read_lock();
        pthread_mutex_lock(&channel->lock);
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
                health_code_update();
                pthread_mutex_lock(&stream->lock);
-
-               ret = consumer_flush_buffer(stream, 1);
-               if (ret < 0) {
-                       ERR("Failed to flush stream %" PRIu64 " during channel clear",
-                                       stream->key);
-                       ret = LTTCOMM_CONSUMERD_FATAL;
-                       goto error_unlock;
-               }
-
-               ret = consumer_clear_buffer(stream);
-               if (ret < 0) {
-                       ERR("Failed to clear stream %" PRIu64 " during channel clear",
-                                       stream->key);
-                       ret = LTTCOMM_CONSUMERD_FATAL;
+               ret = consumer_clear_stream(stream);
+               if (ret) {
                        goto error_unlock;
                }
                pthread_mutex_unlock(&stream->lock);
@@ -3861,40 +3994,119 @@ error_unlock:
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&channel->lock);
        rcu_read_unlock();
+       if (ret)
+               goto error;
+       ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
        return ret;
+
 }
 
-int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
+static
+int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
 {
+       struct lttng_ht *ht;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht_iter iter;
        int ret;
 
-       DBG("Consumer clear channel %" PRIu64, channel->key);
+       ht = consumer_data.stream_per_chan_id_ht;
 
-       if (!channel->monitor) {
-               /* Snapshot mode */
-               if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
-                       /*
-                        * Nothing to do for the metadata channel/stream.
-                        * Snapshot mechanism already take care of the metadata
-                        * handling/generation.
-                        */
-                       goto end;
+       rcu_read_lock();
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key,
+                       &iter.iter, stream, node_channel_id.node) {
+               /*
+                * Protect against teardown with mutex.
+                */
+               pthread_mutex_lock(&stream->lock);
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
                }
-               ret = clear_unmonitored_channel(channel);
+               ret = consumer_clear_stream(stream);
                if (ret) {
-                       goto error;
+                       goto error_unlock;
                }
-       } else {
-               /* TODO:
-                * Normal channel and relayd bound clear operation not supported
-                * for now
+       next:
+               pthread_mutex_unlock(&stream->lock);
+       }
+       rcu_read_unlock();
+       return LTTCOMM_CONSUMERD_SUCCESS;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       rcu_read_unlock();
+       return ret;
+}
+
+int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
+{
+       int ret;
+
+       DBG("Consumer clear channel %" PRIu64, channel->key);
+
+       if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+               /*
+                * Nothing to do for the metadata channel/stream.
+                * Snapshot mechanism already take care of the metadata
+                * handling/generation, and monitored channels only need to
+                * have their data stream cleared..
                 */
-               ret = LTTCOMM_CONSUMERD_FATAL;
-               goto error;
+               ret = LTTCOMM_CONSUMERD_SUCCESS;
+               goto end;
        }
 
+       if (!channel->monitor) {
+               ret = consumer_clear_unmonitored_channel(channel);
+       } else {
+               ret = consumer_clear_monitored_channel(channel);
+       }
 end:
+       return ret;
+}
+
+int lttng_consumer_clear_session(uint64_t session_id)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+       int ret;
+
+       DBG("Consumer clear session %" PRIu64, session_id);
+
+       rcu_read_lock();
+
+       /* Find first stream match in data_ht. */
+       cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+               if (stream->chan->session_id == session_id) {
+                       struct consumer_relayd_sock_pair *relayd = NULL;
+
+                       if (stream->net_seq_idx == (uint64_t) -1ULL) {
+                               /*
+                                * Asking for a clear session on local session.
+                                * No relayd to contact, nothing to do.
+                                */
+                               break;  /* Stop after first match. */
+                       }
+                       relayd = consumer_find_relayd(stream->net_seq_idx);
+                       if (relayd == NULL) {
+                               /*
+                                * relayd is shutting down.
+                                */
+                               ret = LTTCOMM_CONSUMERD_SUCCESS;
+                               goto end;
+                       }
+                       ret = relayd_clear_session(&relayd->control_sock);
+                       if (ret < 0) {
+                               ret = LTTCOMM_CONSUMERD_FATAL;
+                               goto end;
+                       }
+                       break;  /* Stop after first match. */
+
+               }
+       }
        ret = LTTCOMM_CONSUMERD_SUCCESS;
-error:
+end:
+       rcu_read_unlock();
        return ret;
 }
index 3d4fb85ef527bc7d18417738bb4208c21d5f876f..907aecf225e71ffde1e4f592e6b52091b8c93f15 100644 (file)
@@ -63,6 +63,7 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
        LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE,
        LTTNG_CONSUMER_CLEAR_CHANNEL,
+       LTTNG_CONSUMER_CLEAR_SESSION,
 };
 
 /* State of each fd in consumer */
@@ -757,5 +758,6 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
 int consumer_create_index_file(struct lttng_consumer_stream *stream);
 void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd);
 int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel);
+int lttng_consumer_clear_session(uint64_t session_id);
 
 #endif /* LIB_CONSUMER_H */
index 7039b3b80cfc833ff830eb84c8ef70cc03b5c0bb..442c49631f9870064bf1f4ec6c0ca542e57329f0 100644 (file)
@@ -1116,6 +1116,24 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                break;
        }
+       case LTTNG_CONSUMER_CLEAR_SESSION:
+       {
+               uint64_t session_id = msg.u.clear_session.session_id;
+
+               ret = lttng_consumer_clear_session(session_id);
+               if (ret) {
+                       ERR("Clear session failed session_id %" PRIu64, session_id);
+                       ret_code = ret;
+               }
+
+               health_code_update();
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+               break;
+       }
        default:
                goto end_nosignal;
        }
index b936a4d3e7798e35e90983b6e20692d2b291bfc8..9f0cf1e3e56e74038ecc4c61f99e1e9cdbed7431 100644 (file)
@@ -941,3 +941,45 @@ int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
 error:
        return ret;
 }
+
+/*
+ * Ask the relay to clear files associated with the socket session.
+ */
+int relayd_clear_session(struct lttcomm_relayd_sock *rsock)
+{
+       int ret;
+       struct lttcomm_relayd_generic_reply reply;
+
+       /* Code flow error. Safety net. */
+       assert(rsock);
+
+       DBG("Relayd clear session");
+
+       /* Send command */
+       ret = send_command(rsock, RELAYD_CLEAR_SESSION_CUSTOM_EFFICIOS, NULL, 0, 0);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Receive response */
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -1;
+               ERR("Relayd clear session replied error %d", reply.ret_code);
+       } else {
+               /* Success */
+               ret = 0;
+       }
+
+       DBG("Relayd clear session successful");
+
+error:
+       return ret;
+}
index f090a0db63681b3b83ec451113039b3d40dd0b75..2ece137a879ee7129eb5f9caab755666cb2306d3 100644 (file)
@@ -51,5 +51,6 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock,
                uint64_t net_seq_num);
 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
                uint64_t stream_id, uint64_t version);
+int relayd_clear_session(struct lttcomm_relayd_sock *rsock);
 
 #endif /* _RELAYD_H */
index 59c4aad71da97d9ff2e1d9250740460be3d94205..efca5cb9cf632e1bcfeb062aa68aac16e938d557 100644 (file)
@@ -544,6 +544,9 @@ struct lttcomm_consumer_msg {
                struct {
                        uint64_t key;
                } LTTNG_PACKED clear_channel;
+               struct {
+                       uint64_t session_id;
+               } LTTNG_PACKED clear_session;
        } u;
 } LTTNG_PACKED;
 
index 473c94b2313a3ee23ffd71b80a1e880827c9834c..87c121464e332656eafb27813c900acbdf8f3a26 100644 (file)
@@ -1948,6 +1948,25 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                break;
        }
+       case LTTNG_CONSUMER_CLEAR_SESSION:
+       {
+               uint64_t session_id = msg.u.clear_session.session_id;
+
+               ret = lttng_consumer_clear_session(session_id);
+               if (ret) {
+                       ERR("Clear session failed session_id %" PRIu64, session_id);
+                       ret_code = ret;
+               }
+
+               health_code_update();
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+               break;
+       }
+
        default:
                break;
        }
This page took 0.033526 seconds and 5 git commands to generate.