Create stream files relative to a stream's current trace chunk
[lttng-tools.git] / src / bin / lttng-sessiond / session.c
index 6c10aaeba9259286cdac4a43db72619d8a4e8f51..abac2404f04bdda9b2c3a94ec8507c950f3a427a 100644 (file)
@@ -67,13 +67,6 @@ static const char *forbidden_name_chars = "/";
 /* Global hash table to keep the sessions, indexed by id. */
 static struct lttng_ht *ltt_sessions_ht_by_id = NULL;
 
-struct consumer_create_chunk_transaction {
-       struct consumer_socket *socket;
-       struct lttng_trace_chunk *new_chunk;
-       struct lttng_trace_chunk *previous_chunk;
-       bool new_chunk_created;
-};
-
 /*
  * Validate the session name for forbidden characters.
  *
@@ -254,16 +247,26 @@ void session_get_net_consumer_ports(const struct ltt_session *session,
 struct lttng_trace_archive_location *session_get_trace_archive_location(
                struct ltt_session *session)
 {
+       int ret;
        struct lttng_trace_archive_location *location = NULL;
+       char *chunk_path = NULL;
+
+       if (session->rotation_state != LTTNG_ROTATION_STATE_COMPLETED ||
+                       !session->last_archived_chunk_name) {
+               goto end;
+       }
 
-       if (session->rotation_state != LTTNG_ROTATION_STATE_COMPLETED) {
+       ret = asprintf(&chunk_path, "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY "/%s",
+                       session_get_base_path(session),
+                       session->last_archived_chunk_name);
+       if (ret == -1) {
                goto end;
        }
 
        switch (session_get_consumer_destination_type(session)) {
        case CONSUMER_DST_LOCAL:
                location = lttng_trace_archive_location_local_create(
-                               session->rotation_chunk.current_rotate_path);
+                               chunk_path);
                break;
        case CONSUMER_DST_NET:
        {
@@ -277,14 +280,14 @@ struct lttng_trace_archive_location *session_get_trace_archive_location(
                location = lttng_trace_archive_location_relay_create(
                                hostname,
                                LTTNG_TRACE_ARCHIVE_LOCATION_RELAY_PROTOCOL_TYPE_TCP,
-                               control_port, data_port,
-                               session->rotation_chunk.current_rotate_path);
+                               control_port, data_port, chunk_path);
                break;
        }
        default:
                abort();
        }
 end:
+       free(chunk_path);
        return location;
 }
 
@@ -411,150 +414,130 @@ void session_unlock(struct ltt_session *session)
 
 static
 int _session_set_trace_chunk_no_lock_check(struct ltt_session *session,
-               struct lttng_trace_chunk *new_trace_chunk)
+               struct lttng_trace_chunk *new_trace_chunk,
+               struct lttng_trace_chunk **_current_trace_chunk)
 {
        int ret;
        unsigned int i, refs_to_acquire = 0, refs_acquired = 0, refs_to_release = 0;
-       unsigned int consumer_count = 0;
-       /*
-        * The maximum amount of consumers to reach is 3
-        * (32/64 userspace + kernel).
-        */
-       struct consumer_create_chunk_transaction transactions[3] = {};
        struct cds_lfht_iter iter;
        struct consumer_socket *socket;
-       bool close_error_occured = false;
-
-       if (new_trace_chunk) {
-               uint64_t chunk_id;
-               enum lttng_trace_chunk_status chunk_status =
-                               lttng_trace_chunk_get_id(new_trace_chunk,
-                                       &chunk_id);
-
-               assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
-               LTTNG_OPTIONAL_SET(&session->last_trace_chunk_id, chunk_id)
-       }
-
-       if (new_trace_chunk) {
-               refs_to_acquire = 1;
-               refs_to_acquire += !!session->ust_session;
-               refs_to_acquire += !!session->kernel_session;
-       }
+       struct lttng_trace_chunk *current_trace_chunk;
+       uint64_t chunk_id;
+       enum lttng_trace_chunk_status chunk_status;
+       const uint64_t relayd_id = session->consumer->net_seq_index;
+       const bool is_local_trace = relayd_id  == -1ULL;
 
+       rcu_read_lock();
        /*
-        * Build a list of consumers to reach to announce the new trace chunk.
-        *
-        * Rolling back the annoucement in case of an error is important since
-        * not doing so would result in a leak; the chunk will not be
-        * "reclaimed" by the consumer(s) since they have no concept of the
-        * lifetime of a session.
+        * Ownership of current trace chunk is transferred to
+        * `current_trace_chunk`.
         */
