Support flight recorder mode for a session
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index f3a3a22b55a230a427bad1fe7c7cb844de236d8a..86428f0663141fa250905e269d7ceea22c5cb43d 100644 (file)
@@ -83,6 +83,11 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
        return ret;
 }
 
+/*
+ * Receive command from session daemon and process it.
+ *
+ * Return 1 on success else a negative value or 0.
+ */
 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
@@ -93,6 +98,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
        if (ret != sizeof(msg)) {
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+               if (ret > 0) {
+                       ret = -1;
+               }
                return ret;
        }
        if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
@@ -128,7 +136,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
-                       goto end_nosignal;
+                       goto error_fatal;
                }
                DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
@@ -136,7 +144,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid,
                                msg.u.channel.relayd_id, msg.u.channel.output,
                                msg.u.channel.tracefile_size,
-                               msg.u.channel.tracefile_count);
+                               msg.u.channel.tracefile_count,
+                               msg.u.channel.monitor);
                if (new_channel == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto end_nosignal;
@@ -166,8 +175,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
 
                /* If we received an error in add_channel, we need to report it. */
-               if (ret != 0) {
-                       consumer_send_status_msg(sock, ret);
+               if (ret < 0) {
+                       ret = consumer_send_status_msg(sock, ret);
+                       if (ret < 0) {
+                               goto error_fatal;
+                       }
                        goto end_nosignal;
                }
 
@@ -198,10 +210,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
-               if (ret < 0 || ret_code != LTTNG_OK) {
+               if (ret < 0) {
+                       /*
+                        * Somehow, the session daemon is not responding
+                        * anymore.
+                        */
+                       goto error_fatal;
+               }
+               if (ret_code != LTTNG_OK) {
                        /*
-                        * Somehow, the session daemon is not responding anymore or the
-                        * channel was not found.
+                        * Channel was not found.
                         */
                        goto end_nosignal;
                }
@@ -255,6 +273,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                new_stream->chan = channel;
                new_stream->wait_fd = fd;
 
+               /*
+                * We've just assigned the channel to the stream so increment the
+                * refcount right now.
+                */
+               uatomic_inc(&new_stream->chan->refcount);
+
                /*
                 * The buffer flush is done on the session daemon side for the kernel
                 * so no need for the stream "hangup_flush_done" variable to be
@@ -294,6 +318,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                }
 
+               /* Do not monitor this stream. */
+               if (!channel->monitor) {
+                       DBG("Kernel consumer add stream %s in no monitor mode with"
+                                       "relayd id %" PRIu64, new_stream->name,
+                                       new_stream->relayd_stream_id);
+                       break;
+               }
+
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
                        stream_pipe = ctx->consumer_metadata_pipe;
@@ -350,7 +382,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
-                       goto end_nosignal;
+                       goto error_fatal;
                }
 
                goto end_nosignal;
@@ -368,6 +400,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
                if (ret < 0) {
                        PERROR("send data pending ret code");
+                       goto error_fatal;
                }
 
                /*
@@ -376,6 +409,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 */
                break;
        }
+       case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
+       {
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+               break;
+       }
        default:
                goto end_nosignal;
        }
@@ -388,6 +430,11 @@ end_nosignal:
         * shutdown during the recv() or send() call.
         */
        return 1;
+
+error_fatal:
+       rcu_read_unlock();
+       /* This will issue a consumer stop. */
+       return -1;
 }
 
 /*
@@ -516,8 +563,11 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
 
        assert(stream);
 
-       /* Don't create anything if this is set for streaming. */
-       if (stream->net_seq_idx == (uint64_t) -1ULL) {
+       /*
+        * Don't create anything if this is set for streaming or should not be
+        * monitored.
+        */
+       if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
                ret = utils_create_stream_file(stream->chan->pathname, stream->name,
                                stream->chan->tracefile_size, stream->tracefile_count_current,
                                stream->uid, stream->gid);
This page took 0.02903 seconds and 5 git commands to generate.