relayd: add remote trace chunk close command
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 18 Jul 2019 19:32:57 +0000 (15:32 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 9 Aug 2019 15:28:43 +0000 (11:28 -0400)
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
15 files changed:
src/bin/lttng-relayd/main.c
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/session.c
src/bin/lttng-sessiond/session.h
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/relayd.h
src/common/sessiond-comm/sessiond-comm.h
src/common/trace-chunk.c
src/common/trace-chunk.h
src/common/ust-consumer/ust-consumer.c

index 11277f29346e99104cfbedd6453ace99a76493f6..b5ae34ebf9c9989bbd9b72417cdcd427ad9d9546 100644 (file)
@@ -2853,6 +2853,107 @@ end_no_reply:
        return ret;
 }
 
+/*
+ * relay_close_trace_chunk: close a trace chunk
+ */
+static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn,
+               const struct lttng_buffer_view *payload)
+{
+       int ret = 0;
+       ssize_t send_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_close_trace_chunk *msg;
+       struct lttcomm_relayd_generic_reply reply = {};
+       struct lttng_buffer_view header_view;
+       struct lttng_trace_chunk *chunk = NULL;
+       enum lttng_error_code reply_code = LTTNG_OK;
+       enum lttng_trace_chunk_status chunk_status;
+       uint64_t chunk_id;
+       LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command;
+       time_t close_timestamp;
+
+       if (!session || !conn->version_check_done) {
+               ERR("Trying to close a trace chunk before version check");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       if (session->major == 2 && session->minor < 11) {
+               ERR("Chunk close command is unsupported before 2.11");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
+       if (!header_view.data) {
+               ERR("Failed to receive payload of chunk close command");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       /* Convert to host endianness. */
+       msg = (typeof(msg)) header_view.data;
+       chunk_id = be64toh(msg->chunk_id);
+       close_timestamp = (time_t) be64toh(msg->close_timestamp);
+       close_command = (typeof(close_command)){
+               .value = be32toh(msg->close_command.value),
+               .is_set = msg->close_command.is_set,
+       };
+
+       chunk = sessiond_trace_chunk_registry_get_chunk(
+                       sessiond_trace_chunk_registry,
+                       conn->session->sessiond_uuid,
+                       conn->session->id,
+                       chunk_id);
+       if (!chunk) {
+               char uuid_str[UUID_STR_LEN];
+
+               lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str);
+               ERR("Failed to find chunk to close: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+                               uuid_str,
+                               conn->session->id,
+                               msg->chunk_id);
+               ret = -1;
+               reply_code = LTTNG_ERR_NOMEM;
+               goto end;
+       }
+
+       chunk_status = lttng_trace_chunk_set_close_timestamp(
+                       chunk, close_timestamp);
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ERR("Failed to set trace chunk close timestamp");
+               ret = -1;
+               reply_code = LTTNG_ERR_UNK;
+               goto end;
+       }
+
+       if (close_command.is_set) {
+               chunk_status = lttng_trace_chunk_set_close_command(
+                               chunk, close_command.value);
+               if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       ret = -1;
+                       reply_code = LTTNG_ERR_INVALID;
+                       goto end;
+               }
+       }
+
+end:
+       reply.ret_code = htobe32((uint32_t) reply_code);
+       send_ret = conn->sock->ops->sendmsg(conn->sock,
+                       &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply),
+                       0);
+       if (send_ret < (ssize_t) sizeof(reply)) {
+               ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
+                               send_ret);
+               ret = -1;
+       }
+end_no_reply:
+       lttng_trace_chunk_put(chunk);
+       return ret;
+}
+
 #define DBG_CMD(cmd_name, conn) \
                DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd);
 
@@ -2923,6 +3024,10 @@ static int relay_process_control_command(struct relay_connection *conn,
                DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn);
                ret = relay_create_trace_chunk(header, conn, payload);
                break;
+       case RELAYD_CLOSE_TRACE_CHUNK:
+               DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn);
+               ret = relay_close_trace_chunk(header, conn, payload);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", header->cmd);
index d26d8de0bf661b7714914645558edcb2922e2f1d..6e5128917b474cd4d970669ba6ec25d06ef4dba8 100644 (file)
@@ -4504,7 +4504,8 @@ enum lttng_error_code snapshot_record(struct ltt_session *session,
                }
        }
 
