From: Julien Desfossez Date: Fri, 8 Sep 2017 19:06:55 +0000 (-0400) Subject: send the rotate pending to the relay X-Git-Url: http://git.efficios.com/?a=commitdiff_plain;h=d3dedf27c4a2ce0ea994d90f637eba9ed2f26776;hp=63e5638db6236d902c4ea909fc53e5842e5b71b6;p=deliverable%2Flttng-tools.git send the rotate pending to the relay Signed-off-by: Julien Desfossez --- diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index a69eb531c..88f33c156 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -4205,7 +4205,7 @@ int cmd_rotate_session(struct ltt_session *session, } session->rotate_count++; - session->rotate_pending = 1; + session->rotate_pending = true; session->rotate_status = LTTNG_ROTATE_STARTED; /* @@ -4300,6 +4300,39 @@ int cmd_rotate_pending(struct ltt_session *session, } else if (session->rotate_status == LTTNG_ROTATE_EMPTY) { DBG("Nothing to rotate"); (*pending_return)->status = LTTNG_ROTATE_EMPTY; + /* Rotate with a relay */ + } else if (session->rotate_pending_relay) { + /* The consumer has not finished the rotation. */ + if (session->rotate_pending) { + DBG("Session %s, rotate_id %" PRIu64 " still pending", + session->name, session->rotate_count); + (*pending_return)->status = LTTNG_ROTATE_STARTED; + } else { + /* + * The consumer finished the rotation, but we don't + * know if the relay still has data pending. We need + * to find one consumer_output to talk to the relay + * and ask it. + */ + ret = relay_rotate_pending(session); + if (ret == 0) { + DBG("Rotate completed on the relay for session %s" + ", rotate_id %" PRIu64, + session->name, + session->rotate_count); + (*pending_return)->status = LTTNG_ROTATE_COMPLETED; + snprintf((*pending_return)->output_path, PATH_MAX, "%s", + session->rotation_chunk.current_rotate_path); + } else if (ret == 1) { + DBG("Session %s, rotate_id %" PRIu64 " still pending " + "on the relay", + session->name, session->rotate_count); + (*pending_return)->status = LTTNG_ROTATE_STARTED; + } else { + ERR("Failed to check rotate pending on the relay"); + (*pending_return)->status = LTTNG_ROTATE_ERROR; + } + } } else if (session->rotate_pending) { DBG("Session %s, rotate_id %" PRIu64 " still pending", session->name, session->rotate_count); diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 537090b70..f901466f8 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -1601,7 +1601,8 @@ end: */ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, uid_t uid, gid_t gid, struct consumer_output *output, - char *app_pathname, uint32_t metadata, uint64_t new_chunk_id) + char *app_pathname, uint32_t metadata, uint64_t new_chunk_id, + bool *rotate_pending_relay) { int ret; struct lttcomm_consumer_msg msg; @@ -1625,6 +1626,7 @@ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, output->dst.net.base_dir, output->chunk_path, app_pathname); fprintf(stderr, "SENDING: %s\n", msg.u.rotate_channel.pathname); + *rotate_pending_relay = true; } else { msg.u.rotate_channel.relayd_id = (uint64_t) -1ULL; snprintf(msg.u.rotate_channel.pathname, PATH_MAX, "%s%s%s", @@ -1664,9 +1666,6 @@ int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id, msg.u.rotate_rename.gid = gid; snprintf(msg.u.rotate_rename.current_path, PATH_MAX, "%s", current_path); snprintf(msg.u.rotate_rename.new_path, PATH_MAX, "%s", new_path); - fprintf(stderr, "rotate rename from %s to %s\n", current_path, - new_path); - if (output->type == CONSUMER_DST_NET) { msg.u.rotate_rename.relayd_id = output->net_seq_index; @@ -1675,15 +1674,59 @@ int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id, } health_code_update(); - fprintf(stderr, "send %d to the consumer\n", - LTTNG_CONSUMER_ROTATE_RENAME); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto error; + } - ret = 0; +error: + health_code_update(); + return ret; +} + +/* + * Ask the relay if a rotation is still pending. Must be called with the socket + * lock held. + * + * Return 1 if the rotation is still pending, 0 if finished, a negative value + * on error. + */ +int consumer_rotate_pending_relay(struct consumer_socket *socket, + struct consumer_output *output, uint64_t session_id, + uint64_t chunk_id) +{ + int ret; + struct lttcomm_consumer_msg msg; + int32_t ret_code = 0; + + assert(socket); + + DBG("Consumer rotate pending on relay for session %" PRIu64, session_id); + assert(output->type == CONSUMER_DST_NET); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_ROTATE_PENDING_RELAY; + msg.u.rotate_pending_relay.session_id = session_id; + msg.u.rotate_pending_relay.relayd_id = output->net_seq_index; + msg.u.rotate_pending_relay.chunk_id = chunk_id; + + health_code_update(); ret = consumer_send_msg(socket, &msg); if (ret < 0) { goto error; } + /* + * No need for a recv reply status because the answer to the command is + * the reply status message. + */ + ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code)); + if (ret < 0) { + goto error; + } + + ret = ret_code; + error: health_code_update(); return ret; diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index ae303222e..60f84a5fa 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -326,9 +326,13 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, uid_t uid, gid_t gid, struct consumer_output *output, - char *tmp, uint32_t metadata, uint64_t new_chunk_id); + char *tmp, uint32_t metadata, uint64_t new_chunk_id, + bool *rotate_pending_relay); int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id, struct consumer_output *output, char *current_path, char *new_path, uid_t uid, gid_t gid); +int consumer_rotate_pending_relay(struct consumer_socket *socket, + struct consumer_output *output, uint64_t session_id, + uint64_t chunk_id); #endif /* _CONSUMER_H */ diff --git a/src/bin/lttng-sessiond/kernel.c b/src/bin/lttng-sessiond/kernel.c index 43fc93553..10ddabdbc 100644 --- a/src/bin/lttng-sessiond/kernel.c +++ b/src/bin/lttng-sessiond/kernel.c @@ -1179,8 +1179,8 @@ int kernel_rotate_session(struct ltt_session *session) ret = consumer_rotate_channel(socket, chan->fd, ksess->uid, ksess->gid, ksess->consumer, - "", 0, - session->rotate_count); + "", 0, session->rotate_count, + &session->rotate_pending_relay); if (ret < 0) { ret = LTTNG_ERR_KERN_CONSUMER_FAIL; pthread_mutex_unlock(socket->lock); @@ -1196,7 +1196,8 @@ int kernel_rotate_session(struct ltt_session *session) pthread_mutex_lock(socket->lock); ret = consumer_rotate_channel(socket, ksess->metadata->fd, ksess->uid, ksess->gid, ksess->consumer, "", 1, - session->rotate_count); + session->rotate_count, + &session->rotate_pending_relay); if (ret < 0) { ret = LTTNG_ERR_KERN_CONSUMER_FAIL; pthread_mutex_unlock(socket->lock); diff --git a/src/bin/lttng-sessiond/rotate.c b/src/bin/lttng-sessiond/rotate.c index d99e2d9ff..498a3b67e 100644 --- a/src/bin/lttng-sessiond/rotate.c +++ b/src/bin/lttng-sessiond/rotate.c @@ -195,12 +195,6 @@ int rename_complete_chunk(struct ltt_session *session, time_t ts) char *new_path = NULL; int ret; - /* - * TODO 2: on first rotate, the current_rotate_path is the session root - * path, so move the kernel/ and ust/ folders inside the - * "session->last_chunk_start_ts-now()" - */ - timeinfo = localtime(&ts); strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo); @@ -301,3 +295,59 @@ end: return ret; } +int relay_rotate_pending(struct ltt_session *session) +{ + int ret; + struct consumer_socket *socket; + struct consumer_output *output; + struct lttng_ht_iter iter; + + /* + * Either one of the sessions is enough to find the consumer_output + * and uid/gid. + */ + if (session->kernel_session) { + output = session->kernel_session->consumer; + } else if (session->ust_session) { + output = session->ust_session->consumer; + } else { + assert(0); + } + + if (!output || !output->socks) { + ERR("No consumer output found"); + ret = -1; + goto end; + } + + rcu_read_lock(); + /* + * We have to iterate to find a socket, but we only need to send the + * rotate pending command to one consumer, so we break after the first + * one. + */ + cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket, node.node) { + pthread_mutex_lock(socket->lock); + /* + * (rotate_count - 1) is the chunk id that we want to make sure + * is completely flushed to disk on the relay. + */ + ret = consumer_rotate_pending_relay(socket, output, session->id, + session->rotate_count - 1); + pthread_mutex_unlock(socket->lock); + if (ret) { + ERR("Consumer rename chunk"); + ret = -1; + rcu_read_unlock(); + goto end; + } + break; + } + rcu_read_unlock(); + + ret = 0; + +end: + return ret; +} + diff --git a/src/bin/lttng-sessiond/rotate.h b/src/bin/lttng-sessiond/rotate.h index 397a161e0..652ff4d3e 100644 --- a/src/bin/lttng-sessiond/rotate.h +++ b/src/bin/lttng-sessiond/rotate.h @@ -32,4 +32,6 @@ int session_rename_chunk(struct ltt_session *session, char *current_path, int rename_complete_chunk(struct ltt_session *session, time_t ts); +int relay_rotate_pending(struct ltt_session *session); + #endif /* ROTATE_H */ diff --git a/src/bin/lttng-sessiond/rotation-thread.c b/src/bin/lttng-sessiond/rotation-thread.c index 3269f71a1..24c9715a1 100644 --- a/src/bin/lttng-sessiond/rotation-thread.c +++ b/src/bin/lttng-sessiond/rotation-thread.c @@ -323,7 +323,7 @@ int handle_channel_rotation_pipe(int fd, uint32_t revents, ERR("Failed to rename completed rotation chunk"); goto end; } - channel_info->session->rotate_pending = 0; + channel_info->session->rotate_pending = false; } channel_rotation_info_destroy(channel_info); diff --git a/src/bin/lttng-sessiond/session.c b/src/bin/lttng-sessiond/session.c index 5a10340a7..f6e8ab071 100644 --- a/src/bin/lttng-sessiond/session.c +++ b/src/bin/lttng-sessiond/session.c @@ -400,6 +400,9 @@ int session_create(char *name, uid_t uid, gid_t gid) goto error; } + new_session->rotate_pending = false; + new_session->rotate_pending_relay = false; + /* Add new session to the session list */ session_lock_list(); new_session->id = add_session_list(new_session); diff --git a/src/bin/lttng-sessiond/session.h b/src/bin/lttng-sessiond/session.h index 41a894079..25f0927f7 100644 --- a/src/bin/lttng-sessiond/session.h +++ b/src/bin/lttng-sessiond/session.h @@ -135,14 +135,13 @@ struct ltt_session { * Number of session rotation for this session. */ uint64_t rotate_count; - unsigned int rotate_pending:1; + bool rotate_pending; + bool rotate_pending_relay; enum lttng_rotate_status rotate_status; /* * Number of channels waiting for a rotate. * When this number reaches 0, we can handle the rename of the chunk * folder and inform the client that the rotate is finished. - * - * TODO: replace rotate_pending checks by that. */ unsigned int nr_chan_rotate_pending; struct ltt_session_chunk rotation_chunk; diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index 2283f980a..cc42e250b 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -6360,7 +6360,8 @@ int ust_app_rotate_session(struct ltt_session *session) reg_chan->consumer_key, usess->uid, usess->gid, usess->consumer, pathname, 0, - session->rotate_count); + session->rotate_count, + &session->rotate_pending_relay); if (ret < 0) { goto error; } @@ -6372,7 +6373,8 @@ int ust_app_rotate_session(struct ltt_session *session) reg->registry->reg.ust->metadata_key, usess->uid, usess->gid, usess->consumer, pathname, 1, - session->rotate_count); + session->rotate_count, + &session->rotate_pending_relay); if (ret < 0) { goto error; } @@ -6444,7 +6446,8 @@ int ust_app_rotate_session(struct ltt_session *session) ret = consumer_rotate_channel(socket, ua_chan->key, ua_sess->euid, ua_sess->egid, ua_sess->consumer, pathname, 0, - session->rotate_count); + session->rotate_count, + &session->rotate_pending_relay); if (ret < 0) { goto error; } @@ -6455,7 +6458,8 @@ int ust_app_rotate_session(struct ltt_session *session) ret = consumer_rotate_channel(socket, registry->metadata_key, ua_sess->euid, ua_sess->egid, ua_sess->consumer, pathname, 1, - session->rotate_count); + session->rotate_count, + &session->rotate_pending_relay); if (ret < 0) { goto error; } @@ -6468,7 +6472,7 @@ int ust_app_rotate_session(struct ltt_session *session) } if (nr_app == 0 && nr_channels == 0) { - session->rotate_pending = 0; + session->rotate_pending = false; session->rotate_status = LTTNG_ROTATE_EMPTY; } diff --git a/src/bin/lttng-sessiond/ust-app.h b/src/bin/lttng-sessiond/ust-app.h index 9e5a037a3..53e88c26e 100644 --- a/src/bin/lttng-sessiond/ust-app.h +++ b/src/bin/lttng-sessiond/ust-app.h @@ -593,6 +593,12 @@ int ust_app_rotate_session(struct ltt_session *session) { return 0; } + +static inline +int ust_app_relay_rotate_pending(struct ltt_session *session) +{ + return 0; +} #endif /* HAVE_LIBLTTNG_UST_CTL */ #endif /* _LTT_UST_APP_H */ diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index f8b25665f..c6fce9777 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -4348,3 +4348,23 @@ int lttng_consumer_rotate_rename(char *current_path, char *new_path, return rotate_rename_local(current_path, new_path, uid, gid); } } + +int lttng_consumer_rotate_pending_relay(uint64_t session_id, + uint64_t relayd_id, uint64_t chunk_id) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + relayd = consumer_find_relayd(relayd_id); + if (!relayd) { + ERR("Failed to find relayd"); + ret = -1; + goto end; + } + + ret = relayd_rotate_pending(&relayd->control_sock, chunk_id); + +end: + return ret; + +} diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 63254007a..0ee937c7a 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -65,6 +65,7 @@ enum lttng_consumer_command { LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE, LTTNG_CONSUMER_ROTATE_CHANNEL, LTTNG_CONSUMER_ROTATE_RENAME, + LTTNG_CONSUMER_ROTATE_PENDING_RELAY, }; /* State of each fd in consumer */ @@ -834,6 +835,8 @@ int lttng_consumer_rotate_ready_streams(uint64_t key, struct lttng_consumer_local_data *ctx); int lttng_consumer_rotate_rename(char *current_path, char *new_path, uid_t uid, gid_t gid, uint64_t relayd_id); +int lttng_consumer_rotate_pending_relay( uint64_t session_id, + uint64_t relayd_id, uint64_t chunk_id); void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream); #endif /* LIB_CONSUMER_H */ diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 4fb586c08..91722f3a3 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1193,6 +1193,28 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } + } + case LTTNG_CONSUMER_ROTATE_PENDING_RELAY: + { + DBG("Consumer rotate pending on relay for session %" PRIu64, + msg.u.rotate_pending_relay.session_id); + ret = lttng_consumer_rotate_pending_relay( + msg.u.rotate_pending_relay.session_id, + msg.u.rotate_pending_relay.relayd_id, + msg.u.rotate_pending_relay.chunk_id); + if (ret < 0) { + ERR("Rotate pending relay failed"); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + } default: goto end_nosignal; diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index ed8136193..339ed85ab 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -995,7 +995,7 @@ int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, msg.stream_id = htobe64(stream_id); msg.new_chunk_id = htobe64(new_chunk_id); /* - * the seq_num is invalid for metadata streams, but it is ignored on + * The seq_num is invalid for metadata streams, but it is ignored on * the relay. */ msg.rotate_at_seq_num = htobe64(seq_num); @@ -1087,3 +1087,47 @@ error: return ret; } + +int relayd_rotate_pending(struct lttcomm_relayd_sock *rsock, uint64_t chunk_id) +{ + int ret; + struct lttcomm_relayd_rotate_pending msg; + struct lttcomm_relayd_generic_reply reply; + + /* Code flow error. Safety net. */ + assert(rsock); + + DBG("Relayd rotate pending"); + + memset(&msg, 0, sizeof(msg)); + msg.chunk_id = htobe64(chunk_id); + + /* Send command */ + ret = send_command(rsock, RELAYD_ROTATE_PENDING, (void *) &msg, sizeof(msg), 0); + if (ret < 0) { + goto error; + } + + /* Receive response */ + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + reply.ret_code = be32toh(reply.ret_code); + + /* Return session id or negative ret code. */ + if (reply.ret_code != LTTNG_OK) { + ret = -1; + ERR("Relayd rotate pending replied error %d", reply.ret_code); + } else { + /* Success */ + ret = 0; + } + + DBG("Relayd rotate pending completed successfully"); + +error: + return ret; + +} diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index f94a8e227..7b2553d57 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -56,5 +56,7 @@ int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id, const char *new_pathname, uint64_t new_chunk_id, uint64_t seq_num); int relayd_rotate_rename(struct lttcomm_relayd_sock *sock, const char *current_path, const char *new_path); +int relayd_rotate_pending(struct lttcomm_relayd_sock *sock, + uint64_t chunk_id); #endif /* _RELAYD_H */ diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index 921ff182a..f6895ddce 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -219,4 +219,8 @@ struct lttcomm_relayd_rotate_rename { char new_path[LTTNG_PATH_MAX]; } LTTNG_PACKED; +struct lttcomm_relayd_rotate_pending { + uint64_t chunk_id; +} LTTNG_PACKED; + #endif /* _RELAYD_COMM */ diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 5e045be45..404318bda 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -130,6 +130,8 @@ enum lttcomm_relayd_command { RELAYD_ROTATE_STREAM = 18, /* Rename a chunk after the rotation is completed (2.11+) */ RELAYD_ROTATE_RENAME = 19, + /* Check if a chunk has data pending (2.11+) */ + RELAYD_ROTATE_PENDING = 20, }; /* @@ -556,10 +558,14 @@ struct lttcomm_consumer_msg { char new_path[PATH_MAX]; uint64_t relayd_id; /* Relayd id if apply. */ uint64_t session_id; - uint32_t create; /* Create new_path before move. */ uint32_t uid; uint32_t gid; } LTTNG_PACKED rotate_rename; + struct { + uint64_t relayd_id; + uint64_t session_id; + uint64_t chunk_id; + } LTTNG_PACKED rotate_pending_relay; } u; } LTTNG_PACKED; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 2a02568b0..0c022ade7 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1991,6 +1991,28 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } + } + case LTTNG_CONSUMER_ROTATE_PENDING_RELAY: + { + DBG("Consumer rotate pending on relay for session %" PRIu64, + msg.u.rotate_pending_relay.session_id); + ret = lttng_consumer_rotate_pending_relay( + msg.u.rotate_pending_relay.session_id, + msg.u.rotate_pending_relay.relayd_id, + msg.u.rotate_pending_relay.chunk_id); + if (ret < 0) { + ERR("Rotate pending relay failed"); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + } default: break;