+       current_trace_chunk = session->current_trace_chunk;
+       session->current_trace_chunk = NULL;
        if (session->ust_session) {
-               cds_lfht_for_each_entry(
-                               session->ust_session->consumer->socks->ht,
-                               &iter, socket, node.node) {
-                       transactions[consumer_count].socket = socket;
-                       transactions[consumer_count].new_chunk = new_trace_chunk;
-                       transactions[consumer_count].previous_chunk =
-                                       session->current_trace_chunk;
-                       consumer_count++;
-                       assert(consumer_count <= 3);
-               }
+               lttng_trace_chunk_put(
+                               session->ust_session->current_trace_chunk);
+               session->ust_session->current_trace_chunk = NULL;
        }
        if (session->kernel_session) {
-               cds_lfht_for_each_entry(
-                               session->kernel_session->consumer->socks->ht,
-                               &iter, socket, node.node) {
-                       transactions[consumer_count].socket = socket;
-                       transactions[consumer_count].new_chunk = new_trace_chunk;
-                       transactions[consumer_count].previous_chunk =
-                                       session->current_trace_chunk;
-                       consumer_count++;
-                       assert(consumer_count <= 3);
-               }
+               lttng_trace_chunk_put(
+                               session->kernel_session->current_trace_chunk);
+               session->kernel_session->current_trace_chunk = NULL;
        }