-       if (session_close_trace_chunk(session, session->current_trace_chunk)) {
+       if (session_close_trace_chunk(
+                           session, session->current_trace_chunk, NULL)) {
                /*
                 * Don't goto end; make sure the chunk is closed for the session
                 * to allow future snapshots.
@@ -4750,14 +4751,6 @@ int cmd_rotate_session(struct ltt_session *session,
                        &ongoing_rotation_chunk_id);
        assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
 
-       chunk_status = lttng_trace_chunk_set_close_command(
-                       chunk_being_archived,
-                       LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
-       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
-               cmd_ret = LTTNG_ERR_FATAL;
-               goto error;
-       }
-
        if (session->kernel_session) {
                cmd_ret = kernel_rotate_session(session);
                if (cmd_ret != LTTNG_OK) {
@@ -4771,7 +4764,9 @@ int cmd_rotate_session(struct ltt_session *session,
                }
        }
 
-       ret = session_close_trace_chunk(session, chunk_being_archived);
+       ret = session_close_trace_chunk(session, chunk_being_archived,
+                       &((enum lttng_trace_chunk_command_type) {
+                                       LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED}));
        if (ret) {
                cmd_ret = LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
                goto error;
index 1bbee02f28c25ba64625cce77071ee8b8cfa0f2b..155f2b053aca8dfdfc3653c4c9bfdd7eb8f71ada 100644 (file)
@@ -1845,17 +1845,34 @@ int consumer_close_trace_chunk(struct consumer_socket *socket,
        int ret;
        enum lttng_trace_chunk_status chunk_status;
        struct lttcomm_consumer_msg msg = {
-               .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
-               .u.close_trace_chunk.session_id = session_id,
+                       .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
+                       .u.close_trace_chunk.session_id = session_id,
        };
        uint64_t chunk_id;
        time_t close_timestamp;
+       enum lttng_trace_chunk_command_type close_command;
+       const char *close_command_name = "none";
 
        assert(socket);
 
        if (relayd_id != -1ULL) {
-               LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.relayd_id,
-                               relayd_id);
+               LTTNG_OPTIONAL_SET(
+                               &msg.u.close_trace_chunk.relayd_id, relayd_id);
+       }
+
+       chunk_status = lttng_trace_chunk_get_close_command(
+                       chunk, &close_command);
+       switch (chunk_status) {
+       case LTTNG_TRACE_CHUNK_STATUS_OK:
+               LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.close_command,
+                               (uint32_t) close_command);
+               break;
+       case LTTNG_TRACE_CHUNK_STATUS_NONE:
+               break;
+       default:
+               ERR("Failed to get trace chunk close command");
+               ret = -1;
+               goto error;
        }
 
        chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id);
@@ -1877,10 +1894,14 @@ int consumer_close_trace_chunk(struct consumer_socket *socket,
        assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
        msg.u.close_trace_chunk.close_timestamp = (uint64_t) close_timestamp;
 
+       if (msg.u.close_trace_chunk.close_command.is_set) {
+               close_command_name = lttng_trace_chunk_command_type_get_name(
+                               close_command);
+       }
        DBG("Sending consumer close trace chunk command: relayd_id = %" PRId64
-                       ", session_id = %" PRIu64
-                       ", chunk_id = %" PRIu64,
-                       relayd_id, session_id, chunk_id);
+                       ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+                       ", close command = \"%s\"",
+                       relayd_id, session_id, chunk_id, close_command_name);
 
        health_code_update();
        ret = consumer_send_msg(socket, &msg);
index 61cbb0c1af2f3e10abed78c1394de238706597dd..4b5c22bfb8626ef8f52b48ebbb75a77d9b4b05e8 100644 (file)
@@ -677,7 +677,8 @@ error:
 }
 
 int session_close_trace_chunk(const struct ltt_session *session,
-               struct lttng_trace_chunk *trace_chunk)
+               struct lttng_trace_chunk *trace_chunk,
+               const enum lttng_trace_chunk_command_type *close_command)
 {
        int ret = 0;
        bool error_occurred = false;
@@ -686,6 +687,15 @@ int session_close_trace_chunk(const struct ltt_session *session,
        enum lttng_trace_chunk_status chunk_status;
        const time_t chunk_close_timestamp = time(NULL);
 
+       if (close_command) {
+               chunk_status = lttng_trace_chunk_set_close_command(
+                               trace_chunk, *close_command);
+               if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       ret = -1;
+                       goto end;
+               }
+       }
+
        if (chunk_close_timestamp == (time_t) -1) {
                ERR("Failed to sample the close timestamp of the current trace chunk of session \"%s\"",
                                session->name);
@@ -784,7 +794,7 @@ void session_release(struct urcu_ref *ref)
        session_notify_destruction(session);
        lttng_dynamic_array_reset(&session->destroy_notifiers);
        if (session->current_trace_chunk) {
-               ret = session_close_trace_chunk(session, session->current_trace_chunk);
+               ret = session_close_trace_chunk(session, session->current_trace_chunk, NULL);
                if (ret) {
                        ERR("Failed to close the current trace chunk of session \"%s\" during its release",
                                        session->name);
index 0b4746cb51b80c5fe62d28997eb0944e77bc24e2..0750e1bcd83427cdd7cce518a2935d80759b0ab2 100644 (file)
@@ -245,6 +245,7 @@ int session_set_trace_chunk(struct ltt_session *session,
  * ltt_session itself.
  */
 int session_close_trace_chunk(const struct ltt_session *session,
-               struct lttng_trace_chunk *trace_chunk);
+               struct lttng_trace_chunk *trace_chunk,
+               const enum lttng_trace_chunk_command_type *close_command);
 
 #endif /* _LTT_SESSION_H */
index dea7b7f51287022ea360ba1d68d24fbd9f541d9e..3896875f50e999076b2296829e19eab5f665ad5c 100644 (file)
@@ -4497,7 +4497,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk(
                        DBG("Failed to set new trace chunk on existing channels, rolling back");
                        close_ret = lttng_consumer_close_trace_chunk(relayd_id,
                                        session_id, chunk_id,
-                                       chunk_creation_timestamp);
+                                       chunk_creation_timestamp, NULL);
                        if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
                                ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
                                                session_id, chunk_id);
