Send ust and kernel domain directory handle to consumer
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 04d7efe5494443ab18dfea64d5e67561616fd66b..41317fe657d57cd4d5b5873c03fc2e3e77793118 100644 (file)
@@ -775,10 +775,19 @@ static int flush_channel(uint64_t chan_key)
                health_code_update();
 
                pthread_mutex_lock(&stream->lock);
+
+               /*
+                * Protect against concurrent teardown of a stream.
+                */
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
+               }
+
                if (!stream->quiescent) {
                        ustctl_flush_buffer(stream->ustream, 0);
                        stream->quiescent = true;
                }
+next:
                pthread_mutex_unlock(&stream->lock);
        }
 error:
@@ -1070,7 +1079,6 @@ error_stream:
         * Clean up the stream completly because the next snapshot will use a new
         * metadata stream.
         */
-       pthread_mutex_lock(&metadata_stream->lock);
        consumer_stream_destroy(metadata_stream, NULL);
        cds_list_del(&metadata_stream->send_node);
        metadata_channel->metadata_stream = NULL;
@@ -2031,8 +2039,7 @@ end_rotate_channel_nosignal:
                                *msg.u.create_trace_chunk.override_name ?
                                        msg.u.create_trace_chunk.override_name :
                                        NULL;
-               LTTNG_OPTIONAL(struct lttng_directory_handle) chunk_directory_handle =
-                               LTTNG_OPTIONAL_INIT;
+               struct lttng_directory_handle *chunk_directory_handle = NULL;
 
                /*
                 * The session daemon will only provide a chunk directory file
@@ -2049,25 +2056,26 @@ end_rotate_channel_nosignal:
                                goto end_nosignal;
                        }
 
+                       /*
+                        * Receive trace chunk domain dirfd.
+                        */
                        ret = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1);
                        if (ret != sizeof(chunk_dirfd)) {
-                               ERR("Failed to receive trace chunk directory file descriptor");
+                               ERR("Failed to receive trace chunk domain directory file descriptor");
                                goto error_fatal;
                        }
 
-                       DBG("Received trace chunk directory fd (%d)",
+                       DBG("Received trace chunk domain directory fd (%d)",
                                        chunk_dirfd);
-                       ret = lttng_directory_handle_init_from_dirfd(
-                                       &chunk_directory_handle.value,
+                       chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
                                        chunk_dirfd);
-                       if (ret) {
-                               ERR("Failed to initialize chunk directory handle from directory file descriptor");
+                       if (!chunk_directory_handle) {
+                               ERR("Failed to initialize chunk domain directory handle from directory file descriptor");
                                if (close(chunk_dirfd)) {
                                        PERROR("Failed to close chunk directory file descriptor");
                                }
                                goto error_fatal;
                        }
-                       chunk_directory_handle.is_set = true;
                }
 
                ret_code = lttng_consumer_create_trace_chunk(
@@ -2080,14 +2088,8 @@ end_rotate_channel_nosignal:
                                msg.u.create_trace_chunk.credentials.is_set ?
                                                &credentials :
                                                NULL,
-                               chunk_directory_handle.is_set ?
-                                               &chunk_directory_handle.value :
-                                               NULL);
-
-               if (chunk_directory_handle.is_set) {
-                       lttng_directory_handle_fini(
-                                       &chunk_directory_handle.value);
-               }
+                               chunk_directory_handle);
+               lttng_directory_handle_put(chunk_directory_handle);
                goto end_msg_sessiond;
        }
        case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
@@ -2096,6 +2098,9 @@ end_rotate_channel_nosignal:
                                msg.u.close_trace_chunk.close_command.value;
                const uint64_t relayd_id =
                                msg.u.close_trace_chunk.relayd_id.value;
