consumerd: pass channel instance to stream creation function
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 4277372afdd2e874e5fc643568eb0720f6b51018..c37be8fb8fa49f103d04125861a6465bb7667575 100644 (file)
@@ -16,6 +16,7 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
+#include <stdint.h>
 #define _LGPL_SOURCE
 #include <assert.h>
 #include <lttng/ust-ctl.h>
@@ -154,7 +155,9 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
        assert(channel);
        assert(ctx);
 
-       stream = consumer_allocate_stream(channel->key,
+       stream = consumer_allocate_stream(
+                       channel,
+                       channel->key,
                        key,
                        LTTNG_CONSUMER_ACTIVE_STREAM,
                        channel->name,
@@ -184,8 +187,6 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
                goto error;
        }
 
-       stream->chan = channel;
-
 error:
        if (_alloc_ret) {
                *_alloc_ret = alloc_ret;
@@ -553,7 +554,7 @@ static int send_sessiond_channel(int sock,
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttng_consumer_stream *stream;
-       uint64_t net_seq_idx = -1ULL;
+       uint64_t relayd_id = -1ULL;
 
        assert(channel);
        assert(ctx);
@@ -578,8 +579,8 @@ static int send_sessiond_channel(int sock,
                                }
                                ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                        }
-                       if (net_seq_idx == -1ULL) {
-                               net_seq_idx = stream->net_seq_idx;
+                       if (relayd_id == -1ULL) {
+                               relayd_id = stream->relayd_id;
                        }
                }
        }
@@ -768,10 +769,19 @@ static int flush_channel(uint64_t chan_key)
                health_code_update();
 
                pthread_mutex_lock(&stream->lock);
+
+               /*
+                * Protect against concurrent teardown of a stream.
+                */
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
+               }
+
                if (!stream->quiescent) {
                        ustctl_flush_buffer(stream->ustream, 0);
                        stream->quiescent = true;
                }
+next:
                pthread_mutex_unlock(&stream->lock);
        }
 error:
@@ -939,7 +949,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        }
 
        /* Send metadata stream to relayd if needed. */
