SoW-2020-0002: Trace Hit Counters: trigger error reporting integration
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index e06f3b3e079aae44f494ba6d4409f02120018af4..b9d843714550c5821d3b091eb7fb6e390a3c6786 100644 (file)
@@ -19,6 +19,7 @@
 #include <inttypes.h>
 #include <unistd.h>
 #include <sys/stat.h>
+#include <stdint.h>
 
 #include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
@@ -36,7 +37,7 @@
 #include <common/optional.h>
 #include <common/buffer-view.h>
 #include <common/consumer/consumer.h>
-#include <stdint.h>
+#include <common/consumer/metadata-bucket.h>
 
 #include "kernel-consumer.h"
 
@@ -287,7 +288,7 @@ static int lttng_kconsumer_snapshot_channel(
 
                        subbuf_view = lttng_buffer_view_init(
                                        subbuf_addr, 0, padded_len);
-                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+                       read_len = lttng_consumer_on_read_subbuffer_mmap(
                                        stream, &subbuf_view,
                                        padded_len - len);
                        /*
@@ -1206,8 +1207,8 @@ error_rotate_channel:
        case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
        {
                const struct lttng_credentials credentials = {
-                       .uid = msg.u.create_trace_chunk.credentials.value.uid,
-                       .gid = msg.u.create_trace_chunk.credentials.value.gid,
+                       .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
+                       .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
                };
                const bool is_local_trace =
                                !msg.u.create_trace_chunk.relayd_id.is_set;
@@ -1310,6 +1311,24 @@ error_rotate_channel:
                                msg.u.trace_chunk_exists.chunk_id);
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
+       {
+               const uint64_t key = msg.u.open_channel_packets.key;
+               struct lttng_consumer_channel *channel =
+                               consumer_find_channel(key);
+
+               if (channel) {
+                       pthread_mutex_lock(&channel->lock);
+                       ret_code = lttng_consumer_open_channel_packets(channel);
+                       pthread_mutex_unlock(&channel->lock);
+               } else {
+                       WARN("Channel %" PRIu64 " not found", key);
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+               }
+
+               health_code_update();
+               goto end_msg_sessiond;
+       }
        default:
                goto end_nosignal;
        }
@@ -1347,36 +1366,38 @@ end:
  * metadata thread can consumer them.
  *
  * Metadata stream lock MUST be acquired.
- *
- * Return 0 if new metadatda is available, EAGAIN if the metadata stream
- * is empty or a negative value on error.
  */
-int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
+enum sync_metadata_status lttng_kconsumer_sync_metadata(
+               struct lttng_consumer_stream *metadata)
 {
        int ret;
+       enum sync_metadata_status status;
 
        assert(metadata);
 
        ret = kernctl_buffer_flush(metadata->wait_fd);
        if (ret < 0) {
                ERR("Failed to flush kernel stream");
+               status = SYNC_METADATA_STATUS_ERROR;
                goto end;
        }
 
        ret = kernctl_snapshot(metadata->wait_fd);
        if (ret < 0) {
-               if (ret != -EAGAIN) {
+               if (errno == EAGAIN) {
+                       /* No new metadata, exit. */
+                       DBG("Sync metadata, no new kernel metadata");
+                       status = SYNC_METADATA_STATUS_NO_DATA;
+               } else {
                        ERR("Sync metadata, taking kernel snapshot failed.");
-                       goto end;
+                       status = SYNC_METADATA_STATUS_ERROR;
                }
-               DBG("Sync metadata, no new kernel metadata");
-               /* No new metadata, exit. */
-               ret = ENODATA;
-               goto end;
+       } else {
+               status = SYNC_METADATA_STATUS_NEW_DATA;
        }
 
 end:
-       return ret;
+       return status;
 }
 
 static
@@ -1558,6 +1579,42 @@ end:
        return ret;
 }
 
+static
+int get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuffer)
+{
+       int ret;
+       const char *addr;
+       bool coherent;
+
+       ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
+                       &coherent);
+       if (ret) {
+               goto end;
+       }
+
+       ret = stream->read_subbuffer_ops.extract_subbuffer_info(
+                       stream, subbuffer);
+       if (ret) {
+               goto end;
+       }
+
+       LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
+
+       ret = get_current_subbuf_addr(stream, &addr);
+       if (ret) {
+               goto end;
+       }
+
+       subbuffer->buffer.buffer = lttng_buffer_view_init(
+                       addr, 0, subbuffer->info.data.padded_subbuf_size);
+       DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
+                       subbuffer->info.metadata.padded_subbuf_size,
+                       coherent ? "true" : "false");
+end:
+       return ret;
+}
+
 static
 int put_next_subbuffer(struct lttng_consumer_stream *stream,
                struct stream_subbuffer *subbuffer)
@@ -1576,15 +1633,53 @@ int put_next_subbuffer(struct lttng_consumer_stream *stream,
        return ret;
 }
 
-static void lttng_kconsumer_set_stream_ops(
+static
+bool is_get_next_check_metadata_available(int tracer_fd)
+{
+       return kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL) !=
+                       -ENOTTY;
+}
+
+static
+int lttng_kconsumer_set_stream_ops(
                struct lttng_consumer_stream *stream)
 {
-       if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
-               stream->read_subbuffer_ops.get_next_subbuffer =
-                               get_next_subbuffer_mmap;
-       } else {
-               stream->read_subbuffer_ops.get_next_subbuffer =
-                               get_next_subbuffer_splice;
+       int ret = 0;
+
+       if (stream->metadata_flag && stream->chan->is_live) {
+               DBG("Attempting to enable metadata bucketization for live consumers");
+               if (is_get_next_check_metadata_available(stream->wait_fd)) {
+                       DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
+                       stream->read_subbuffer_ops.get_next_subbuffer =
+                                       get_next_subbuffer_metadata_check;
+                       ret = consumer_stream_enable_metadata_bucketization(
+                                       stream);
+                       if (ret) {
+                               goto end;
+                       }
+               } else {
+                       /*
+                        * The kernel tracer version is too old to indicate
+                        * when the metadata stream has reached a "coherent"
+                        * (parseable) point.
+                        *
+                        * This means that a live viewer may see an incoherent
+                        * sequence of metadata and fail to parse it.
+                        */
+                       WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
+                       metadata_bucket_destroy(stream->metadata_bucket);
+                       stream->metadata_bucket = NULL;
+               }
+       }
+
+       if (!stream->read_subbuffer_ops.get_next_subbuffer) {
+               if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
+                       stream->read_subbuffer_ops.get_next_subbuffer =
+                                       get_next_subbuffer_mmap;
+               } else {
+                       stream->read_subbuffer_ops.get_next_subbuffer =
+                                       get_next_subbuffer_splice;
+               }
        }
 
        if (stream->metadata_flag) {
@@ -1600,6 +1695,8 @@ static void lttng_kconsumer_set_stream_ops(
        }
 
        stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
+end:
+       return ret;
 }
 
 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
@@ -1640,7 +1737,10 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                }
        }
 
-       lttng_kconsumer_set_stream_ops(stream);
+       ret = lttng_kconsumer_set_stream_ops(stream);
+       if (ret) {
+               goto error_close_fd;
+       }
 
        /* we return 0 to let the library handle the FD internally */
        return 0;
This page took 0.028017 seconds and 5 git commands to generate.