+               struct lttcomm_consumer_close_trace_chunk_reply reply;
+               char closed_trace_chunk_path[LTTNG_PATH_MAX];
+               int ret;
 
                ret_code = lttng_consumer_close_trace_chunk(
                                msg.u.close_trace_chunk.relayd_id.is_set ?
@@ -2106,8 +2111,19 @@ end_rotate_channel_nosignal:
                                (time_t) msg.u.close_trace_chunk.close_timestamp,
                                msg.u.close_trace_chunk.close_command.is_set ?
                                                &close_command :
-                                               NULL);
-               goto end_msg_sessiond;
+                                               NULL, closed_trace_chunk_path);
+               reply.ret_code = ret_code;
+               reply.path_length = strlen(closed_trace_chunk_path) + 1;
+               ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
+               if (ret != sizeof(reply)) {
+                       goto error_fatal;
+               }
+               ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path,
+                               reply.path_length);
+               if (ret != reply.path_length) {
+                       goto error_fatal;
+               }
+               goto end_nosignal;
        }
        case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
        {
@@ -2148,7 +2164,6 @@ end_msg_sessiond:
 
 end_channel_error:
        if (channel) {
-               pthread_mutex_unlock(&channel->lock);
                /*
                 * Free channel here since no one has a reference to it. We don't
                 * free after that because a stream can store this pointer.
@@ -2401,62 +2416,69 @@ static int get_index_values(struct ctf_packet_index *index,
                struct ustctl_consumer_stream *ustream)
 {
        int ret;
+       uint64_t packet_size, content_size, timestamp_begin, timestamp_end,
+                       events_discarded, stream_id, stream_instance_id,
+                       packet_seq_num;
 
-       ret = ustctl_get_timestamp_begin(ustream, &index->timestamp_begin);
+       ret = ustctl_get_timestamp_begin(ustream, &timestamp_begin);
        if (ret < 0) {
                PERROR("ustctl_get_timestamp_begin");
                goto error;
        }
-       index->timestamp_begin = htobe64(index->timestamp_begin);
 
-       ret = ustctl_get_timestamp_end(ustream, &index->timestamp_end);
+       ret = ustctl_get_timestamp_end(ustream, &timestamp_end);
        if (ret < 0) {
                PERROR("ustctl_get_timestamp_end");
                goto error;
        }
-       index->timestamp_end = htobe64(index->timestamp_end);
 
-       ret = ustctl_get_events_discarded(ustream, &index->events_discarded);
+       ret = ustctl_get_events_discarded(ustream, &events_discarded);
        if (ret < 0) {
                PERROR("ustctl_get_events_discarded");
                goto error;
        }
-       index->events_discarded = htobe64(index->events_discarded);
 
-       ret = ustctl_get_content_size(ustream, &index->content_size);
+       ret = ustctl_get_content_size(ustream, &content_size);
        if (ret < 0) {
                PERROR("ustctl_get_content_size");
                goto error;
        }
-       index->content_size = htobe64(index->content_size);
 
-       ret = ustctl_get_packet_size(ustream, &index->packet_size);
+       ret = ustctl_get_packet_size(ustream, &packet_size);
        if (ret < 0) {
                PERROR("ustctl_get_packet_size");
                goto error;
        }
-       index->packet_size = htobe64(index->packet_size);
 
-       ret = ustctl_get_stream_id(ustream, &index->stream_id);
+       ret = ustctl_get_stream_id(ustream, &stream_id);
        if (ret < 0) {
                PERROR("ustctl_get_stream_id");
                goto error;
        }
-       index->stream_id = htobe64(index->stream_id);
 
-       ret = ustctl_get_instance_id(ustream, &index->stream_instance_id);
+       ret = ustctl_get_instance_id(ustream, &stream_instance_id);
        if (ret < 0) {
                PERROR("ustctl_get_instance_id");
                goto error;
        }
-       index->stream_instance_id = htobe64(index->stream_instance_id);
 
-       ret = ustctl_get_sequence_number(ustream, &index->packet_seq_num);
+       ret = ustctl_get_sequence_number(ustream, &packet_seq_num);
        if (ret < 0) {
                PERROR("ustctl_get_sequence_number");
                goto error;
        }
-       index->packet_seq_num = htobe64(index->packet_seq_num);
+
+       *index = (typeof(*index)) {
+               .offset = index->offset,
+               .packet_size = htobe64(packet_size),
+               .content_size = htobe64(content_size),
+               .timestamp_begin = htobe64(timestamp_begin),
+               .timestamp_end = htobe64(timestamp_end),
+               .events_discarded = htobe64(events_discarded),
+               .stream_id = htobe64(stream_id),
+               .stream_instance_id = htobe64(stream_instance_id),
+               .packet_seq_num = htobe64(packet_seq_num),
+       };
 
 error:
        return ret;
@@ -2554,37 +2576,59 @@ end:
  * interacting with sessiond, else we cause a deadlock with live
  * awaiting on metadata to be pushed out.
  *
+ * The RCU read side lock must be held by the caller.
+ *
  * Return 0 if new metadatda is available, EAGAIN if the metadata stream
  * is empty or a negative value on error.
  */
 int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *metadata)
+               struct lttng_consumer_stream *metadata_stream)
 {
        int ret;
        int retry = 0;
+       struct lttng_consumer_channel *metadata_channel;
 
        assert(ctx);
-       assert(metadata);
+       assert(metadata_stream);
 
-       pthread_mutex_unlock(&metadata->lock);
+       metadata_channel = metadata_stream->chan;
+       pthread_mutex_unlock(&metadata_stream->lock);
        /*
         * Request metadata from the sessiond, but don't wait for the flush
         * because we locked the metadata thread.
         */
-       ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0);
-       pthread_mutex_lock(&metadata->lock);
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
+       pthread_mutex_lock(&metadata_stream->lock);
        if (ret < 0) {
                goto end;
        }
 
-       ret = commit_one_metadata_packet(metadata);
+       /*
+        * The metadata stream and channel can be deleted while the
+        * metadata stream lock was released. The streamed is checked
+        * for deletion before we use it further.
+        *
+        * Note that it is safe to access a logically-deleted stream since its
+        * existence is still guaranteed by the RCU read side lock. However,
+        * it should no longer be used. The close/deletion of the metadata
+        * channel and stream already guarantees that all metadata has been
+        * consumed. Therefore, there is nothing left to do in this function.
+        */
+       if (consumer_stream_is_deleted(metadata_stream)) {
+               DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization",
+                               metadata_stream->key);
+               ret = 0;
+               goto end;
+       }
+
+       ret = commit_one_metadata_packet(metadata_stream);
        if (ret <= 0) {
                goto end;
        } else if (ret > 0) {
                retry = 1;
        }
 
-       ret = ustctl_snapshot(metadata->ustream);
+       ret = ustctl_snapshot(metadata_stream->ustream);
        if (ret < 0) {
                if (errno != EAGAIN) {
                        ERR("Sync metadata, taking UST snapshot");
@@ -3031,7 +3075,7 @@ end:
  * Stop a given metadata channel timer if enabled and close the wait fd which
  * is the poll pipe of the metadata stream.
  *
- * This MUST be called with the metadata channel acquired.
+ * This MUST be called with the metadata channel lock acquired.
  */
 void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata)
 {
This page took 0.027564 seconds and 5 git commands to generate.