-       for (refs_acquired = 0; refs_acquired < refs_to_acquire; refs_acquired++) {
-               if (new_trace_chunk && !lttng_trace_chunk_get(new_trace_chunk)) {
-                       ERR("Failed to acquire reference to new current trace chunk of session \"%s\"",
-                                       session->name);
-                       goto error;
-               }
+       if (!new_trace_chunk) {
+               ret = 0;
+               goto end;
        }
+       chunk_status = lttng_trace_chunk_get_id(new_trace_chunk, &chunk_id);
+       assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
 
-       /*
-        * Close the previous chunk on remote peers (consumers and relayd).
-        */
-       for (i = 0; i < consumer_count; i++) {
-               if (!transactions[i].previous_chunk) {
-                       continue;
-               }
-               pthread_mutex_lock(transactions[i].socket->lock);
-               ret = consumer_close_trace_chunk(transactions[i].socket,
-                               session->consumer->net_seq_index,
-                               session->id,
-                               transactions[i].previous_chunk);
-               pthread_mutex_unlock(transactions[i].socket->lock);
-               if (ret) {
-                       ERR("Failed to close trace chunk on consumer");
-                       close_error_occured = true;
+       refs_to_acquire = 1;
+       refs_to_acquire += !!session->ust_session;
+       refs_to_acquire += !!session->kernel_session;
+
+       for (refs_acquired = 0; refs_acquired < refs_to_acquire;
+                       refs_acquired++) {
+               if (!lttng_trace_chunk_get(new_trace_chunk)) {
+                       ERR("Failed to acquire reference to new trace chunk of session \"%s\"",
+                                       session->name);
+                       goto error;
                }
        }
 
-       if (close_error_occured) {
-               /*
-                * Skip the creation of the new trace chunk and report the
-                * error.
-                */
-               goto error;
-       }
+       if (session->ust_session) {
+               session->ust_session->current_trace_chunk = new_trace_chunk;
+                if (is_local_trace) {
+                       enum lttng_error_code ret_error_code;
 
-       /* Create the new chunk on remote peers (consumers and relayd) */
-       if (new_trace_chunk) {
-               for (i = 0; i < consumer_count; i++) {
-                       pthread_mutex_lock(transactions[i].socket->lock);
-                       ret = consumer_create_trace_chunk(transactions[i].socket,
-                                       session->consumer->net_seq_index,
-                                       session->id,
-                                       transactions[i].new_chunk);
-                       pthread_mutex_unlock(transactions[i].socket->lock);
-                       if (ret) {
-                               ERR("Failed to create trace chunk on consumer");
+                       ret_error_code = ust_app_create_channel_subdirectories(
+                                       session->ust_session);
+                       if (ret_error_code != LTTNG_OK) {
+                               ret = -ret_error_code;
                                goto error;
                        }
-                       /* This will have to be rolled-back on error. */
-                       transactions[i].new_chunk_created = true;
-               }
-       }
-
-       lttng_trace_chunk_put(session->current_trace_chunk);
-       session->current_trace_chunk = NULL;
-       if (session->ust_session) {
-               lttng_trace_chunk_put(
-                               session->ust_session->current_trace_chunk);
-               session->ust_session->current_trace_chunk = NULL;
-       }
+                }
+               cds_lfht_for_each_entry(
+                               session->ust_session->consumer->socks->ht,
+                               &iter, socket, node.node) {
+                       pthread_mutex_lock(socket->lock);
+                       ret = consumer_create_trace_chunk(socket,
+                                       relayd_id,
+                                       session->id, new_trace_chunk);
+                       pthread_mutex_unlock(socket->lock);
+                        if (ret) {
+                               goto error;
+                        }
+                }
+        }
        if (session->kernel_session) {
-               lttng_trace_chunk_put(
-                               session->kernel_session->current_trace_chunk);
-               session->kernel_session->current_trace_chunk = NULL;
-       }
+               session->kernel_session->current_trace_chunk = new_trace_chunk;
+               if (is_local_trace) {
+                       enum lttng_error_code ret_error_code;
+
+                       ret_error_code = kernel_create_channel_subdirectories(
+                                       session->kernel_session);
+                       if (ret_error_code != LTTNG_OK) {
+                               ret = -ret_error_code;
+                               goto error;
+                       }
+                }
+               cds_lfht_for_each_entry(
+                               session->kernel_session->consumer->socks->ht,
+                               &iter, socket, node.node) {
+                       pthread_mutex_lock(socket->lock);
+                       ret = consumer_create_trace_chunk(socket,
+                                       relayd_id,
+                                       session->id, new_trace_chunk);
+                       pthread_mutex_unlock(socket->lock);
+                        if (ret) {
+                               goto error;
+                        }
+                }
+        }
 
        /*
         * Update local current trace chunk state last, only if all remote
-        * annoucements succeeded.
+        * creations succeeded.
         */
        session->current_trace_chunk = new_trace_chunk;
+       LTTNG_OPTIONAL_SET(&session->most_recent_chunk_id, chunk_id);
+end:
+       if (_current_trace_chunk) {
+               *_current_trace_chunk = current_trace_chunk;
+               current_trace_chunk = NULL;
+       }
+end_no_move:
+       rcu_read_unlock();
+       lttng_trace_chunk_put(current_trace_chunk);
+       return ret;
+error:
        if (session->ust_session) {
-               session->ust_session->current_trace_chunk = new_trace_chunk;
+               session->ust_session->current_trace_chunk = NULL;
        }
        if (session->kernel_session) {
-               session->kernel_session->current_trace_chunk =
-                               new_trace_chunk;
+               session->kernel_session->current_trace_chunk = NULL;
        }
-
-       return 0;
-error:
-       /*
+        /*
         * Release references taken in the case where all references could not
         * be acquired.
         */
@@ -562,34 +545,12 @@ error:
        for (i = 0; i < refs_to_release; i++) {
                lttng_trace_chunk_put(new_trace_chunk);
        }
-
-       /*
-        * Close the newly-created chunk from remote peers (consumers and
-        * relayd).
-        */
-       DBG("Rolling back the creation of the new trace chunk on consumers");
-       for (i = 0; i < consumer_count; i++) {
-               if (!transactions[i].new_chunk_created) {
-                       continue;
-               }
-
-               pthread_mutex_lock(transactions[i].socket->lock);
-               ret = consumer_close_trace_chunk(transactions[i].socket,
-                               session->consumer->net_seq_index,
-                               session->id,
-                               transactions[i].new_chunk);
-               pthread_mutex_unlock(transactions[i].socket->lock);
-               if (ret) {
-                       ERR("Failed to close trace chunk on consumer");
-                       close_error_occured = true;
-               }
-       }
-
-       return -1;
+       ret = -1;
+       goto end_no_move;
 }
 
 static
-bool output_supports_chunks(const struct ltt_session *session)
+bool output_supports_trace_chunks(const struct ltt_session *session)
 {
        if (session->consumer->type == CONSUMER_DST_LOCAL) {
                return true;
@@ -614,15 +575,15 @@ bool output_supports_chunks(const struct ltt_session *session)
        return false;
 }
 
-enum lttng_error_code session_switch_trace_chunk(struct ltt_session *session,
+struct lttng_trace_chunk *session_create_new_trace_chunk(
+               struct ltt_session *session,
                const char *session_base_path_override,
                const char *chunk_name_override)
 {
        int ret;
-       enum lttng_error_code ret_code = LTTNG_OK;
        struct lttng_trace_chunk *trace_chunk = NULL;
        enum lttng_trace_chunk_status chunk_status;
-       const time_t timestamp_begin = time(NULL);
+       const time_t chunk_creation_ts = time(NULL);
        const bool is_local_trace =
                        session->consumer->type == CONSUMER_DST_LOCAL;
        const char *base_path = session_base_path_override ? :
@@ -634,37 +595,28 @@ enum lttng_error_code session_switch_trace_chunk(struct ltt_session *session,
        };
        uint64_t next_chunk_id;
 
-       if (timestamp_begin == (time_t) -1) {
-               PERROR("Failed to sample time while changing session \"%s\" trace chunk",
+       if (chunk_creation_ts == (time_t) -1) {
+               PERROR("Failed to sample time while creation session \"%s\" trace chunk",
                                session->name);
-               ret_code = LTTNG_ERR_FATAL;
                goto error;
        }
-       session->current_chunk_start_ts = timestamp_begin;
 
-       if (!output_supports_chunks(session)) {
+       if (!output_supports_trace_chunks(session)) {
                goto end;
        }
-       next_chunk_id = session->last_trace_chunk_id.is_set ?
-                       session->last_trace_chunk_id.value + 1 : 0;
+       next_chunk_id = session->most_recent_chunk_id.is_set ?
+                       session->most_recent_chunk_id.value + 1 : 0;
 
-       trace_chunk = lttng_trace_chunk_create(next_chunk_id, timestamp_begin);
+       trace_chunk = lttng_trace_chunk_create(next_chunk_id,
+                       chunk_creation_ts);
        if (!trace_chunk) {
-               ret_code = LTTNG_ERR_FATAL;
                goto error;
        }
 
        if (chunk_name_override) {
                chunk_status = lttng_trace_chunk_override_name(trace_chunk,
                                chunk_name_override);
-               switch (chunk_status) {
-               case LTTNG_TRACE_CHUNK_STATUS_OK:
-                       break;
-               case LTTNG_TRACE_CHUNK_STATUS_INVALID_ARGUMENT:
-                       ret_code = LTTNG_ERR_INVALID;
-                       goto error;
-               default:
-                       ret_code = LTTNG_ERR_NOMEM;
+               if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                        goto error;
                }
        }
@@ -674,49 +626,101 @@ enum lttng_error_code session_switch_trace_chunk(struct ltt_session *session,
                 * No need to set crendentials and output directory
                 * for remote trace chunks.
                 */
-               goto publish;
+               goto end;
        }
 
        chunk_status = lttng_trace_chunk_set_credentials(trace_chunk,
                        &session_credentials);
        if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
-               ret_code = LTTNG_ERR_FATAL;
                goto error;
        }
 
-       if (!session->current_trace_chunk) {
-               DBG("Creating base output directory of session \"%s\" at %s",
-                               session->name, base_path);
-       }
+       DBG("Creating base output directory of session \"%s\" at %s",
+                       session->name, base_path);
        ret = utils_mkdir_recursive(base_path, S_IRWXU | S_IRWXG,
                        session->uid, session->gid);
        if (ret) {
-               ret = LTTNG_ERR_FATAL;
                goto error;
        }
        ret = lttng_directory_handle_init(&session_output_directory,
                        base_path);
        if (ret) {
-               ret = LTTNG_ERR_FATAL;
                goto error;
        }
        chunk_status = lttng_trace_chunk_set_as_owner(trace_chunk,
                        &session_output_directory);
        lttng_directory_handle_fini(&session_output_directory);
        if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
-               ret = LTTNG_ERR_CREATE_DIR_FAIL;
-               goto error;
-       }
-publish:
-       ret = session_set_trace_chunk(session, trace_chunk);
-       if (ret) {
-               ret_code = LTTNG_ERR_FATAL;
                goto error;
        }
+end:
+       return trace_chunk;
 error:
        lttng_trace_chunk_put(trace_chunk);
+       trace_chunk = NULL;
+       goto end;
+}
+
+int session_close_trace_chunk(const struct ltt_session *session,
+               struct lttng_trace_chunk *trace_chunk)
+{
+       int ret = 0;
+       bool error_occurred = false;
+       struct cds_lfht_iter iter;
+       struct consumer_socket *socket;
+       enum lttng_trace_chunk_status chunk_status;
+       const time_t chunk_close_timestamp = time(NULL);
+
+       if (chunk_close_timestamp == (time_t) -1) {
+               ERR("Failed to sample the close timestamp of the current trace chunk of session \"%s\"",
+                               session->name);
+               ret = -1;
+               goto end;
+       }
+       chunk_status = lttng_trace_chunk_set_close_timestamp(trace_chunk,
+                       chunk_close_timestamp);
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ERR("Failed to set the close timestamp of the current trace chunk of session \"%s\"",
+                               session->name);
+               ret = -1;
+               goto end;
+       }
+
+       if (session->ust_session) {
+               cds_lfht_for_each_entry(
+                               session->ust_session->consumer->socks->ht,
+                               &iter, socket, node.node) {
+                       pthread_mutex_lock(socket->lock);
+                       ret = consumer_close_trace_chunk(socket,
+                                       session->consumer->net_seq_index,
+                                       session->id,
+                                       trace_chunk);
+                       pthread_mutex_unlock(socket->lock);
+                       if (ret) {
+                               ERR("Failed to close trace chunk on user space consumer");
+                               error_occurred = true;
+                       }
+               }
+       }
+       if (session->kernel_session) {
+               cds_lfht_for_each_entry(
+                               session->kernel_session->consumer->socks->ht,
+                               &iter, socket, node.node) {
+                       pthread_mutex_lock(socket->lock);
+                       ret = consumer_close_trace_chunk(socket,
+                                       session->consumer->net_seq_index,
+                                       session->id,
+                                       trace_chunk);
+                       pthread_mutex_unlock(socket->lock);
+                       if (ret) {
+                               ERR("Failed to close trace chunk on kernel consumer");
+                               error_occurred = true;
+                       }
+               }
+       }
+       ret = error_occurred ? -1 : 0;
 end:
-       return ret_code;
+       return ret;
 }
 
 /*
@@ -725,10 +729,12 @@ end:
  * Must be called with the session lock held.
  */
 int session_set_trace_chunk(struct ltt_session *session,
-               struct lttng_trace_chunk *new_trace_chunk)
+               struct lttng_trace_chunk *new_trace_chunk,
+               struct lttng_trace_chunk **current_trace_chunk)
 {
        ASSERT_LOCKED(session->lock);
-       return _session_set_trace_chunk_no_lock_check(session, new_trace_chunk);
+       return _session_set_trace_chunk_no_lock_check(session, new_trace_chunk,
+                       current_trace_chunk);
 }
 
 static
@@ -739,11 +745,24 @@ void session_release(struct urcu_ref *ref)
        struct ltt_kernel_session *ksess;
        struct ltt_session *session = container_of(ref, typeof(*session), ref);
 
+       assert(!session->chunk_being_archived);
+
        usess = session->ust_session;
        ksess = session->kernel_session;
-       (void) _session_set_trace_chunk_no_lock_check(session, NULL);
+       if (session->current_trace_chunk) {
+               ret = session_close_trace_chunk(session, session->current_trace_chunk);
+               if (ret) {
+                       ERR("Failed to close the current trace chunk of session \"%s\" during its release",
+                                       session->name);
+               }
+               ret = _session_set_trace_chunk_no_lock_check(session, NULL, NULL);
+               if (ret) {
+                       ERR("Failed to release the current trace chunk of session \"%s\" during its release",
+                                       session->name);
+               }
+        }
 
-       /* Clean kernel session teardown */
+        /* Clean kernel session teardown */
        kernel_destroy_session(ksess);
        session->kernel_session = NULL;
 
@@ -785,6 +804,7 @@ void session_release(struct urcu_ref *ref)
                del_session_ht(session);
                pthread_cond_broadcast(&ltt_session_list.removal_cond);
        }
+       free(session->last_archived_chunk_name);
        free(session);
 }
 
@@ -1097,12 +1117,22 @@ int session_reset_rotation_state(struct ltt_session *session,
        ASSERT_LOCKED(ltt_session_list.lock);
        ASSERT_LOCKED(session->lock);
 
-       session->rotation_pending_local = false;
-       session->rotation_pending_relay = false;
-       session->rotated_after_last_stop = false;
        session->rotation_state = result;
        if (session->rotation_pending_check_timer_enabled) {
                ret = timer_session_rotation_pending_check_stop(session);
        }
+       if (session->chunk_being_archived) {
+               uint64_t chunk_id;
+               enum lttng_trace_chunk_status chunk_status;
+
+               chunk_status = lttng_trace_chunk_get_id(
+                               session->chunk_being_archived,
+                               &chunk_id);
+               assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
+               LTTNG_OPTIONAL_SET(&session->last_archived_chunk_id,
+                               chunk_id);
+               lttng_trace_chunk_put(session->chunk_being_archived);
+               session->chunk_being_archived = NULL;
+       }
        return ret;
 }
This page took 0.031834 seconds and 5 git commands to generate.