consumer: Rename net_seq_idx to relayd_id
[deliverable/lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index fe7445b7f133818353fb96b1df10ed0171db88bf..4d0b530a7ce798e5fe0295dbe6a1caf1a5ddabf0 100644 (file)
@@ -553,7 +553,7 @@ static int send_sessiond_channel(int sock,
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttng_consumer_stream *stream;
-       uint64_t net_seq_idx = -1ULL;
+       uint64_t relayd_id = -1ULL;
 
        assert(channel);
        assert(ctx);
@@ -578,8 +578,8 @@ static int send_sessiond_channel(int sock,
                                }
                                ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                        }
-                       if (net_seq_idx == -1ULL) {
-                               net_seq_idx = stream->net_seq_idx;
+                       if (relayd_id == -1ULL) {
+                               relayd_id = stream->relayd_id;
                        }
                }
        }
@@ -767,7 +767,54 @@ static int flush_channel(uint64_t chan_key)
 
                health_code_update();
 
-               ustctl_flush_buffer(stream->ustream, 1);
+               pthread_mutex_lock(&stream->lock);
+               if (!stream->quiescent) {
+                       ustctl_flush_buffer(stream->ustream, 0);
+                       stream->quiescent = true;
+               }
+               pthread_mutex_unlock(&stream->lock);
+       }
+error:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Clear quiescent state from channel's streams using the given key to
+ * retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int clear_quiescent_channel(uint64_t chan_key)
+{
+       int ret = 0;
+       struct lttng_consumer_channel *channel;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht *ht;
+       struct lttng_ht_iter iter;
+
+       DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
+
+       rcu_read_lock();
+       channel = consumer_find_channel(chan_key);
+       if (!channel) {
+               ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
+               ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+               goto error;
+       }
+
+       ht = consumer_data.stream_per_chan_id_ht;
+
+       /* For each stream of the channel id, clear quiescent state. */
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
+                       &channel->key, &iter.iter, stream, node_channel_id.node) {
+
+               health_code_update();
+
+               pthread_mutex_lock(&stream->lock);
+               stream->quiescent = false;
+               pthread_mutex_unlock(&stream->lock);
        }
 error:
        rcu_read_unlock();
@@ -784,6 +831,7 @@ static int close_metadata(uint64_t chan_key)
 {
        int ret = 0;
        struct lttng_consumer_channel *channel;
+       unsigned int channel_monitor;
 
        DBG("UST consumer close metadata key %" PRIu64, chan_key);
 
@@ -802,13 +850,48 @@ static int close_metadata(uint64_t chan_key)
 
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&channel->lock);
-
+       channel_monitor = channel->monitor;
        if (cds_lfht_is_node_deleted(&channel->node.node)) {
                goto error_unlock;
        }
 
        lttng_ustconsumer_close_metadata(channel);
+       pthread_mutex_unlock(&channel->lock);
+       pthread_mutex_unlock(&consumer_data.lock);
 
+       /*
+        * The ownership of a metadata channel depends on the type of
+        * session to which it belongs. In effect, the monitor flag is checked
+        * to determine if this metadata channel is in "snapshot" mode or not.
+        *
+        * In the non-snapshot case, the metadata channel is created along with
+        * a single stream which will remain present until the metadata channel
+        * is destroyed (on the destruction of its session). In this case, the
+        * metadata stream in "monitored" by the metadata poll thread and holds
+        * the ownership of its channel.
+        *
+        * Closing the metadata will cause the metadata stream's "metadata poll
+        * pipe" to be closed. Closing this pipe will wake-up the metadata poll
+        * thread which will teardown the metadata stream which, in return,
+        * deletes the metadata channel.
+        *
+        * In the snapshot case, the metadata stream is created and destroyed
+        * on every snapshot record. Since the channel doesn't have an owner
+        * other than the session daemon, it is safe to destroy it immediately
+        * on reception of the CLOSE_METADATA command.
+        */
+       if (!channel_monitor) {
+               /*
+                * The channel and consumer_data locks must be
+                * released before this call since consumer_del_channel
+                * re-acquires the channel and consumer_data locks to teardown
+                * the channel and queue its reclamation by the "call_rcu"
+                * worker thread.
+                */
+               consumer_del_channel(channel);
+       }
+
+       return ret;
 error_unlock:
        pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
@@ -856,7 +939,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        }
 
        /* Send metadata stream to relayd if needed. */
