Fix: consumer_add_relayd_socket() report errors to sessiond
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index a0818452f4f540ef880a3effb13c0e2b89fc0e10..02198bd1b9e854f26a8614c5f76fbf76a36835f1 100644 (file)
@@ -109,41 +109,46 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
  *
  * Returns 0 on success, < 0 on error
  */
-static
-int send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
+static int send_relayd_stream(struct lttng_consumer_stream *stream,
+               char *path)
 {
-       struct consumer_relayd_sock_pair *relayd;
        int ret = 0;
-       char *stream_path;
+       const char *stream_path;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(stream);
+       assert(stream->net_seq_idx != -1ULL);
 
        if (path != NULL) {
                stream_path = path;
        } else {
                stream_path = stream->chan->pathname;
        }
+
        /* The stream is not metadata. Get relayd reference if exists. */
        rcu_read_lock();
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
                /* Add stream on the relayd */
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_add_stream(&relayd->control_sock,
-                               stream->name, stream_path,
-                               &stream->relayd_stream_id,
-                               stream->chan->tracefile_size,
-                               stream->chan->tracefile_count);
+               ret = relayd_add_stream(&relayd->control_sock, stream->name,
+                               stream_path, &stream->relayd_stream_id,
+                               stream->chan->tracefile_size, stream->chan->tracefile_count);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
                        goto end;
                }
                uatomic_inc(&relayd->refcount);
-       } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
-                               stream->net_seq_idx);
+       } else {
+               ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
+                               stream->key, stream->net_seq_idx);
                ret = -1;
                goto end;
        }
 
+       DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
+                       stream->name, stream->key, stream->net_seq_idx);
+
 end:
        rcu_read_unlock();
        return ret;
@@ -152,15 +157,14 @@ end:
 /*
  * Find a relayd and close the stream
  */