@@ -4527,7 +4527,8 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk(
                        close_ret = lttng_consumer_close_trace_chunk(relayd_id,
                                        session_id,
                                        chunk_id,
-                                       chunk_creation_timestamp);
+                                       chunk_creation_timestamp,
+                                       NULL);
                        if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
                                ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
                                                session_id,
@@ -4548,12 +4549,14 @@ end:
 
 enum lttcomm_return_code lttng_consumer_close_trace_chunk(
                const uint64_t *relayd_id, uint64_t session_id,
-               uint64_t chunk_id, time_t chunk_close_timestamp)
+               uint64_t chunk_id, time_t chunk_close_timestamp,
+               const enum lttng_trace_chunk_command_type *close_command)
 {
        enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttng_trace_chunk *chunk;
        char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
        const char *relayd_id_str = "(none)";
+       const char *close_command_name = "none";
        struct lttng_ht_iter iter;
        struct lttng_consumer_channel *channel;
        enum lttng_trace_chunk_status chunk_status;
@@ -4569,16 +4572,21 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk(
                } else {
                        relayd_id_str = "(formatting error)";
                }
-        }
+       }
+       if (close_command) {
+               close_command_name = lttng_trace_chunk_command_type_get_name(
+                               *close_command);
+       }
 
        DBG("Consumer close trace chunk command: relayd_id = %s"
-                       ", session_id = %" PRIu64
-                       ", chunk_id = %" PRIu64, relayd_id_str,
-                       session_id, chunk_id);
+                       ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+                       ", close command = %s",
+                       relayd_id_str, session_id, chunk_id,
+                       close_command_name);
+
        chunk = lttng_trace_chunk_registry_find_chunk(
-                       consumer_data.chunk_registry, session_id,
-                       chunk_id);
-        if (!chunk) {
+                       consumer_data.chunk_registry, session_id, chunk_id);
+       if (!chunk) {
                ERR("Failed to find chunk: session_id = %" PRIu64
                                ", chunk_id = %" PRIu64,
                                session_id, chunk_id);
@@ -4592,12 +4600,15 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk(
                ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
                goto end;
        }
-       /*
-        * Release the reference returned by the "find" operation and
-        * the session daemon's implicit reference to the chunk.
-        */
-       lttng_trace_chunk_put(chunk);
-       lttng_trace_chunk_put(chunk);
+
+       if (close_command) {
+               chunk_status = lttng_trace_chunk_set_close_command(
+                               chunk, *close_command);
+               if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+                       goto end;
+               }
+       }
 
        /*
         * chunk is now invalid to access as we no longer hold a reference to
@@ -4628,8 +4639,37 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk(
                        ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
                }
        }
+
+       if (relayd_id) {
+               int ret;
+               struct consumer_relayd_sock_pair *relayd;
+
+               relayd = consumer_find_relayd(*relayd_id);
+               if (relayd) {
+                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+                       ret = relayd_close_trace_chunk(
+                                       &relayd->control_sock, chunk);
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               } else {
+                       ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
+                                       *relayd_id);
+               }
+
+               if (!relayd || ret) {
+                       ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+                       goto error_unlock;
+               }
+       }
+error_unlock:
        rcu_read_unlock();
 end:
+       /*
+        * Release the reference returned by the "find" operation and
+        * the session daemon's implicit reference to the chunk.
+        */
+       lttng_trace_chunk_put(chunk);
+       lttng_trace_chunk_put(chunk);
+
        return ret_code;
 }
 