-       if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
+       if (metadata->metadata_stream->relayd_id != (uint64_t) -1ULL) {
                ret = consumer_send_relayd_stream(metadata->metadata_stream,
                                metadata->pathname);
                if (ret < 0) {
@@ -864,7 +947,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
                        goto error;
                }
                ret = consumer_send_relayd_streams_sent(
-                               metadata->metadata_stream->net_seq_idx);
+                               metadata->metadata_stream->relayd_id);
                if (ret < 0) {
                        ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                        goto error;
@@ -956,7 +1039,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
        assert(metadata_stream);
 
        if (relayd_id != (uint64_t) -1ULL) {
-               metadata_stream->net_seq_idx = relayd_id;
+               metadata_stream->relayd_id = relayd_id;
                ret = consumer_send_relayd_stream(metadata_stream, path);
                if (ret < 0) {
                        goto error_stream;
@@ -1029,14 +1112,11 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
        DBG("UST consumer snapshot channel %" PRIu64, key);
 
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-               /* Are we at a position _before_ the first available packet ? */
-               bool before_first_packet = true;
-
                health_code_update();
 
                /* Lock stream because we are about to change its state. */
                pthread_mutex_lock(&stream->lock);
-               stream->net_seq_idx = relayd_id;
+               stream->relayd_id = relayd_id;
 
                if (use_relayd) {
                        ret = consumer_send_relayd_stream(stream, path);
@@ -1064,7 +1144,13 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                        }
                }
 
-               ustctl_flush_buffer(stream->ustream, 1);
+               /*
+                * If tracing is active, we want to perform a "full" buffer flush.
+                * Else, if quiescent, it has already been done by the prior stop.
+                */
+               if (!stream->quiescent) {
+                       ustctl_flush_buffer(stream->ustream, 0);
+               }
 
                ret = lttng_ustconsumer_take_snapshot(stream);
                if (ret < 0) {
@@ -1097,7 +1183,6 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
                        unsigned long len, padded_len;
-                       int lost_packet = 0;
 
                        health_code_update();
 
@@ -1111,15 +1196,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                                }
                                DBG("UST consumer get subbuf failed. Skipping it.");
                                consumed_pos += stream->max_sb_size;
-
-                               /*
-                                * Start accounting lost packets only when we
-                                * already have extracted packets (to match the
-                                * content of the final snapshot).
-                                */
-                               if (!before_first_packet) {
-                                       lost_packet = 1;
-                               }
+                               stream->chan->lost_packets++;
                                continue;
                        }
 
@@ -1155,16 +1232,6 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                                goto error_close_stream;
                        }
                        consumed_pos += stream->max_sb_size;
-
-                       /*
-                        * Only account lost packets located between
-                        * succesfully extracted packets (do not account before
-                        * and after since they are not visible in the
-                        * resulting snapshot).
-                        */
-                       stream->chan->lost_packets += lost_packet;
-                       lost_packet = 0;
-                       before_first_packet = false;
                }
 
                /* Simply close the stream so we can use it on the next snapshot. */
@@ -1301,7 +1368,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
                /* Session daemon status message are handled in the following call. */
-               ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
+               consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
                                msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
                                &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
                                msg.u.relayd_sock.relayd_session_id);
@@ -1582,6 +1649,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL:
+       {
+               int ret;
+
+               ret = clear_quiescent_channel(
+                               msg.u.clear_quiescent_channel.key);
+               if (ret != 0) {
+                       ret_code = ret;
+               }
+
+               goto end_msg_sessiond;
+       }
        case LTTNG_CONSUMER_PUSH_METADATA:
        {
                int ret;
@@ -1692,7 +1771,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_DISCARDED_EVENTS:
        {
-               uint64_t ret;
+               int ret = 0;
+               uint64_t discarded_events;
                struct lttng_ht_iter iter;
                struct lttng_ht *ht;
                struct lttng_consumer_stream *stream;
@@ -1713,13 +1793,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 * found (no events are dropped if the channel is not yet in
                 * use).
                 */
-               ret = 0;
+               discarded_events = 0;
                cds_lfht_for_each_entry_duplicate(ht->ht,
                                ht->hash_fct(&id, lttng_ht_seed),
                                ht->match_fct, &id,
                                &iter.iter, stream, node_session_id.node) {
                        if (stream->chan->key == key) {
-                               ret = stream->chan->discarded_events;
+                               discarded_events = stream->chan->discarded_events;
                                break;
                        }
                }
@@ -1732,7 +1812,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                health_code_update();
 
                /* Send back returned value to session daemon */
-               ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+               ret = lttcomm_send_unix_sock(sock, &discarded_events, sizeof(discarded_events));
                if (ret < 0) {
                        PERROR("send discarded events");
                        goto error_fatal;
@@ -1944,14 +2024,19 @@ int lttng_ustconsumer_get_sequence_number(
 }
 
 /*
- * Called when the stream signal the consumer that it has hang up.
+ * Called when the stream signals the consumer that it has hung up.
  */
 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
 {
        assert(stream);
        assert(stream->ustream);
 
-       ustctl_flush_buffer(stream->ustream, 0);
+       pthread_mutex_lock(&stream->lock);
+       if (!stream->quiescent) {
+               ustctl_flush_buffer(stream->ustream, 0);
+               stream->quiescent = true;
+       }
+       pthread_mutex_unlock(&stream->lock);
        stream->hangup_flush_done = 1;
 }
 
@@ -1985,11 +2070,6 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
                        }
                }
        }
