From: Julien Desfossez Date: Mon, 18 Dec 2017 21:04:44 +0000 (-0500) Subject: Relay rotate pending command X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=commitdiff_plain;h=d88744a44aa5f2ca90ab87946692b9eed3120641 Relay rotate pending command When a session rotation completes and the session is configured to send its traces to a relay, we have to poll the relay to know when all the chunk's data are written on its disk. To do that, we define a timer in the sessiond and arm it when the rotation is complete. When the rotation is complete on the relay, we clear the "rotate_pending" flag in the session and the client can access the chunk safely. Signed-off-by: Julien Desfossez Signed-off-by: Jérémie Galarneau --- diff --git a/configure.ac b/configure.ac index 972a73e8b..71ec27803 100644 --- a/configure.ac +++ b/configure.ac @@ -365,6 +365,7 @@ _AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_AGENT_BIND_ADDRESS], [localhost]) _AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_CONTROL_BIND_ADDRESS], [0.0.0.0]) _AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_DATA_BIND_ADDRESS], [0.0.0.0]) _AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_VIEWER_BIND_ADDRESS], [localhost]) +_AC_DEFINE_AND_SUBST([DEFAULT_ROTATE_PENDING_RELAY_TIMER], [200000]) # Command short descriptions _AC_DEFINE_QUOTED_AND_SUBST([CMD_DESCR_ADD_CONTEXT], [Add context fields to a channel]) diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 45b7de8c6..f68ecdfbc 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -2803,7 +2803,7 @@ int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr, { struct relay_session *session = conn->session; struct lttcomm_relayd_rotate_pending msg; - struct lttcomm_relayd_generic_reply reply; + struct lttcomm_relayd_rotate_pending_reply reply; struct lttng_ht_iter iter; struct relay_stream *stream; int ret = 0; @@ -2883,7 +2883,8 @@ int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr, send_reply: rcu_read_unlock(); memset(&reply, 0, sizeof(reply)); - reply.ret_code = htobe32(rotate_pending ? 1 : 0); + reply.generic.ret_code = htobe32((uint32_t) LTTNG_OK); + reply.is_pending = (uint8_t) !!rotate_pending; network_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); if (network_ret < (ssize_t) sizeof(reply)) { diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index 606811e18..a2af4c38a 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -2651,6 +2651,10 @@ int cmd_stop_trace(struct ltt_session *session) goto error; } + if (session->rotate_relay_pending_timer_enabled) { + sessiond_timer_rotate_pending_stop(session); + } + if (session->rotate_count > 0 && !session->rotate_pending) { ret = rename_active_chunk(session); if (ret) { @@ -2949,6 +2953,10 @@ int cmd_destroy_session(struct ltt_session *session, int wpipe) DBG("Begin destroy session %s (id %" PRIu64 ")", session->name, session->id); + if (session->rotate_relay_pending_timer_enabled) { + sessiond_timer_rotate_pending_stop(session); + } + /* * The rename of the current chunk is performed at stop, but if we rotated * the session after the previous stop command, we need to rename the diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index a226b5726..f89bb1df2 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -1735,6 +1735,51 @@ error: return ret; } +/* + * Ask the relay if a rotation is still pending. Must be called with the socket + * lock held. + * + * Return 1 if the rotation is still pending, 0 if finished, a negative value + * on error. + */ +int consumer_rotate_pending_relay(struct consumer_socket *socket, + struct consumer_output *output, uint64_t session_id, + uint64_t chunk_id) +{ + int ret; + struct lttcomm_consumer_msg msg; + uint32_t pending = 0; + + assert(socket); + + DBG("Consumer rotate pending on relay for session %" PRIu64 ", chunk id % " PRIu64, + session_id, chunk_id); + assert(output->type == CONSUMER_DST_NET); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_ROTATE_PENDING_RELAY; + msg.u.rotate_pending_relay.session_id = session_id; + msg.u.rotate_pending_relay.relayd_id = output->net_seq_index; + msg.u.rotate_pending_relay.chunk_id = chunk_id; + + health_code_update(); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto error; + } + + ret = consumer_socket_recv(socket, &pending, sizeof(pending)); + if (ret < 0) { + goto error; + } + + ret = pending; + +error: + health_code_update(); + return ret; +} + /* * Ask the consumer to create a directory. * diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index fbd5b1152..d875f2c6d 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -329,6 +329,9 @@ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id, const struct consumer_output *output, const char *old_path, const char *new_path, uid_t uid, gid_t gid); +int consumer_rotate_pending_relay(struct consumer_socket *socket, + struct consumer_output *output, uint64_t session_id, + uint64_t chunk_id); int consumer_mkdir(struct consumer_socket *socket, uint64_t session_id, const struct consumer_output *output, const char *path, uid_t uid, gid_t gid); diff --git a/src/bin/lttng-sessiond/rotate.c b/src/bin/lttng-sessiond/rotate.c index 49ccea94e..3ca80ce6a 100644 --- a/src/bin/lttng-sessiond/rotate.c +++ b/src/bin/lttng-sessiond/rotate.c @@ -32,6 +32,8 @@ #include #include +#include + #include "session.h" #include "rotate.h" #include "rotation-thread.h" @@ -350,3 +352,50 @@ error: end: return ret; } + +int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id) +{ + int ret; + struct consumer_socket *socket; + struct consumer_output *output; + struct lttng_ht_iter iter; + + /* + * Either one of the sessions is enough to find the consumer_output + * and uid/gid. + */ + if (session->kernel_session) { + output = session->kernel_session->consumer; + } else if (session->ust_session) { + output = session->ust_session->consumer; + } else { + assert(0); + } + + if (!output || !output->socks) { + ERR("No consumer output found"); + ret = -1; + goto end; + } + + ret = -1; + + rcu_read_lock(); + /* + * We have to iterate to find a socket, but we only need to send the + * rotate pending command to one consumer, so we break after the first + * one. + */ + cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket, + node.node) { + pthread_mutex_lock(socket->lock); + ret = consumer_rotate_pending_relay(socket, output, session->id, + chunk_id); + pthread_mutex_unlock(socket->lock); + break; + } + rcu_read_unlock(); + +end: + return ret; +} diff --git a/src/bin/lttng-sessiond/rotate.h b/src/bin/lttng-sessiond/rotate.h index e2c0829bc..ea2383f2c 100644 --- a/src/bin/lttng-sessiond/rotate.h +++ b/src/bin/lttng-sessiond/rotate.h @@ -48,6 +48,8 @@ unsigned long hash_channel_key(struct rotation_channel_key *key); /* session lock must be held by this function's caller. */ int rename_complete_chunk(struct ltt_session *session, time_t ts); +int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id); + /* * When we start the rotation of a channel, we add its information in * channel_pending_rotate_ht. This is called in the context of diff --git a/src/bin/lttng-sessiond/rotation-thread.c b/src/bin/lttng-sessiond/rotation-thread.c index a1471e3c4..21f20b403 100644 --- a/src/bin/lttng-sessiond/rotation-thread.c +++ b/src/bin/lttng-sessiond/rotation-thread.c @@ -369,6 +369,16 @@ int handle_channel_rotation_pipe(int fd, uint32_t revents, session->rotate_pending = false; session->rotation_status = LTTNG_ROTATION_STATUS_COMPLETED; session->last_chunk_start_ts = session->current_chunk_start_ts; + if (session->rotate_pending_relay) { + ret = sessiond_timer_rotate_pending_start( + session, + DEFAULT_ROTATE_PENDING_RELAY_TIMER); + if (ret) { + ERR("Failed to enable rotate pending timer"); + ret = -1; + goto end_unlock_session; + } + } DBG("Rotation completed for session %s", session->name); } @@ -384,6 +394,134 @@ end: return ret; } +/* + * Process the rotate_pending check, called with session lock held. + */ +static +int rotate_pending_relay_timer(struct ltt_session *session) +{ + int ret; + + DBG("[rotation-thread] Check rotate pending on session %" PRIu64, + session->id); + ret = relay_rotate_pending(session, session->rotate_count - 1); + if (ret < 0) { + ERR("[rotation-thread] Check relay rotate pending"); + goto end; + } + if (ret == 0) { + DBG("[rotation-thread] Rotation completed on the relay for " + "session %" PRIu64, session->id); + /* + * Now we can clear the pending flag in the session. New + * rotations can start now. + */ + session->rotate_pending_relay = false; + } else if (ret == 1) { + DBG("[rotation-thread] Rotation still pending on the relay for " + "session %" PRIu64, session->id); + ret = sessiond_timer_rotate_pending_start(session, + DEFAULT_ROTATE_PENDING_RELAY_TIMER); + if (ret) { + ERR("Re-enabling rotate pending timer"); + ret = -1; + goto end; + } + } + + ret = 0; + +end: + return ret; +} + +static +int handle_rotate_timer_pipe(uint32_t revents, + struct rotation_thread_handle *handle, + struct rotation_thread_state *state, + struct rotation_thread_timer_queue *queue) +{ + int ret = 0; + int fd = lttng_pipe_get_readfd(queue->event_pipe); + struct ltt_session *session; + char buf[1]; + + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ret = lttng_poll_del(&state->events, fd); + if (ret) { + ERR("[rotation-thread] Failed to remove consumer " + "rotate pending pipe from poll set"); + } + goto end; + } + + ret = lttng_read(fd, buf, 1); + if (ret != 1) { + ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd); + ret = -1; + goto end; + } + + for (;;) { + struct sessiond_rotation_timer *timer_data; + + /* + * Take the queue lock only to pop elements from the list. + */ + pthread_mutex_lock(&queue->lock); + if (cds_list_empty(&queue->list)) { + pthread_mutex_unlock(&queue->lock); + break; + } + timer_data = cds_list_first_entry(&queue->list, + struct sessiond_rotation_timer, head); + cds_list_del(&timer_data->head); + pthread_mutex_unlock(&queue->lock); + + /* + * session lock to lookup the session ID. + */ + session_lock_list(); + session = session_find_by_id(timer_data->session_id); + if (!session) { + DBG("[rotation-thread] Session %" PRIu64 " not found", + timer_data->session_id); + /* + * This is a non-fatal error, and we cannot report it to the + * user (timer), so just print the error and continue the + * processing. + */ + session_unlock_list(); + free(timer_data); + continue; + } + + /* + * Take the session lock and release the session_list lock. + */ + session_lock(session); + session_unlock_list(); + + if (timer_data->signal == LTTNG_SESSIOND_SIG_ROTATE_PENDING) { + ret = rotate_pending_relay_timer(session); + } else { + ERR("Unknown signal in rotate timer %d", timer_data->signal); + ret = -1; + } + session_unlock(session); + free(timer_data); + if (ret) { + ERR("Error processing timer"); + goto end; + } + } + + ret = 0; + +end: + return ret; +} + void *thread_rotation(void *data) { int ret; @@ -441,6 +579,13 @@ void *thread_rotation(void *data) if (fd == handle->thread_quit_pipe) { DBG("[rotation-thread] Quit pipe activity"); goto exit; + } else if (fd == lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe)) { + ret = handle_rotate_timer_pipe(revents, + handle, &state, handle->rotation_timer_queue); + if (ret) { + ERR("[rotation-thread] Failed to handle rotation timer pipe event"); + goto error; + } } else if (fd == handle->ust32_consumer || fd == handle->ust64_consumer || fd == handle->kernel_consumer) { diff --git a/src/bin/lttng-sessiond/session.c b/src/bin/lttng-sessiond/session.c index 942d68c13..0f62ab49f 100644 --- a/src/bin/lttng-sessiond/session.c +++ b/src/bin/lttng-sessiond/session.c @@ -402,6 +402,7 @@ int session_create(char *name, uid_t uid, gid_t gid) new_session->rotate_pending = false; new_session->rotate_pending_relay = false; + new_session->rotate_relay_pending_timer_enabled = false; /* Add new session to the session list */ session_lock_list(); diff --git a/src/bin/lttng-sessiond/session.h b/src/bin/lttng-sessiond/session.h index 78890db3c..2e22885e8 100644 --- a/src/bin/lttng-sessiond/session.h +++ b/src/bin/lttng-sessiond/session.h @@ -167,6 +167,12 @@ struct ltt_session { * with the current timestamp. */ time_t current_chunk_start_ts; + /* + * Timer to check periodically if a relay has completed the last + * rotation. + */ + bool rotate_relay_pending_timer_enabled; + timer_t rotate_relay_pending_timer; /* * Keep a state if this session was rotated after the last stop command. * We only allow one rotation after a stop. At destroy, we also need to diff --git a/src/bin/lttng-sessiond/sessiond-timer.c b/src/bin/lttng-sessiond/sessiond-timer.c index d7aaca0f1..0d500aa55 100644 --- a/src/bin/lttng-sessiond/sessiond-timer.c +++ b/src/bin/lttng-sessiond/sessiond-timer.c @@ -51,6 +51,10 @@ void setmask(sigset_t *mask) if (ret) { PERROR("sigaddset exit"); } + ret = sigaddset(mask, LTTNG_SESSIOND_SIG_ROTATE_PENDING); + if (ret) { + PERROR("sigaddset switch"); + } } /* @@ -179,6 +183,51 @@ end: return ret; } +int sessiond_timer_rotate_pending_start(struct ltt_session *session, + unsigned int interval_us) +{ + int ret; + + DBG("Enabling rotate pending timer on session %" PRIu64, session->id); + /* + * We arm this timer in a one-shot mode so we don't have to disable it + * explicitly (which could deadlock if the timer thread is blocked writing + * in the rotation_timer_pipe). + * Instead, we re-arm it if needed after the rotation_pending check as + * returned. Also, this timer is usually only needed once, so there is no + * need to go through the whole signal teardown scheme everytime. + */ + ret = session_timer_start(&session->rotate_relay_pending_timer, + session, interval_us, + LTTNG_SESSIOND_SIG_ROTATE_PENDING, + /* one-shot */ true); + if (ret == 0) { + session->rotate_relay_pending_timer_enabled = true; + } + + return ret; +} + +/* + * Stop and delete the channel's live timer. + * Called with session and session_list locks held. + */ +void sessiond_timer_rotate_pending_stop(struct ltt_session *session) +{ + int ret; + + assert(session); + + DBG("Disabling timer rotate pending on session %" PRIu64, session->id); + ret = session_timer_stop(&session->rotate_relay_pending_timer, + LTTNG_SESSIOND_SIG_ROTATE_PENDING); + if (ret == -1) { + ERR("Failed to stop rotate_pending timer"); + } + + session->rotate_relay_pending_timer_enabled = false; +} + /* * Block the RT signals for the entire process. It must be called from the * sessiond main before creating the threads @@ -199,6 +248,116 @@ int sessiond_timer_signal_init(void) return 0; } +/* + * Called with the rotation_timer_queue lock held. + * Return true if the same timer job already exists in the queue, false if not. + */ +static +bool check_duplicate_timer_job(struct timer_thread_parameters *ctx, + struct ltt_session *session, unsigned int signal) +{ + bool ret = false; + struct sessiond_rotation_timer *node; + + rcu_read_lock(); + cds_list_for_each_entry(node, &ctx->rotation_timer_queue->list, head) { + if (node->session_id == session->id && node->signal == signal) { + ret = true; + goto end; + } + } + +end: + rcu_read_unlock(); + return ret; +} + +/* + * Add the session ID and signal value to the rotation_timer_queue if it is + * not already there and wakeup the rotation thread. The rotation thread + * empties the whole queue everytime it is woken up. The event_pipe is + * non-blocking, if it would block, we just return because we know the + * rotation thread will be awaken anyway. + */ +static +int enqueue_timer_rotate_job(struct timer_thread_parameters *ctx, + struct ltt_session *session, unsigned int signal) +{ + int ret; + bool has_duplicate_timer_job; + char *c = "!"; + + pthread_mutex_lock(&ctx->rotation_timer_queue->lock); + has_duplicate_timer_job = check_duplicate_timer_job(ctx, session, + signal); + + if (!has_duplicate_timer_job) { + struct sessiond_rotation_timer *timer_data = NULL; + + timer_data = zmalloc(sizeof(struct sessiond_rotation_timer)); + if (!timer_data) { + PERROR("Allocation of timer data"); + goto error; + } + timer_data->session_id = session->id; + timer_data->signal = signal; + cds_list_add_tail(&timer_data->head, + &ctx->rotation_timer_queue->list); + } else { + /* + * This timer job is already pending, we don't need to add + * it. + */ + pthread_mutex_unlock(&ctx->rotation_timer_queue->lock); + ret = 0; + goto end; + } + pthread_mutex_unlock(&ctx->rotation_timer_queue->lock); + + ret = lttng_write( + lttng_pipe_get_writefd(ctx->rotation_timer_queue->event_pipe), + c, 1); + if (ret < 0) { + /* + * We do not want to block in the timer handler, the job has been + * enqueued in the list, the wakeup pipe is probably full, the job + * will be processed when the rotation_thread catches up. + */ + if (errno == EAGAIN || errno == EWOULDBLOCK) { + ret = 0; + goto end; + } + PERROR("Timer wakeup rotation thread"); + goto error; + } + + ret = 0; + goto end; + +error: + ret = -1; +end: + return ret; +} + +/* + * Ask the rotation thread to check if the last rotation started in this + * session is still pending on the relay. + */ +static +void relay_rotation_pending_timer(struct timer_thread_parameters *ctx, + int sig, siginfo_t *si) +{ + int ret; + struct ltt_session *session = si->si_value.sival_ptr; + assert(session); + + ret = enqueue_timer_rotate_job(ctx, session, LTTNG_SESSIOND_SIG_ROTATE_PENDING); + if (ret) { + PERROR("wakeup rotate pipe"); + } +} + /* * This thread is the sighandler for the timer signals. */ @@ -244,6 +403,8 @@ void *sessiond_timer_thread(void *data) DBG("Signal timer metadata thread teardown"); } else if (signr == LTTNG_SESSIOND_SIG_EXIT) { goto end; + } else if (signr == LTTNG_SESSIOND_SIG_ROTATE_PENDING) { + relay_rotation_pending_timer(ctx, info.si_signo, &info); } else { ERR("Unexpected signal %d\n", info.si_signo); } diff --git a/src/bin/lttng-sessiond/sessiond-timer.h b/src/bin/lttng-sessiond/sessiond-timer.h index e1b8a7ead..5c84f3830 100644 --- a/src/bin/lttng-sessiond/sessiond-timer.h +++ b/src/bin/lttng-sessiond/sessiond-timer.h @@ -24,6 +24,7 @@ #define LTTNG_SESSIOND_SIG_TEARDOWN SIGRTMIN + 10 #define LTTNG_SESSIOND_SIG_EXIT SIGRTMIN + 11 +#define LTTNG_SESSIOND_SIG_ROTATE_PENDING SIGRTMIN + 12 #define CLOCKID CLOCK_MONOTONIC @@ -54,4 +55,8 @@ struct sessiond_rotation_timer { void *sessiond_timer_thread(void *data); int sessiond_timer_signal_init(void); +int sessiond_timer_rotate_pending_start(struct ltt_session *session, unsigned int + interval_us); +void sessiond_timer_rotate_pending_stop(struct ltt_session *session); + #endif /* SESSIOND_TIMER_H */ diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 3ac00cb03..3638273d0 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -4389,6 +4389,27 @@ int lttng_consumer_rotate_rename(const char *old_path, const char *new_path, } } +int lttng_consumer_rotate_pending_relay(uint64_t session_id, + uint64_t relayd_id, uint64_t chunk_id) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + relayd = consumer_find_relayd(relayd_id); + if (!relayd) { + ERR("Failed to find relayd"); + ret = -1; + goto end; + } + + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_rotate_pending(&relayd->control_sock, chunk_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + +end: + return ret; +} + static int mkdir_local(const char *path, uid_t uid, gid_t gid) { diff --git a/src/common/defaults.h b/src/common/defaults.h index 083e1abc9..38ab1e146 100644 --- a/src/common/defaults.h +++ b/src/common/defaults.h @@ -338,6 +338,12 @@ #define DEFAULT_LTTNG_RELAYD_TCP_KEEP_ALIVE_PROBE_INTERVAL_ENV "LTTNG_RELAYD_TCP_KEEP_ALIVE_PROBE_INTERVAL" #define DEFAULT_LTTNG_RELAYD_TCP_KEEP_ALIVE_ABORT_THRESHOLD_ENV "LTTNG_RELAYD_TCP_KEEP_ALIVE_ABORT_THRESHOLD" +/* + * Default timer value in usec for the rotate pending polling check on the + * relay when a rotation has completed on the consumer. + */ +#define DEFAULT_ROTATE_PENDING_RELAY_TIMER CONFIG_DEFAULT_ROTATE_PENDING_RELAY_TIMER + /* * Returns the default subbuf size. * diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 2eb1ebe5d..ec67e316d 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1180,6 +1180,40 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } break; } + case LTTNG_CONSUMER_ROTATE_PENDING_RELAY: + { + uint32_t pending_reply; + + DBG("Consumer rotate pending on relay for session %" PRIu64, + msg.u.rotate_pending_relay.session_id); + ret = lttng_consumer_rotate_pending_relay( + msg.u.rotate_pending_relay.session_id, + msg.u.rotate_pending_relay.relayd_id, + msg.u.rotate_pending_relay.chunk_id); + if (ret < 0) { + ERR("Rotate pending relay failed"); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } else { + pending_reply = !!ret; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &pending_reply, + sizeof(pending_reply)); + if (ret < 0) { + PERROR("send data pending ret code"); + goto error_fatal; + } + break; + } case LTTNG_CONSUMER_MKDIR: { DBG("Consumer mkdir %s in session %" PRIu64, diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 242abbedf..995e48801 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -1086,6 +1086,56 @@ error: return ret; } +int relayd_rotate_pending(struct lttcomm_relayd_sock *rsock, uint64_t chunk_id) +{ + int ret; + struct lttcomm_relayd_rotate_pending msg; + struct lttcomm_relayd_rotate_pending_reply reply; + + /* Code flow error. Safety net. */ + assert(rsock); + + DBG("Querying relayd for rotate pending with chunk_id %" PRIu64, + chunk_id); + + memset(&msg, 0, sizeof(msg)); + msg.chunk_id = htobe64(chunk_id); + + /* Send command */ + ret = send_command(rsock, RELAYD_ROTATE_PENDING, (void *) &msg, + sizeof(msg), 0); + if (ret < 0) { + goto error; + } + + /* Receive response */ + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + reply.generic.ret_code = be32toh(reply.generic.ret_code); + + /* Return session id or negative ret code. */ + if (reply.generic.ret_code != LTTNG_OK) { + ret = -reply.generic.ret_code; + ERR("Relayd rotate pending replied with error %d", ret); + goto error; + } else { + /* No error, just rotate pending state */ + if (reply.is_pending == 0 || reply.is_pending == 1) { + ret = reply.is_pending; + DBG("Relayd rotate pending command completed successfully with result \"%s\"", + ret ? "rotation pending" : "rotation NOT pending"); + } else { + ret = -LTTNG_ERR_UNK; + } + } + +error: + return ret; +} + int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path) { int ret; diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index f6329eb5b..fb3e94224 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -55,6 +55,8 @@ int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id, const char *new_pathname, uint64_t new_chunk_id, uint64_t seq_num); int relayd_rotate_rename(struct lttcomm_relayd_sock *sock, const char *current_path, const char *new_path); +int relayd_rotate_pending(struct lttcomm_relayd_sock *sock, + uint64_t chunk_id); int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path); #endif /* _RELAYD_H */ diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index e9a7e9ff2..3bfa3eb28 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -217,6 +217,12 @@ struct lttcomm_relayd_rotate_pending { uint64_t chunk_id; } LTTNG_PACKED; +struct lttcomm_relayd_rotate_pending_reply { + struct lttcomm_relayd_generic_reply generic; + /* Valid values are [0, 1]. */ + uint8_t is_pending; +} LTTNG_PACKED; + struct lttcomm_relayd_mkdir { /* Includes trailing NULL */ uint32_t length; diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index e931c6931..4eed95dec 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -101,6 +102,7 @@ enum lttcomm_sessiond_command { LTTNG_REGISTER_TRIGGER = 43, LTTNG_UNREGISTER_TRIGGER = 44, LTTNG_ROTATE_SESSION = 45, + LTTNG_ROTATE_PENDING = 46, }; enum lttcomm_relayd_command { @@ -330,6 +332,9 @@ struct lttcomm_session_msg { struct { uint32_t length; } LTTNG_PACKED trigger; + struct { + uint64_t rotate_id; + } LTTNG_PACKED rotate_pending; } u; } LTTNG_PACKED; @@ -558,6 +563,11 @@ struct lttcomm_consumer_msg { uint32_t uid; uint32_t gid; } LTTNG_PACKED rotate_rename; + struct { + uint64_t relayd_id; + uint64_t session_id; + uint64_t chunk_id; + } LTTNG_PACKED rotate_pending_relay; struct { char path[LTTNG_PATH_MAX]; uint64_t relayd_id; /* Relayd id if apply. */ diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 6873273a3..b26cc19f9 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2010,7 +2010,31 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.rotate_rename.relayd_id); if (ret < 0) { ERR("Rotate rename failed"); - ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + break; + } + case LTTNG_CONSUMER_ROTATE_PENDING_RELAY: + { + uint32_t pending; + + DBG("Consumer rotate pending on relay for session %" PRIu64, + msg.u.rotate_pending_relay.session_id); + pending = lttng_consumer_rotate_pending_relay( + msg.u.rotate_pending_relay.session_id, + msg.u.rotate_pending_relay.relayd_id, + msg.u.rotate_pending_relay.chunk_id); + if (pending < 0) { + ERR("Rotate pending relay failed"); + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } health_code_update(); @@ -2020,6 +2044,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Somehow, the session daemon is not responding anymore. */ goto end_nosignal; } + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &pending, sizeof(pending)); + if (ret < 0) { + PERROR("send data pending ret code"); + goto error_fatal; + } break; } case LTTNG_CONSUMER_MKDIR: @@ -2033,7 +2064,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.mkdir.relayd_id); if (ret < 0) { ERR("consumer mkdir failed"); - ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } health_code_update();