index f514aba712a24c13e3191b1a4de910fb9a0e35f0..a0b81c6fae82634393fc9f2d6e8d2205d2dc2de8 100644 (file)
@@ -844,7 +844,8 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk(
                struct lttng_directory_handle *chunk_directory_handle);
 enum lttcomm_return_code lttng_consumer_close_trace_chunk(
                const uint64_t *relayd_id, uint64_t session_id,
-               uint64_t chunk_id, time_t chunk_close_timestamp);
+               uint64_t chunk_id, time_t chunk_close_timestamp,
+               const enum lttng_trace_chunk_command_type *close_command);
 enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
                const uint64_t *relayd_id, uint64_t session_id,
                uint64_t chunk_id);
index edf8acb8286cedc3c8649b6c64313ccb0ff385fc..29ccd0f091d7c7c55ed95b0465d489d54ea2e0fc 100644 (file)
@@ -1239,15 +1239,21 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
        {
+               enum lttng_trace_chunk_command_type close_command =
+                               msg.u.close_trace_chunk.close_command.value;
                const uint64_t relayd_id =
                                msg.u.close_trace_chunk.relayd_id.value;
 
                ret_code = lttng_consumer_close_trace_chunk(
                                msg.u.close_trace_chunk.relayd_id.is_set ?
-                                               &relayd_id : NULL,
+                                               &relayd_id :
+                                               NULL,
                                msg.u.close_trace_chunk.session_id,
                                msg.u.close_trace_chunk.chunk_id,
-                               (time_t) msg.u.close_trace_chunk.close_timestamp);
+                               (time_t) msg.u.close_trace_chunk.close_timestamp,
+                               msg.u.close_trace_chunk.close_command.is_set ?
+                                               &close_command :
+                                               NULL);
                goto end_msg_sessiond;
        }
        case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
index b065f364eb67f3e06410b9c3043082abdd8f85ad..830c7c263f89935f0ecf00cc91ca5d17cc9fa320 100644 (file)
@@ -1252,11 +1252,8 @@ int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
                }
        }
 
-       ret = send_command(sock,
-                       RELAYD_CREATE_TRACE_CHUNK,
-                       payload.data,
-                       payload.size,
-                       0);
+       ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data,
+                       payload.size, 0);
        if (ret < 0) {
                ERR("Failed to send trace chunk creation command to relay daemon");
                goto end;
@@ -1283,3 +1280,78 @@ end:
        lttng_dynamic_buffer_reset(&payload);
        return ret;
 }