-       /* Try to rmdir all directories under shm_path root. */
-       if (chan->root_shm_path[0]) {
-               (void) run_as_recursive_rmdir(chan->root_shm_path,
-                               chan->uid, chan->gid);
-       }
 }
 
 void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan)
@@ -1999,6 +2079,11 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan)
 
        consumer_metadata_cache_destroy(chan);
        ustctl_destroy_channel(chan->uchan);
+       /* Try to rmdir all directories under shm_path root. */
+       if (chan->root_shm_path[0]) {
+               (void) run_as_recursive_rmdir(chan->root_shm_path,
+                               chan->uid, chan->gid);
+       }
        free(chan->stream_fds);
 }
 
@@ -2202,10 +2287,10 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
         * because we locked the metadata thread.
         */
        ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0);
+       pthread_mutex_lock(&metadata->lock);
        if (ret < 0) {
                goto end;
        }
-       pthread_mutex_lock(&metadata->lock);
 
        ret = commit_one_metadata_packet(metadata);
        if (ret <= 0) {
@@ -2327,8 +2412,8 @@ int update_stream_stats(struct lttng_consumer_stream *stream)
        }
        if (discarded < stream->last_discarded_events) {
                /*
-                * Overflow has occured. We assume only one wrap-around
-                * has occured.
+                * Overflow has occurred. We assume only one wrap-around
+                * has occurred.
                 */
                stream->chan->discarded_events +=
                                (1ULL << (CAA_BITS_PER_LONG - 1)) -
@@ -2425,6 +2510,8 @@ retry:
                index.offset = htobe64(stream->out_fd_offset);
                ret = get_index_values(&index, ustream);
                if (ret < 0) {
+                       err = ustctl_put_subbuf(ustream);
+                       assert(err == 0);
                        goto end;
                }
 
@@ -2432,6 +2519,8 @@ retry:
                ret = update_stream_stats(stream);
                if (ret < 0) {
                        PERROR("kernctl_get_events_discarded");
+                       err = ustctl_put_subbuf(ustream);
+                       assert(err == 0);
                        goto end;
                }
        } else {
@@ -2456,8 +2545,8 @@ retry:
         * The mmap operation should write subbuf_size amount of data when network
         * streaming or the full padding (len) size when we are _not_ streaming.
         */
-       if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
-                       (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
+       if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) ||
+                       (ret != len && stream->relayd_id == (uint64_t) -1ULL)) {
                /*
                 * Display the error but continue processing to try to release the
                 * subbuffer. This is a DBG statement since any unexpected kill or
@@ -2538,7 +2627,7 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        assert(stream);
 
        /* Don't create anything if this is set for streaming. */
-       if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
+       if (stream->relayd_id == (uint64_t) -1ULL && stream->chan->monitor) {
                ret = utils_create_stream_file(stream->chan->pathname, stream->name,
                                stream->chan->tracefile_size, stream->tracefile_count_current,
                                stream->uid, stream->gid, NULL);
@@ -2549,14 +2638,17 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                stream->tracefile_size_current = 0;
 
                if (!stream->metadata_flag) {
-                       ret = index_create_file(stream->chan->pathname,
+                       struct lttng_index_file *index_file;
+
+                       index_file = lttng_index_file_create(stream->chan->pathname,
                                        stream->name, stream->uid, stream->gid,
                                        stream->chan->tracefile_size,
-                                       stream->tracefile_count_current);
-                       if (ret < 0) {
+                                       stream->tracefile_count_current,
+                                       CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+                       if (!index_file) {
                                goto error;
                        }
-                       stream->index_fd = ret;
+                       stream->index_file = index_file;
                }
        }
        ret = 0;
@@ -2763,7 +2855,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
        request.key = channel->key;
 
        DBG("Sending metadata request to sessiond, session id %" PRIu64
-                       ", per-pid %" PRIu64 ", app UID %u and channek key %" PRIu64,
+                       ", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64,
                        request.session_id, request.session_id_per_pid, request.uid,
                        request.key);
 
This page took 0.031521 seconds and 5 git commands to generate.