X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Frelayd%2Frelayd.c;h=b4fd35c738af0e24b1a056e2b1545a5653dae239;hp=458553bfe997069706aa9b43987a99d1cfcf0ece;hb=42e9a27b736e48114a7bbcac8e5d06855f2f2ace;hpb=00fb02ace5151a6546f4e97e5439512913a50e68 diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 458553bfe..b4fd35c73 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -123,8 +123,8 @@ error: * support the live reading capability. */ static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock, - uint64_t *session_id, char *session_name, char *hostname, - int session_live_timer, unsigned int snapshot) + char *session_name, char *hostname, int session_live_timer, + unsigned int snapshot) { int ret; struct lttcomm_relayd_create_session_2_4 msg; @@ -154,8 +154,7 @@ error: /* * RELAYD_CREATE_SESSION from 2.1 to 2.3. */ -static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock, - uint64_t *session_id) +static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock) { int ret; @@ -192,11 +191,11 @@ int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_i case 1: case 2: case 3: - ret = relayd_create_session_2_1(rsock, session_id); + ret = relayd_create_session_2_1(rsock); break; case 4: default: - ret = relayd_create_session_2_4(rsock, session_id, session_name, + ret = relayd_create_session_2_4(rsock, session_name, hostname, session_live_timer, snapshot); break; } @@ -942,6 +941,84 @@ error: return ret; } +int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, + const char *new_pathname, uint64_t new_chunk_id, + uint64_t seq_num) +{ + int ret; + struct lttcomm_relayd_rotate_stream *msg = NULL; + struct lttcomm_relayd_generic_reply reply; + size_t len; + int msg_len; + + /* Code flow error. Safety net. */ + assert(rsock); + + DBG("Sending rotate stream id %" PRIu64 " command to relayd", stream_id); + + /* Account for the trailing NULL. */ + len = strnlen(new_pathname, LTTNG_PATH_MAX) + 1; + if (len > LTTNG_PATH_MAX) { + ERR("Path used in relayd rotate stream command exceeds the maximal allowed length"); + ret = -1; + goto error; + } + + msg_len = offsetof(struct lttcomm_relayd_rotate_stream, new_pathname) + len; + msg = zmalloc(msg_len); + if (!msg) { + PERROR("Failed to allocate relayd rotate stream command of %d bytes", + msg_len); + ret = -1; + goto error; + } + + if (lttng_strncpy(msg->new_pathname, new_pathname, len)) { + ret = -1; + ERR("Failed to copy relayd rotate stream command's new path name"); + goto error; + } + + msg->pathname_length = htobe32(len); + msg->stream_id = htobe64(stream_id); + msg->new_chunk_id = htobe64(new_chunk_id); + /* + * The seq_num is invalid for metadata streams, but it is ignored on + * the relay. + */ + msg->rotate_at_seq_num = htobe64(seq_num); + + /* Send command. */ + ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) msg, msg_len, 0); + if (ret < 0) { + ERR("Send rotate command"); + goto error; + } + + /* Receive response. */ + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + ERR("Receive rotate reply"); + goto error; + } + + reply.ret_code = be32toh(reply.ret_code); + + /* Return session id or negative ret code. */ + if (reply.ret_code != LTTNG_OK) { + ret = -1; + ERR("Relayd rotate stream replied error %d", reply.ret_code); + } else { + /* Success. */ + ret = 0; + DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id); + } + +error: + free(msg); + return ret; +} + int relayd_rotate_rename(struct lttcomm_relayd_sock *rsock, const char *old_path, const char *new_path) { @@ -1008,6 +1085,56 @@ error: return ret; } +int relayd_rotate_pending(struct lttcomm_relayd_sock *rsock, uint64_t chunk_id) +{ + int ret; + struct lttcomm_relayd_rotate_pending msg; + struct lttcomm_relayd_rotate_pending_reply reply; + + /* Code flow error. Safety net. */ + assert(rsock); + + DBG("Querying relayd for rotate pending with chunk_id %" PRIu64, + chunk_id); + + memset(&msg, 0, sizeof(msg)); + msg.chunk_id = htobe64(chunk_id); + + /* Send command */ + ret = send_command(rsock, RELAYD_ROTATE_PENDING, (void *) &msg, + sizeof(msg), 0); + if (ret < 0) { + goto error; + } + + /* Receive response */ + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + reply.generic.ret_code = be32toh(reply.generic.ret_code); + + /* Return session id or negative ret code. */ + if (reply.generic.ret_code != LTTNG_OK) { + ret = -reply.generic.ret_code; + ERR("Relayd rotate pending replied with error %d", ret); + goto error; + } else { + /* No error, just rotate pending state */ + if (reply.is_pending == 0 || reply.is_pending == 1) { + ret = reply.is_pending; + DBG("Relayd rotate pending command completed successfully with result \"%s\"", + ret ? "rotation pending" : "rotation NOT pending"); + } else { + ret = -LTTNG_ERR_UNK; + } + } + +error: + return ret; +} + int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path) { int ret;