consumerd: tag metadata channel as being part of a live session
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 4c454da0023bc1647f3c7f9a70b6921d91a1454a..f6a5ba7f54a7859cb69a7052db1e82de4907eeec 100644 (file)
@@ -16,6 +16,7 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
+#include "common/buffer-view.h"
 #include <stdint.h>
 #define _LGPL_SOURCE
 #include <assert.h>
@@ -259,9 +260,9 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ssize_t read_len;
                        unsigned long len, padded_len;
                        const char *subbuf_addr;
+                       struct lttng_buffer_view subbuf_view;
 
                        health_code_update();
-
                        DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
 
                        ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
@@ -293,8 +294,10 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                                goto error_put_subbuf;
                        }
 
+                       subbuf_view = lttng_buffer_view_init(
+                                       subbuf_addr, 0, padded_len);
                        read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
-                                       stream, subbuf_addr, len,
+                                       stream, &subbuf_view,
                                        padded_len - len, NULL);
                        /*
                         * We write the padded len in local tracefiles but the data len
@@ -517,6 +520,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.channel.tracefile_count, 0,
                                msg.u.channel.monitor,
                                msg.u.channel.live_timer_interval,
+                               msg.u.channel.is_live,
                                NULL, NULL);
                if (new_channel == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
@@ -647,7 +651,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                health_code_update();
 
-               new_stream = consumer_allocate_stream(channel->key,
+               pthread_mutex_lock(&channel->lock);
+               new_stream = consumer_allocate_stream(
+                               channel,
+                               channel->key,
                                fd,
                                LTTNG_CONSUMER_ACTIVE_STREAM,
                                channel->name,
@@ -667,10 +674,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                                break;
                        }
+                       pthread_mutex_unlock(&channel->lock);
                        goto end_nosignal;
                }
 
-               new_stream->chan = channel;
                new_stream->wait_fd = fd;
                switch (channel->output) {
                case CONSUMER_CHANNEL_SPLICE:
@@ -730,6 +737,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                        "relayd id %" PRIu64, new_stream->name,
                                        new_stream->relayd_id);
                        cds_list_add(&new_stream->send_node, &channel->streams.head);
+                       pthread_mutex_unlock(&channel->lock);
                        break;
                }
 
@@ -738,6 +746,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret = consumer_send_relayd_stream(new_stream,
                                        new_stream->chan->pathname);
                        if (ret < 0) {
+                               pthread_mutex_unlock(&channel->lock);
                                consumer_stream_free(new_stream);
                                goto end_nosignal;
                        }
@@ -751,10 +760,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                ret = consumer_send_relayd_streams_sent(
                                                new_stream->relayd_id);
                                if (ret < 0) {
+                                       pthread_mutex_unlock(&channel->lock);
                                        goto end_nosignal;
                                }
                        }
                }
+               pthread_mutex_unlock(&channel->lock);
 
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
@@ -1416,6 +1427,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        case CONSUMER_CHANNEL_MMAP:
        {
                const char *subbuf_addr;
+               struct lttng_buffer_view subbuf_view;
 
                /* Get subbuffer size without padding */
                err = kernctl_get_subbuf_size(infd, &subbuf_size);
@@ -1446,15 +1458,15 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
 
                padding = len - subbuf_size;
 
+               subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len);
+
                /* write the subbuffer to the tracefile */
-               ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream,
-                               subbuf_addr,
-                               subbuf_size,
-                               padding, &index);
+               ret = lttng_consumer_on_read_subbuffer_mmap(
+                               ctx, stream, &subbuf_view, padding, &index);
                /*
-                * The mmap operation should write subbuf_size amount of data when
-                * network streaming or the full padding (len) size when we are _not_
-                * streaming.
+                * The mmap operation should write subbuf_size amount of data
+                * when network streaming or the full padding (len) size when we
+                * are _not_ streaming.
                 */
                if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) ||
                                (ret != len && stream->relayd_id == (uint64_t) -1ULL)) {
This page took 0.031781 seconds and 5 git commands to generate.