+
+int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
+               struct lttng_trace_chunk *chunk)
+{
+       int ret = 0;
+       enum lttng_trace_chunk_status status;
+       struct lttcomm_relayd_close_trace_chunk msg = {};
+       struct lttcomm_relayd_generic_reply reply = {};
+       uint64_t chunk_id;
+       time_t close_timestamp;
+       LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
+
+       status = lttng_trace_chunk_get_id(chunk, &chunk_id);
+       if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ERR("Failed to get trace chunk id");
+               ret = -1;
+               goto end;
+       }
+
+       status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
+       if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ERR("Failed to get trace chunk close timestamp");
+               ret = -1;
+               goto end;
+       }
+
+       status = lttng_trace_chunk_get_close_command(chunk,
+                       &close_command.value);
+       switch (status) {
+       case LTTNG_TRACE_CHUNK_STATUS_OK:
+               close_command.is_set = 1;
+               break;
+       case LTTNG_TRACE_CHUNK_STATUS_NONE:
+               break;
+       default:
+               ERR("Failed to get trace chunk close command");
+               ret = -1;
+               goto end;
+       }
+
+       msg = (typeof(msg)){
+               .chunk_id = htobe64(chunk_id),
+               .close_timestamp = htobe64((uint64_t) close_timestamp),
+               .close_command = {
+                       .value = htobe32((uint32_t) close_command.value),
+                       .is_set = close_command.is_set,
+               },
+       };
+
+       ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg),
+                       0);
+       if (ret < 0) {
+               ERR("Failed to send trace chunk close command to relay daemon");
+               goto end;
+       }
+
+       ret = recv_reply(sock, &reply, sizeof(reply));
+       if (ret < 0) {
+               ERR("Failed to receive relay daemon trace chunk close command reply");
+               goto end;
+       }
+
+       reply.ret_code = be32toh(reply.ret_code);
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -1;
+               ERR("Relayd trace chunk close replied error %d",
+                               reply.ret_code);
+       } else {
+               ret = 0;
+               DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64,
+                               chunk_id);
+       }
+end:
+       return ret;
+}
index 695ef6ffa9cf723997bc2012a16c2bfdf8c03dad..eb7782de050761618c07d8aeee3198209f79b879 100644 (file)
@@ -61,5 +61,7 @@ int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
                uint64_t new_chunk_id, uint64_t seq_num);
 int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
                struct lttng_trace_chunk *chunk);
+int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
+               struct lttng_trace_chunk *chunk);
 
 #endif /* _RELAYD_H */
index f86dde3d1243d70f26769e9efaa893fed1a89a66..1568f51137a6042b1a921767e09c9793ed35b9cc 100644 (file)
@@ -246,4 +246,12 @@ struct lttcomm_relayd_create_trace_chunk {
        char override_name[];
 } LTTNG_PACKED;
 
+struct lttcomm_relayd_close_trace_chunk {
+       uint64_t chunk_id;
+       /* Seconds since EPOCH. */
+       uint64_t close_timestamp;
+       /* enum lttng_trace_chunk_command_type */
+       LTTNG_OPTIONAL_COMM(uint32_t) LTTNG_PACKED close_command;
+} LTTNG_PACKED;
+
 #endif /* _RELAYD_COMM */
index d5009740e45da8ad96263226d4db80929f0fed25..aeefdaaa6b95b076a804c54299bfbdf1a8425276 100644 (file)
@@ -135,6 +135,8 @@ enum lttcomm_relayd_command {
        RELAYD_ROTATE_STREAM                = 18,
        /* Ask the relay to create a trace chunk (2.11+) */
        RELAYD_CREATE_TRACE_CHUNK           = 19,
+       /* Ask the relay to close a trace chunk (2.11+) */
+       RELAYD_CLOSE_TRACE_CHUNK            = 20,
 };
 
 /*
@@ -645,6 +647,8 @@ struct lttcomm_consumer_msg {
                        uint64_t session_id;
                        uint64_t chunk_id;
                        uint64_t close_timestamp;
+                       /* enum lttng_trace_chunk_command_type */
+                       LTTNG_OPTIONAL_COMM(uint32_t) LTTNG_PACKED close_command;
                } LTTNG_PACKED close_trace_chunk;
                struct {
                        LTTNG_OPTIONAL_COMM(uint64_t) LTTNG_PACKED relayd_id;
index 488d7ebc4ffffcf19938cdb5f1da4036311417e6..3f472b4353bc44aa8de15be627878640f96fca6c 100644 (file)
@@ -829,7 +829,16 @@ void lttng_trace_chunk_move_to_completed(struct lttng_trace_chunk *trace_chunk)
                        LTTNG_OPTIONAL_GET(trace_chunk->timestamp_close);
        LTTNG_OPTIONAL(struct lttng_directory_handle) archived_chunks_directory;
 
-       assert(trace_chunk->mode.is_set);
+       if (!trace_chunk->mode.is_set ||
+                       trace_chunk->mode.value != TRACE_CHUNK_MODE_OWNER ||
+                       !trace_chunk->session_output_directory.is_set) {
+               /*
+                * This command doesn't need to run if the output is remote
+                * or if the trace chunk is not owned by this process.
+                */
+               goto end;
+       }
+
        assert(trace_chunk->mode.value == TRACE_CHUNK_MODE_OWNER);
        assert(!trace_chunk->name_overriden);
 
@@ -949,6 +958,24 @@ end:
        }
 }
 
