relayd: add remote trace chunk close command
[lttng-tools.git] / src / common / consumer / consumer.c
index dea7b7f51287022ea360ba1d68d24fbd9f541d9e..3896875f50e999076b2296829e19eab5f665ad5c 100644 (file)
@@ -4497,7 +4497,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk(
                        DBG("Failed to set new trace chunk on existing channels, rolling back");
                        close_ret = lttng_consumer_close_trace_chunk(relayd_id,
                                        session_id, chunk_id,
-                                       chunk_creation_timestamp);
+                                       chunk_creation_timestamp, NULL);
                        if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
                                ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
                                                session_id, chunk_id);
@@ -4527,7 +4527,8 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk(
                        close_ret = lttng_consumer_close_trace_chunk(relayd_id,
                                        session_id,
                                        chunk_id,
-                                       chunk_creation_timestamp);
+                                       chunk_creation_timestamp,
+                                       NULL);
                        if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
                                ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
                                                session_id,
@@ -4548,12 +4549,14 @@ end:
 
 enum lttcomm_return_code lttng_consumer_close_trace_chunk(
                const uint64_t *relayd_id, uint64_t session_id,
-               uint64_t chunk_id, time_t chunk_close_timestamp)
+               uint64_t chunk_id, time_t chunk_close_timestamp,
+               const enum lttng_trace_chunk_command_type *close_command)
 {
        enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttng_trace_chunk *chunk;
        char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
        const char *relayd_id_str = "(none)";
+       const char *close_command_name = "none";
        struct lttng_ht_iter iter;
        struct lttng_consumer_channel *channel;
        enum lttng_trace_chunk_status chunk_status;
@@ -4569,16 +4572,21 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk(
                } else {
                        relayd_id_str = "(formatting error)";
                }
-        }
+       }
+       if (close_command) {
+               close_command_name = lttng_trace_chunk_command_type_get_name(
+                               *close_command);
+       }
 
        DBG("Consumer close trace chunk command: relayd_id = %s"
-                       ", session_id = %" PRIu64
-                       ", chunk_id = %" PRIu64, relayd_id_str,
-                       session_id, chunk_id);
+                       ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+                       ", close command = %s",
+                       relayd_id_str, session_id, chunk_id,
+                       close_command_name);
+
        chunk = lttng_trace_chunk_registry_find_chunk(
-                       consumer_data.chunk_registry, session_id,
-                       chunk_id);
-        if (!chunk) {
+                       consumer_data.chunk_registry, session_id, chunk_id);
+       if (!chunk) {
                ERR("Failed to find chunk: session_id = %" PRIu64
                                ", chunk_id = %" PRIu64,
                                session_id, chunk_id);
@@ -4592,12 +4600,15 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk(
                ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
                goto end;
        }
-       /*
-        * Release the reference returned by the "find" operation and
-        * the session daemon's implicit reference to the chunk.
-        */
-       lttng_trace_chunk_put(chunk);
-       lttng_trace_chunk_put(chunk);
+
+       if (close_command) {
+               chunk_status = lttng_trace_chunk_set_close_command(
+                               chunk, *close_command);
+               if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+                       goto end;
+               }
+       }
 
        /*
         * chunk is now invalid to access as we no longer hold a reference to
@@ -4628,8 +4639,37 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk(
                        ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
                }
        }
+
+       if (relayd_id) {
+               int ret;
+               struct consumer_relayd_sock_pair *relayd;
+
+               relayd = consumer_find_relayd(*relayd_id);
+               if (relayd) {
+                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+                       ret = relayd_close_trace_chunk(
+                                       &relayd->control_sock, chunk);
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               } else {
+                       ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
+                                       *relayd_id);
+               }
+
+               if (!relayd || ret) {
+                       ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+                       goto error_unlock;
+               }
+       }
+error_unlock:
        rcu_read_unlock();
 end:
+       /*
+        * Release the reference returned by the "find" operation and
+        * the session daemon's implicit reference to the chunk.
+        */
+       lttng_trace_chunk_put(chunk);
+       lttng_trace_chunk_put(chunk);
+
        return ret_code;
 }
 
This page took 0.024711 seconds and 5 git commands to generate.