consumerd: pass channel instance to stream creation function
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index e874103664d20fec950be3e82288c9996eb5fe7f..29b75d68d00d12d96cb62cfaf9ad3477d3f1314c 100644 (file)
@@ -16,6 +16,8 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
+#include "common/buffer-view.h"
+#include <stdint.h>
 #define _LGPL_SOURCE
 #include <assert.h>
 #include <poll.h>
@@ -109,6 +111,25 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
        return ret;
 }
 
+static
+int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
+               const char **addr)
+{
+       int ret;
+       unsigned long mmap_offset;
+       const char *mmap_base = stream->mmap_base;
+
+       ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
+       if (ret < 0) {
+               PERROR("Failed to get mmap read offset");
+               goto error;
+       }
+
+       *addr = mmap_base + mmap_offset;
+error:
+       return ret;
+}
+
 /*
  * Take a snapshot of all the stream of a channel
  *
@@ -154,7 +175,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                 * Assign the received relayd ID so we can use it for streaming. The streams
                 * are not visible to anyone so this is OK to change it.
                 */
-               stream->net_seq_idx = relayd_id;
+               stream->relayd_id = relayd_id;
                channel->relayd_id = relayd_id;
                if (relayd_id != (uint64_t) -1ULL) {
                        ret = consumer_send_relayd_stream(stream, path);
@@ -238,9 +259,10 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
                        unsigned long len, padded_len;
+                       const char *subbuf_addr;
+                       struct lttng_buffer_view subbuf_view;
 
                        health_code_update();
-
                        DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
 
                        ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
@@ -267,7 +289,15 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                                goto error_put_subbuf;
                        }
 
-                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
+                       ret = get_current_subbuf_addr(stream, &subbuf_addr);
+                       if (ret) {
+                               goto error_put_subbuf;
+                       }
+
+                       subbuf_view = lttng_buffer_view_init(
+                                       subbuf_addr, 0, padded_len);
+                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+                                       stream, &subbuf_view,
                                        padded_len - len, NULL);
                        /*
                         * We write the padded len in local tracefiles but the data len
@@ -305,7 +335,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        }
                } else {
                        close_relayd_stream(stream);
-                       stream->net_seq_idx = (uint64_t) -1ULL;
+                       stream->relayd_id = (uint64_t) -1ULL;
                }
                pthread_mutex_unlock(&stream->lock);
        }
@@ -396,7 +426,7 @@ static int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
 
        if (use_relayd) {
                close_relayd_stream(metadata_stream);
-               metadata_stream->net_seq_idx = (uint64_t) -1ULL;
+               metadata_stream->relayd_id = (uint64_t) -1ULL;
        } else {
                if (metadata_stream->out_fd >= 0) {
                        ret = close(metadata_stream->out_fd);
@@ -620,7 +650,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                health_code_update();
 
-               new_stream = consumer_allocate_stream(channel->key,
+               pthread_mutex_lock(&channel->lock);
+               new_stream = consumer_allocate_stream(
+                               channel,
+                               channel->key,
                                fd,
                                LTTNG_CONSUMER_ACTIVE_STREAM,
                                channel->name,
@@ -640,10 +673,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                                break;
                        }
+                       pthread_mutex_unlock(&channel->lock);
                        goto end_nosignal;
                }
 
-               new_stream->chan = channel;
                new_stream->wait_fd = fd;
                switch (channel->output) {
                case CONSUMER_CHANNEL_SPLICE:
@@ -701,16 +734,18 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (!channel->monitor) {
                        DBG("Kernel consumer add stream %s in no monitor mode with "
                                        "relayd id %" PRIu64, new_stream->name,
-                                       new_stream->net_seq_idx);
+                                       new_stream->relayd_id);
                        cds_list_add(&new_stream->send_node, &channel->streams.head);
+                       pthread_mutex_unlock(&channel->lock);
                        break;
                }
 
                /* Send stream to relayd if the stream has an ID. */
-               if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
+               if (new_stream->relayd_id != (uint64_t) -1ULL) {
                        ret = consumer_send_relayd_stream(new_stream,
                                        new_stream->chan->pathname);
                        if (ret < 0) {
+                               pthread_mutex_unlock(&channel->lock);
                                consumer_stream_free(new_stream);
                                goto end_nosignal;
                        }
@@ -722,12 +757,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                         */
                        if (channel->streams_sent_to_relayd) {
                                ret = consumer_send_relayd_streams_sent(
-                                               new_stream->net_seq_idx);
+                                               new_stream->relayd_id);
                                if (ret < 0) {
+                                       pthread_mutex_unlock(&channel->lock);
                                        goto end_nosignal;
                                }
                        }
                }
+               pthread_mutex_unlock(&channel->lock);
 
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
@@ -1387,6 +1424,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                }
                break;
        case CONSUMER_CHANNEL_MMAP:
+       {
+               const char *subbuf_addr;
+               struct lttng_buffer_view subbuf_view;
+
                /* Get subbuffer size without padding */
                err = kernctl_get_subbuf_size(infd, &subbuf_size);
                if (err != 0) {
@@ -1406,21 +1447,28 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        goto end;
                }
 
+               ret = get_current_subbuf_addr(stream, &subbuf_addr);
+               if (ret) {
+                       goto error_put_subbuf;
+               }
+
                /* Make sure the tracer is not gone mad on us! */
                assert(len >= subbuf_size);
 
                padding = len - subbuf_size;
 
+               subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len);
+
                /* write the subbuffer to the tracefile */
-               ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size,
-                               padding, &index);
+               ret = lttng_consumer_on_read_subbuffer_mmap(
+                               ctx, stream, &subbuf_view, padding, &index);
                /*
-                * The mmap operation should write subbuf_size amount of data when
-                * network streaming or the full padding (len) size when we are _not_
-                * streaming.
+                * The mmap operation should write subbuf_size amount of data
+                * when network streaming or the full padding (len) size when we
+                * are _not_ streaming.
                 */
-               if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
-                               (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
+               if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) ||
+                               (ret != len && stream->relayd_id == (uint64_t) -1ULL)) {
                        /*
                         * Display the error but continue processing to try to release the
                         * subbuffer. This is a DBG statement since this is possible to
@@ -1432,11 +1480,12 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        write_index = 0;
                }
                break;
+       }
        default:
                ERR("Unknown output method");
                ret = -EPERM;
        }
-
+error_put_subbuf:
        err = kernctl_put_next_subbuf(infd);
        if (err != 0) {
                if (err == -EFAULT) {
@@ -1498,7 +1547,7 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
         * Don't create anything if this is set for streaming or should not be
         * monitored.
         */
-       if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
+       if (stream->relayd_id == (uint64_t) -1ULL && stream->chan->monitor) {
                ret = utils_create_stream_file(stream->chan->pathname, stream->name,
                                stream->chan->tracefile_size, stream->tracefile_count_current,
                                stream->uid, stream->gid, NULL);
This page took 0.029415 seconds and 5 git commands to generate.