-       if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
+       if (metadata->metadata_stream->relayd_id != (uint64_t) -1ULL) {
                ret = consumer_send_relayd_stream(metadata->metadata_stream,
                                metadata->pathname);
                if (ret < 0) {
@@ -947,7 +957,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
                        goto error;
                }
                ret = consumer_send_relayd_streams_sent(
-                               metadata->metadata_stream->net_seq_idx);
+                               metadata->metadata_stream->relayd_id);
                if (ret < 0) {
                        ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                        goto error;
@@ -1039,7 +1049,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
        assert(metadata_stream);
 
        if (relayd_id != (uint64_t) -1ULL) {
-               metadata_stream->net_seq_idx = relayd_id;
+               metadata_stream->relayd_id = relayd_id;
                ret = consumer_send_relayd_stream(metadata_stream, path);
                if (ret < 0) {
                        goto error_stream;
@@ -1079,6 +1089,35 @@ error:
        return ret;
 }
 
+static
+int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
+               const char **addr)
+{
+       int ret;
+       unsigned long mmap_offset;
+       const char *mmap_base;
+
+       mmap_base = ustctl_get_mmap_base(stream->ustream);
+       if (!mmap_base) {
+               ERR("Failed to get mmap base for stream `%s`",
+                               stream->name);
+               ret = -EPERM;
+               goto error;
+       }
+
+       ret = ustctl_get_mmap_read_offset(stream->ustream, &mmap_offset);
+       if (ret != 0) {
+               ERR("Failed to get mmap offset for stream `%s`", stream->name);
+               ret = -EINVAL;
+               goto error;
+       }
+
+       *addr = mmap_base + mmap_offset;
+error:
+       return ret;
+
+}
+
 /*
  * Take a snapshot of all the stream of a channel.
  *
@@ -1116,7 +1155,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
 
                /* Lock stream because we are about to change its state. */
                pthread_mutex_lock(&stream->lock);
-               stream->net_seq_idx = relayd_id;
+               stream->relayd_id = relayd_id;
 
                if (use_relayd) {
                        ret = consumer_send_relayd_stream(stream, path);
@@ -1183,6 +1222,8 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
                        unsigned long len, padded_len;
+                       const char *subbuf_addr;
+                       struct lttng_buffer_view subbuf_view;
 
                        health_code_update();
 
@@ -1212,8 +1253,16 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                                goto error_put_subbuf;
                        }
 
-                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
-                                       padded_len - len, NULL);
+                       ret = get_current_subbuf_addr(stream, &subbuf_addr);
+                       if (ret) {
+                               goto error_put_subbuf;
+                       }
+
+                       subbuf_view = lttng_buffer_view_init(
+                                       subbuf_addr, 0, padded_len);
+                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+                                       stream, &subbuf_view, padded_len - len,
+                                       NULL);
                        if (use_relayd) {
                                if (read_len != len) {
                                        ret = -EPERM;
@@ -1926,29 +1975,13 @@ error_fatal:
        return -1;
 }
 
-/*
- * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
- * compiled out, we isolate it in this library.
- */
-int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
-               unsigned long *off)
-{
-       assert(stream);
-       assert(stream->ustream);
-
-       return ustctl_get_mmap_read_offset(stream->ustream, off);
-}
-
-/*
- * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
- * compiled out, we isolate it in this library.
- */
-void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream)
+void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream,
+               int producer_active)
 {
        assert(stream);
        assert(stream->ustream);
 
-       return ustctl_get_mmap_base(stream->ustream);
+       ustctl_flush_buffer(stream->ustream, producer_active);
 }
 
 /*
@@ -2444,6 +2477,8 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        long ret = 0;
        struct ustctl_consumer_stream *ustream;
        struct ctf_packet_index index;
+       const char *subbuf_addr;
+       struct lttng_buffer_view subbuf_view;
 
        assert(stream);
        assert(stream->ustream);
@@ -2539,14 +2574,25 @@ retry:
        assert(len >= subbuf_size);
 
        padding = len - subbuf_size;
+
+       ret = get_current_subbuf_addr(stream, &subbuf_addr);
+       if (ret) {
+               write_index = 0;
+               goto error_put_subbuf;
+       }
+
+       subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len);
+
        /* write the subbuffer to the tracefile */
-       ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
+       ret = lttng_consumer_on_read_subbuffer_mmap(
+                       ctx, stream, &subbuf_view, padding, &index);
        /*
-        * The mmap operation should write subbuf_size amount of data when network
-        * streaming or the full padding (len) size when we are _not_ streaming.
+        * The mmap operation should write subbuf_size amount of data when
+        * network streaming or the full padding (len) size when we are _not_
+        * streaming.
         */
-       if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
-                       (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
+       if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) ||
+                       (ret != len && stream->relayd_id == (uint64_t) -1ULL)) {
                /*
                 * Display the error but continue processing to try to release the
                 * subbuffer. This is a DBG statement since any unexpected kill or
@@ -2560,6 +2606,7 @@ retry:
                                ret, len, subbuf_size);
                write_index = 0;
        }
+error_put_subbuf:
        err = ustctl_put_next_subbuf(ustream);
        assert(err == 0);
 
@@ -2627,7 +2674,7 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        assert(stream);
 
        /* Don't create anything if this is set for streaming. */
-       if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
+       if (stream->relayd_id == (uint64_t) -1ULL && stream->chan->monitor) {
                ret = utils_create_stream_file(stream->chan->pathname, stream->name,
                                stream->chan->tracefile_size, stream->tracefile_count_current,
                                stream->uid, stream->gid, NULL);
This page took 0.027987 seconds and 5 git commands to generate.