consumerd: move address computation from on_read_subbuffer_mmap
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index faf069c0df6f9319a53751a532cc5e88e769ca26..4e93faf64911fd4d90b1a33fdc7a9ddd43935294 100644 (file)
@@ -16,6 +16,7 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
+#include <stdint.h>
 #define _LGPL_SOURCE
 #include <assert.h>
 #include <lttng/ust-ctl.h>
@@ -553,7 +554,7 @@ static int send_sessiond_channel(int sock,
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttng_consumer_stream *stream;
-       uint64_t net_seq_idx = -1ULL;
+       uint64_t relayd_id = -1ULL;
 
        assert(channel);
        assert(ctx);
@@ -578,8 +579,8 @@ static int send_sessiond_channel(int sock,
                                }
                                ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                        }
-                       if (net_seq_idx == -1ULL) {
-                               net_seq_idx = stream->net_seq_idx;
+                       if (relayd_id == -1ULL) {
+                               relayd_id = stream->relayd_id;
                        }
                }
        }
@@ -768,10 +769,19 @@ static int flush_channel(uint64_t chan_key)
                health_code_update();
 
                pthread_mutex_lock(&stream->lock);
+
+               /*
+                * Protect against concurrent teardown of a stream.
+                */
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
+               }
+
                if (!stream->quiescent) {
                        ustctl_flush_buffer(stream->ustream, 0);
                        stream->quiescent = true;
                }
+next:
                pthread_mutex_unlock(&stream->lock);
        }
 error:
@@ -831,6 +841,7 @@ static int close_metadata(uint64_t chan_key)
 {
        int ret = 0;
        struct lttng_consumer_channel *channel;
+       unsigned int channel_monitor;
 
        DBG("UST consumer close metadata key %" PRIu64, chan_key);
 
@@ -849,13 +860,48 @@ static int close_metadata(uint64_t chan_key)
 
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&channel->lock);
-
+       channel_monitor = channel->monitor;
        if (cds_lfht_is_node_deleted(&channel->node.node)) {
                goto error_unlock;
        }
 
        lttng_ustconsumer_close_metadata(channel);
+       pthread_mutex_unlock(&channel->lock);
+       pthread_mutex_unlock(&consumer_data.lock);
+
+       /*
+        * The ownership of a metadata channel depends on the type of
+        * session to which it belongs. In effect, the monitor flag is checked
+        * to determine if this metadata channel is in "snapshot" mode or not.
+        *
+        * In the non-snapshot case, the metadata channel is created along with
+        * a single stream which will remain present until the metadata channel
+        * is destroyed (on the destruction of its session). In this case, the
+        * metadata stream in "monitored" by the metadata poll thread and holds
+        * the ownership of its channel.
+        *
+        * Closing the metadata will cause the metadata stream's "metadata poll
+        * pipe" to be closed. Closing this pipe will wake-up the metadata poll
+        * thread which will teardown the metadata stream which, in return,
+        * deletes the metadata channel.
+        *
+        * In the snapshot case, the metadata stream is created and destroyed
+        * on every snapshot record. Since the channel doesn't have an owner
+        * other than the session daemon, it is safe to destroy it immediately
+        * on reception of the CLOSE_METADATA command.
+        */
+       if (!channel_monitor) {
+               /*
+                * The channel and consumer_data locks must be
+                * released before this call since consumer_del_channel
+                * re-acquires the channel and consumer_data locks to teardown
+                * the channel and queue its reclamation by the "call_rcu"
+                * worker thread.
+                */
+               consumer_del_channel(channel);
+       }
 
+       return ret;
 error_unlock:
        pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
@@ -903,7 +949,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        }
 
        /* Send metadata stream to relayd if needed. */
-       if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
+       if (metadata->metadata_stream->relayd_id != (uint64_t) -1ULL) {
                ret = consumer_send_relayd_stream(metadata->metadata_stream,
                                metadata->pathname);
                if (ret < 0) {
@@ -911,7 +957,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
                        goto error;
                }
                ret = consumer_send_relayd_streams_sent(
-                               metadata->metadata_stream->net_seq_idx);
+                               metadata->metadata_stream->relayd_id);
                if (ret < 0) {
                        ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                        goto error;
@@ -1003,7 +1049,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
        assert(metadata_stream);
 
        if (relayd_id != (uint64_t) -1ULL) {
-               metadata_stream->net_seq_idx = relayd_id;
+               metadata_stream->relayd_id = relayd_id;
                ret = consumer_send_relayd_stream(metadata_stream, path);
                if (ret < 0) {
                        goto error_stream;
@@ -1043,6 +1089,35 @@ error:
        return ret;
 }
 
+static
+int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
+               const char **addr)
+{
+       int ret;
+       unsigned long mmap_offset;
+       const char *mmap_base;
+
+       mmap_base = ustctl_get_mmap_base(stream->ustream);
+       if (!mmap_base) {
+               ERR("Failed to get mmap base for stream `%s`",
+                               stream->name);
+               ret = -EPERM;
+               goto error;
+       }
+
+       ret = ustctl_get_mmap_read_offset(stream->ustream, &mmap_offset);
+       if (ret != 0) {
+               ERR("Failed to get mmap offset for stream `%s`", stream->name);
+               ret = -EINVAL;
+               goto error;
+       }
+
+       *addr = mmap_base + mmap_offset;
+error:
+       return ret;
+
+}
+
 /*
  * Take a snapshot of all the stream of a channel.
  *
@@ -1076,14 +1151,11 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
        DBG("UST consumer snapshot channel %" PRIu64, key);
 
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-               /* Are we at a position _before_ the first available packet ? */
-               bool before_first_packet = true;
-
                health_code_update();
 
                /* Lock stream because we are about to change its state. */
                pthread_mutex_lock(&stream->lock);
-               stream->net_seq_idx = relayd_id;
+               stream->relayd_id = relayd_id;
 
                if (use_relayd) {
                        ret = consumer_send_relayd_stream(stream, path);
@@ -1150,7 +1222,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
                        unsigned long len, padded_len;
-                       int lost_packet = 0;
+                       const char *subbuf_addr;
 
                        health_code_update();
 
@@ -1164,15 +1236,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                                }
                                DBG("UST consumer get subbuf failed. Skipping it.");
                                consumed_pos += stream->max_sb_size;
-
-                               /*
-                                * Start accounting lost packets only when we
-                                * already have extracted packets (to match the
-                                * content of the final snapshot).
-                                */
-                               if (!before_first_packet) {
-                                       lost_packet = 1;
-                               }
+                               stream->chan->lost_packets++;
                                continue;
                        }
 
