Consumer: Implement lttng_consumer_clear_channel
[lttng-tools.git] / src / common / consumer / consumer.c
index 6de72e2758b5086f35c36e96f69276eaa8dfbdf0..da81f2eedf43eef24251f75dc84185423509c749 100644 (file)
@@ -3568,7 +3568,6 @@ error:
 
                /* Assign new file descriptor */
                relayd->control_sock.sock.fd = fd;
-               fd = -1;        /* For error path */
                /* Assign version values. */
                relayd->control_sock.major = relayd_sock->major;
                relayd->control_sock.minor = relayd_sock->minor;
@@ -3596,7 +3595,6 @@ error:
 
                /* Assign new file descriptor */
                relayd->data_sock.sock.fd = fd;
-               fd = -1;        /* for eventual error paths */
                /* Assign version values. */
                relayd->data_sock.major = relayd_sock->major;
                relayd->data_sock.minor = relayd_sock->minor;
@@ -3610,6 +3608,11 @@ error:
        DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
                        sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
                        relayd->net_seq_idx, fd);
+       /*
+        * We gave the ownership of the fd to the relayd structure. Set the
+        * fd to -1 so we don't call close() on it in the error path below.
+        */
+       fd = -1;
 
        /* We successfully added the socket. Send status back. */
        ret = consumer_send_status_msg(sock, ret_code);
@@ -3880,7 +3883,7 @@ int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_act
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               lttng_ustctl_flush_buffer(stream, producer_active);
+               lttng_ustconsumer_flush_buffer(stream, producer_active);
                break;
        default:
                ERR("Unknown consumer_data type");
@@ -3891,20 +3894,46 @@ end:
        return ret;
 }
 
+static
+int consumer_clear_buffer(struct lttng_consumer_stream *stream)
+{
+       int ret = 0;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               ret = kernctl_buffer_clear(stream->wait_fd);
+               if (ret < 0) {
+                       ERR("Failed to flush kernel stream");
+                       goto end;
+               }
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_clear_buffer(stream);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+end:
+       return ret;
+}
 /*
  * Sample the rotate position for all the streams of a channel. If a stream
  * is already at the rotate position (produced == consumed), we flag it as
  * ready for rotation. The rotation of ready streams occurs after we have
  * replied to the session daemon that we have finished sampling the positions.
+ * Must be called with RCU read-side lock held to ensure existence of channel.
  *
  * Returns 0 on success, < 0 on error
  */
-int lttng_consumer_rotate_channel(uint64_t key, const char *path,
-               uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id,
+int lttng_consumer_rotate_sample_channel(struct lttng_consumer_channel *channel,
+               uint64_t key, const char *path, uint64_t relayd_id,
+               uint32_t metadata, uint64_t new_chunk_id,
                struct lttng_consumer_local_data *ctx)
 {
        int ret;
-       struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
@@ -3913,13 +3942,6 @@ int lttng_consumer_rotate_channel(uint64_t key, const char *path,
 
        rcu_read_lock();
 
-       channel = consumer_find_channel(key);
-       if (!channel) {
-               ERR("No channel found for key %" PRIu64, key);
-               ret = -1;
-               goto end;
-       }
-
        pthread_mutex_lock(&channel->lock);
        channel->current_chunk_id = new_chunk_id;
 
@@ -3984,7 +4006,6 @@ int lttng_consumer_rotate_channel(uint64_t key, const char *path,
                if (consumed_pos == stream->rotate_position) {
                        stream->rotate_ready = true;
                }
-               channel->nr_stream_rotate_pending++;
 
                ret = consumer_flush_buffer(stream, 1);
                if (ret < 0) {
@@ -4233,14 +4254,15 @@ error:
  * This is especially important for low throughput streams that have already
  * been consumed, we cannot wait for their next packet to perform the
  * rotation.
+ * Need to be called with RCU read-side lock held to ensure existence of
+ * channel.
  *
  * Returns 0 on success, < 0 on error
  */
-int lttng_consumer_rotate_ready_streams(uint64_t key,
-               struct lttng_consumer_local_data *ctx)
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
+               uint64_t key, struct lttng_consumer_local_data *ctx)
 {
        int ret;
-       struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
@@ -4249,13 +4271,6 @@ int lttng_consumer_rotate_ready_streams(uint64_t key,
 
        DBG("Consumer rotate ready streams in channel %" PRIu64, key);
 
-       channel = consumer_find_channel(key);
-       if (!channel) {
-               ERR("No channel found for key %" PRIu64, key);
-               ret = -1;
-               goto end;
-       }
-
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&channel->key, lttng_ht_seed),
                        ht->match_fct, &channel->key, &iter.iter,
@@ -4483,3 +4498,82 @@ int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid,
                return mkdir_local(path, uid, gid);
        }
 }
+
+static
+int clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+{
+
+       int ret;
+       struct lttng_consumer_stream *stream;
+
+       assert(!channel->monitor);
+       assert(channel->type != CONSUMER_CHANNEL_TYPE_METADATA);
+
+       rcu_read_lock();
+       pthread_mutex_lock(&channel->lock);
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               health_code_update();
+               pthread_mutex_lock(&stream->lock);
+
+               ret = consumer_flush_buffer(stream, 1);
+               if (ret < 0) {
+                       ERR("Failed to flush stream %" PRIu64 " during channel clear",
+                                       stream->key);
+                       ret = LTTCOMM_CONSUMERD_FATAL;
+                       goto error_unlock;
+               }
+
+               ret = consumer_clear_buffer(stream);
+               if (ret < 0) {
+                       ERR("Failed to clear stream %" PRIu64 " during channel clear",
+                                       stream->key);
+                       ret = LTTCOMM_CONSUMERD_FATAL;
+                       goto error_unlock;
+               }
+               pthread_mutex_unlock(&stream->lock);
+       }
+       pthread_mutex_unlock(&channel->lock);
+       rcu_read_unlock();
+       return 0;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&channel->lock);
+       rcu_read_unlock();
+       return ret;
+}
+
+int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
+{
+       int ret;
+
+       DBG("Consumer clear channel %" PRIu64, channel->key);
+
+       if (!channel->monitor) {
+               /* Snapshot mode */
+               if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+                       /*
+                        * Nothing to do for the metadata channel/stream.
+                        * Snapshot mechanism already take care of the metadata
+                        * handling/generation.
+                        */
+                       goto end;
+               }
+               ret = clear_unmonitored_channel(channel);
+               if (ret) {
+                       goto error;
+               }
+       } else {
+               /* TODO:
+                * Normal channel and relayd bound clear operation not supported
+                * for now
+                */
+               ret = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+
+end:
+       ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
+       return ret;
+}
This page took 0.027267 seconds and 5 git commands to generate.