-static
-void close_relayd_stream(struct lttng_consumer_stream *stream)
+static void close_relayd_stream(struct lttng_consumer_stream *stream)
 {
        struct consumer_relayd_sock_pair *relayd;
 
        /* The stream is not metadata. Get relayd reference if exists. */
        rcu_read_lock();
        relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
+       if (relayd) {
                consumer_stream_relayd_close(stream, relayd);
        }
        rcu_read_unlock();
@@ -204,6 +208,10 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                 */
                pthread_mutex_lock(&stream->lock);
 
+               /*
+                * Assign the received relayd ID so we can use it for streaming. The streams
+                * are not visible to anyone so this is OK to change it.
+                */
                stream->net_seq_idx = relayd_id;
                channel->relayd_id = relayd_id;
                if (relayd_id != (uint64_t) -1ULL) {
@@ -212,7 +220,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                                ERR("sending stream to relayd");
                                goto end_unlock;
                        }
-                       DBG("Stream %s sent to the relayd", stream->name);
                } else {
                        ret = utils_create_stream_file(path, stream->name,
                                        stream->chan->tracefile_size, stream->tracefile_count_current,
@@ -282,22 +289,21 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_subbuf_size");
-                               goto end_unlock;
+                               goto error_put_subbuf;
                        }
 
                        ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_padded_subbuf_size");
-                               goto end_unlock;
+                               goto error_put_subbuf;
                        }
 
                        read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
                                        padded_len - len);
                        /*
-                        * We write the padded len in local tracefiles but the
-                        * data len when using a relay.
-                        * Display the error but continue processing to try to
-                        * release the subbuffer.
+                        * We write the padded len in local tracefiles but the data len
+                        * when using a relay. Display the error but continue processing
+                        * to try to release the subbuffer.
                         */
                        if (relayd_id != (uint64_t) -1ULL) {
                                if (read_len != len) {
@@ -337,6 +343,11 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
        ret = 0;
        goto end;
 
+error_put_subbuf:
+       ret = kernctl_put_subbuf(stream->wait_fd);
+       if (ret < 0) {
+               ERR("Snapshot kernctl_put_subbuf error path");
+       }
 end_unlock:
        pthread_mutex_unlock(&stream->lock);
 end:
@@ -352,9 +363,12 @@ end:
 int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
                uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
 {
+       int ret, use_relayd = 0;
+       ssize_t ret_read;
        struct lttng_consumer_channel *metadata_channel;
        struct lttng_consumer_stream *metadata_stream;
-       int ret;
+
+       assert(ctx);
 
        DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
                        key, path);
@@ -363,58 +377,66 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
 
        metadata_channel = consumer_find_channel(key);
        if (!metadata_channel) {
-               ERR("Snapshot kernel metadata channel not found for key %lu", key);
+               ERR("Kernel snapshot metadata not found for key %" PRIu64, key);
                ret = -1;
-               goto end;
+               goto error;
        }
 
        metadata_stream = metadata_channel->metadata_stream;
        assert(metadata_stream);
 
+       /* Flag once that we have a valid relayd for the stream. */
        if (relayd_id != (uint64_t) -1ULL) {
+               use_relayd = 1;
+       }
+
+       if (use_relayd) {
                ret = send_relayd_stream(metadata_stream, path);
                if (ret < 0) {
-                       ERR("sending stream to relayd");
+                       goto error;
                }
-               DBG("Stream %s sent to the relayd", metadata_stream->name);
        } else {
                ret = utils_create_stream_file(path, metadata_stream->name,
                                metadata_stream->chan->tracefile_size,
                                metadata_stream->tracefile_count_current,
                                metadata_stream->uid, metadata_stream->gid);
                if (ret < 0) {
-                       goto end;
+                       goto error;
                }
                metadata_stream->out_fd = ret;
        }
 
-       ret = 0;
-       while (ret >= 0) {
-               ret = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
-               if (ret < 0) {
-                       if (ret != -EPERM) {
-                               ERR("Kernel snapshot reading subbuffer");
-                               goto end;
+       do {
+               ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+               if (ret_read < 0) {
+                       if (ret_read != -EPERM) {
+                               ERR("Kernel snapshot reading metadata subbuffer (ret: %ld)",
+                                               ret_read);
+                               goto error;
                        }
-                       /* "ret" is negative at this point so we will exit the loop. */
+                       /* ret_read is negative at this point so we will exit the loop. */
                        continue;
                }
-       }
+       } while (ret_read >= 0);
 
-       if (relayd_id == (uint64_t) -1ULL) {
+       if (use_relayd) {
+               close_relayd_stream(metadata_stream);
+               metadata_stream->net_seq_idx = (uint64_t) -1ULL;
+       } else {
                ret = close(metadata_stream->out_fd);
                if (ret < 0) {
-                       PERROR("Kernel consumer snapshot close out_fd");
-                       goto end;
+                       PERROR("Kernel consumer snapshot metadata close out_fd");
+                       /*
+                        * Don't go on error here since the snapshot was successful at this
+                        * point but somehow the close failed.
+                        */
                }
                metadata_stream->out_fd = -1;
-       } else {
-               close_relayd_stream(metadata_stream);
-               metadata_stream->net_seq_idx = (uint64_t) -1ULL;
        }
 
        ret = 0;
-end:
+
+error:
        rcu_read_unlock();
        return ret;
 }
@@ -557,20 +579,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
-                       /*
-                        * Somehow, the session daemon is not responding
-                        * anymore.
-                        */
+                       /* Somehow, the session daemon is not responding anymore. */
                        goto error_fatal;
                }
                if (ret_code != LTTNG_OK) {
-                       /*
-                        * Channel was not found.
-                        */
+                       /* Channel was not found. */
                        goto end_nosignal;
                }
 
-               /* block */
+               /* Blocking call */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
                        rcu_read_unlock();
                        return -EINTR;
@@ -616,6 +633,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                        goto end_nosignal;
                }
+
                new_stream->chan = channel;
                new_stream->wait_fd = fd;
                switch (channel->output) {
@@ -653,7 +671,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (ctx->on_recv_stream) {
                        ret = ctx->on_recv_stream(new_stream);
                        if (ret < 0) {
-                               consumer_del_stream(new_stream, NULL);
+                               consumer_stream_free(new_stream);
                                goto end_nosignal;
                        }
                }
@@ -674,7 +692,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                ret = send_relayd_stream(new_stream, NULL);
                if (ret < 0) {
-                       consumer_del_stream(new_stream, NULL);
+                       consumer_stream_free(new_stream);
                        goto end_nosignal;
                }
 
@@ -690,7 +708,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ERR("Consumer write %s stream to pipe %d",
                                        new_stream->metadata_flag ? "metadata" : "data",
                                        lttng_pipe_get_writefd(stream_pipe));
-                       consumer_del_stream(new_stream, NULL);
+                       consumer_stream_free(new_stream);
                        goto end_nosignal;
                }
 
This page took 0.030026 seconds and 5 git commands to generate.