+LTTNG_HIDDEN
+enum lttng_trace_chunk_status lttng_trace_chunk_get_close_command(
+               struct lttng_trace_chunk *chunk,
+               enum lttng_trace_chunk_command_type *command_type)
+{
+       enum lttng_trace_chunk_status status = LTTNG_TRACE_CHUNK_STATUS_OK;
+
+       pthread_mutex_lock(&chunk->lock);
+       if (chunk->close_command.is_set) {
+               *command_type = chunk->close_command.value;
+               status = LTTNG_TRACE_CHUNK_STATUS_OK;
+       } else {
+               status = LTTNG_TRACE_CHUNK_STATUS_NONE;
+       }
+       pthread_mutex_unlock(&chunk->lock);
+       return status;
+}
+
 LTTNG_HIDDEN
 enum lttng_trace_chunk_status lttng_trace_chunk_set_close_command(
                struct lttng_trace_chunk *chunk,
@@ -977,6 +1004,18 @@ end_unlock:
        return status;
 }
 
+LTTNG_HIDDEN
+const char *lttng_trace_chunk_command_type_get_name(
+               enum lttng_trace_chunk_command_type command)
+{
+       switch (command) {
+       case LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED:
+               return "move to completed trace chunk folder";
+       default:
+               abort();
+       }
+}
+
 LTTNG_HIDDEN
 bool lttng_trace_chunk_get(struct lttng_trace_chunk *chunk)
 {
index a318341b525c3e4ef849fe77e4f15558802f5524..474e9e000d2caf671eb3578fac1ca6edc3ac3041 100644 (file)
@@ -153,11 +153,20 @@ LTTNG_HIDDEN
 int lttng_trace_chunk_unlink_file(struct lttng_trace_chunk *chunk,
                const char *filename);
 
+LTTNG_HIDDEN
+enum lttng_trace_chunk_status lttng_trace_chunk_get_close_command(
+               struct lttng_trace_chunk *chunk,
+               enum lttng_trace_chunk_command_type *command_type);
+
 LTTNG_HIDDEN
 enum lttng_trace_chunk_status lttng_trace_chunk_set_close_command(
                struct lttng_trace_chunk *chunk,
                enum lttng_trace_chunk_command_type command_type);
 
+LTTNG_HIDDEN
+const char *lttng_trace_chunk_command_type_get_name(
+               enum lttng_trace_chunk_command_type command);
+
 /* Returns true on success. */
 LTTNG_HIDDEN
 bool lttng_trace_chunk_get(struct lttng_trace_chunk *chunk);
index 0856cf0b9d3abf0858ca693b664ab5e542e5bbfb..0d61b866f46d426c120694d9138f57a83be26a5d 100644 (file)
@@ -2080,15 +2080,21 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
        {
+               enum lttng_trace_chunk_command_type close_command =
+                               msg.u.close_trace_chunk.close_command.value;
                const uint64_t relayd_id =
                                msg.u.close_trace_chunk.relayd_id.value;
 
                ret_code = lttng_consumer_close_trace_chunk(
                                msg.u.close_trace_chunk.relayd_id.is_set ?
-                                               &relayd_id : NULL,
+                                               &relayd_id :
+                                               NULL,
                                msg.u.close_trace_chunk.session_id,
                                msg.u.close_trace_chunk.chunk_id,
-                               (time_t) msg.u.close_trace_chunk.close_timestamp);
+                               (time_t) msg.u.close_trace_chunk.close_timestamp,
+                               msg.u.close_trace_chunk.close_command.is_set ?
+                                               &close_command :
+                                               NULL);
                goto end_msg_sessiond;
        }
        case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
This page took 0.03854 seconds and 5 git commands to generate.