@@ -1188,7 +1252,13 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                                goto error_put_subbuf;
                        }
 
-                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
+                       ret = get_current_subbuf_addr(stream, &subbuf_addr);
+                       if (ret) {
+                               goto error_put_subbuf;
+                       }
+
+                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+                                       stream, subbuf_addr, len,
                                        padded_len - len, NULL);
                        if (use_relayd) {
                                if (read_len != len) {
@@ -1208,16 +1278,6 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                                goto error_close_stream;
                        }
                        consumed_pos += stream->max_sb_size;
-
-                       /*
-                        * Only account lost packets located between
-                        * succesfully extracted packets (do not account before
-                        * and after since they are not visible in the
-                        * resulting snapshot).
-                        */
-                       stream->chan->lost_packets += lost_packet;
-                       lost_packet = 0;
-                       before_first_packet = false;
                }
 
                /* Simply close the stream so we can use it on the next snapshot. */
@@ -1354,7 +1414,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
                /* Session daemon status message are handled in the following call. */
-               ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
+               consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
                                msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
                                &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
                                msg.u.relayd_sock.relayd_session_id);
@@ -1912,29 +1972,13 @@ error_fatal:
        return -1;
 }
 
-/*
- * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
- * compiled out, we isolate it in this library.
- */
-int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
-               unsigned long *off)
-{
-       assert(stream);
-       assert(stream->ustream);
-
-       return ustctl_get_mmap_read_offset(stream->ustream, off);
-}
-
-/*
- * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
- * compiled out, we isolate it in this library.
- */
-void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream)
+void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream,
+               int producer_active)
 {
        assert(stream);
        assert(stream->ustream);
 
-       return ustctl_get_mmap_base(stream->ustream);
+       ustctl_flush_buffer(stream->ustream, producer_active);
 }
 
 /*
@@ -2430,6 +2474,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        long ret = 0;
        struct ustctl_consumer_stream *ustream;
        struct ctf_packet_index index;
+       const char *subbuf_addr;
 
        assert(stream);
        assert(stream->ustream);
@@ -2496,6 +2541,8 @@ retry:
                index.offset = htobe64(stream->out_fd_offset);
                ret = get_index_values(&index, ustream);
                if (ret < 0) {
+                       err = ustctl_put_subbuf(ustream);
+                       assert(err == 0);
                        goto end;
                }
 
@@ -2503,6 +2550,8 @@ retry:
                ret = update_stream_stats(stream);
                if (ret < 0) {
                        PERROR("kernctl_get_events_discarded");
+                       err = ustctl_put_subbuf(ustream);
+                       assert(err == 0);
                        goto end;
                }
        } else {
@@ -2521,14 +2570,23 @@ retry:
        assert(len >= subbuf_size);
 
        padding = len - subbuf_size;
+
+       ret = get_current_subbuf_addr(stream, &subbuf_addr);
+       if (ret) {
+               write_index = 0;
+               goto error_put_subbuf;
+       }
+
        /* write the subbuffer to the tracefile */
-       ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
+       ret = lttng_consumer_on_read_subbuffer_mmap(
+                       ctx, stream, subbuf_addr, subbuf_size, padding, &index);
        /*
-        * The mmap operation should write subbuf_size amount of data when network
-        * streaming or the full padding (len) size when we are _not_ streaming.
+        * The mmap operation should write subbuf_size amount of data when
+        * network streaming or the full padding (len) size when we are _not_
+        * streaming.
         */
-       if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
-                       (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
+       if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) ||
+                       (ret != len && stream->relayd_id == (uint64_t) -1ULL)) {
                /*
                 * Display the error but continue processing to try to release the
                 * subbuffer. This is a DBG statement since any unexpected kill or
@@ -2542,6 +2600,7 @@ retry:
                                ret, len, subbuf_size);
                write_index = 0;
        }
+error_put_subbuf:
        err = ustctl_put_next_subbuf(ustream);
        assert(err == 0);
 
@@ -2609,7 +2668,7 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        assert(stream);
 
        /* Don't create anything if this is set for streaming. */
-       if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
+       if (stream->relayd_id == (uint64_t) -1ULL && stream->chan->monitor) {
                ret = utils_create_stream_file(stream->chan->pathname, stream->name,
                                stream->chan->tracefile_size, stream->tracefile_count_current,
                                stream->uid, stream->gid, NULL);
@@ -2837,7 +2896,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
        request.key = channel->key;
 
        DBG("Sending metadata request to sessiond, session id %" PRIu64
-                       ", per-pid %" PRIu64 ", app UID %u and channek key %" PRIu64,
+                       ", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64,
                        request.session_id, request.session_id_per_pid, request.uid,
                        request.key);
 
This page took 0.029265 seconds and 5 git commands to generate.