X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Frelayd%2Frelayd.c;h=97a5941f75cb63fbd8c38baafaea73612f7b9970;hb=9c15ec61708bbff73f6e2369c04ae65492990d0b;hp=2adcbe415c63f16a176dba9167c247f34eb0eba3;hpb=e0547b835d76e0a3a2c3082d7c10e6902a1f3e04;p=lttng-tools.git diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 2adcbe415..97a5941f7 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -72,20 +72,67 @@ static int send_command(struct lttcomm_relayd_sock *rsock, memcpy(buf + sizeof(header), data, size); } + DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size); ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags); if (ret < 0) { + PERROR("Failed to send command %d of size %" PRIu64, + (int) cmd, buf_size); ret = -errno; goto error; } - - DBG3("Relayd sending command %d of size %" PRIu64, cmd, buf_size); - error: free(buf); alloc_error: return ret; } +static int recv_reply_ignore(struct lttcomm_relayd_sock *rsock, size_t size) +{ + int ret; + size_t cutfoff = 128; + + /* + * To prevent ever growing size of recv_reply to ignore, if the number + * of bytes we want to ignore is bigger than `cutoff`, consume half of + * the cutoff. We might block on it but still, most of bytes to ignore + * should already be ready to consume at this point. + * + * This kind of scenario can easily happen on stopped session with a + * live_timer since no actual receive is done on the socket that would + * discard the `ignore` portion. + * + * TCP guarantee in-order transmission both on send and receive so this + * is safe to do. + */ + if (rsock->bytes_to_ignore_on_recv >= cutfoff) { + size_t to_discard = cutfoff / 2; + + ret = rsock->sock.ops->recvmsg(&rsock->sock, NULL, to_discard, MSG_TRUNC); + if (ret <= 0 || ret != to_discard) { + if (ret == 0) { + /* Orderly shutdown. */ + DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd); + } else { + DBG("Receiving reply to discard failed on sock %d for size %zu with ret %d", + rsock->sock.fd, to_discard, ret); + } + ret = -1; + goto error; + } + + DBG("Force discard of %zu bytes for socket %d", to_discard, rsock->sock.fd); + rsock->bytes_to_ignore_on_recv -= to_discard; + } + + DBG3("Relayd ignore reply of %zu bytes for socket %d.", size, rsock->sock.fd); + /* Do not wait for the current reply to be ignored */ + rsock->bytes_to_ignore_on_recv += size; + ret = 0; + +error: + return ret; +} + /* * Receive reply data on socket. This MUST be call after send_command or else * could result in unexpected behavior(s). @@ -98,6 +145,26 @@ static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size return -ECONNRESET; } + /* + * We have to consume the bytes that are marked to ignore. + */ + if (rsock->bytes_to_ignore_on_recv != 0) { + ret = rsock->sock.ops->recvmsg(&rsock->sock, NULL, rsock->bytes_to_ignore_on_recv, MSG_TRUNC); + if (ret <= 0 || ret != rsock->bytes_to_ignore_on_recv) { + if (ret == 0) { + /* Orderly shutdown. */ + DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd); + } else { + DBG("Receiving reply to skip failed on sock %d for size %zu with ret %d", + rsock->sock.fd, rsock->bytes_to_ignore_on_recv, ret); + } + ret = -1; + goto error; + } + DBG("Discarded %zu bytes on sock %d", rsock->bytes_to_ignore_on_recv, rsock->sock.fd); + rsock->bytes_to_ignore_on_recv = 0; + } + DBG3("Relayd waiting for reply of size %zu", size); ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0); @@ -320,6 +387,109 @@ error: return ret; } +/* + * Add stream on the relayd. Send part. + * + * On success return 0 else return ret_code negative value. + */ +int relayd_add_stream_send(struct lttcomm_relayd_sock *rsock, const char *channel_name, + const char *pathname, uint64_t tracefile_size, uint64_t tracefile_count) +{ + int ret; + struct lttcomm_relayd_add_stream msg; + struct lttcomm_relayd_add_stream_2_2 msg_2_2; + + /* Code flow error. Safety net. */ + assert(rsock); + assert(channel_name); + assert(pathname); + + DBG("Relayd adding stream for channel name %s. Part send", channel_name); + + /* Compat with relayd 2.1 */ + if (rsock->minor == 1) { + memset(&msg, 0, sizeof(msg)); + if (lttng_strncpy(msg.channel_name, channel_name, + sizeof(msg.channel_name))) { + ret = -1; + goto error; + } + if (lttng_strncpy(msg.pathname, pathname, + sizeof(msg.pathname))) { + ret = -1; + goto error; + } + + /* Send command */ + ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0); + if (ret < 0) { + goto error; + } + } else { + memset(&msg_2_2, 0, sizeof(msg_2_2)); + /* Compat with relayd 2.2+ */ + if (lttng_strncpy(msg_2_2.channel_name, channel_name, + sizeof(msg_2_2.channel_name))) { + ret = -1; + goto error; + } + if (lttng_strncpy(msg_2_2.pathname, pathname, + sizeof(msg_2_2.pathname))) { + ret = -1; + goto error; + } + msg_2_2.tracefile_size = htobe64(tracefile_size); + msg_2_2.tracefile_count = htobe64(tracefile_count); + + /* Send command */ + ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg_2_2, sizeof(msg_2_2), 0); + if (ret < 0) { + goto error; + } + } + + DBG("Relayd add stream sent for channel name %s.", channel_name); + ret = 0; + +error: + return ret; +} + +int relayd_add_stream_rcv(struct lttcomm_relayd_sock *rsock, uint64_t *_stream_id) +{ + int ret; + struct lttcomm_relayd_status_stream reply; + + /* Code flow error. Safety net. */ + assert(rsock); + + /* Waiting for reply */ + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + /* Back to host bytes order. */ + reply.handle = be64toh(reply.handle); + reply.ret_code = be32toh(reply.ret_code); + + /* Return session id or negative ret code. */ + if (reply.ret_code != LTTNG_OK) { + ret = -1; + ERR("Relayd add stream replied error %d", reply.ret_code); + } else { + /* Success */ + ret = 0; + *_stream_id = reply.handle; + } + + DBG("Relayd stream added successfully with handle %" PRIu64, + reply.handle); + +error: + return ret; +} + /* * Inform the relay that all the streams for the current channel has been sent. * @@ -378,7 +548,8 @@ end: * If major versions are compatible, we assign minor_to_use to the * minor version of the procotol we are going to use for this session. * - * Return 0 if compatible else negative value. + * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL + * otherwise, or a negative value on network errors. */ int relayd_version_check(struct lttcomm_relayd_sock *rsock) { @@ -420,7 +591,7 @@ int relayd_version_check(struct lttcomm_relayd_sock *rsock) */ if (msg.major != rsock->major) { /* Not compatible */ - ret = -1; + ret = LTTNG_ERR_RELAYD_VERSION_FAIL; DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)", msg.major, rsock->major); goto error; @@ -522,6 +693,27 @@ int relayd_close(struct lttcomm_relayd_sock *rsock) goto end; } + /* + * This ensure that we do not close the socket while the lttng-relayd + * expects to be able to send a response that we skipped. + * While we loose some time to receive everything, this keep the + * protocol intact from the point of view of lttng-relayd. + */ + if (rsock->bytes_to_ignore_on_recv != 0) { + ret = rsock->sock.ops->recvmsg(&rsock->sock, NULL, rsock->bytes_to_ignore_on_recv, MSG_TRUNC); + if (ret <= 0 || ret != rsock->bytes_to_ignore_on_recv) { + if (ret == 0) { + /* Orderly shutdown. */ + DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd); + } else { + DBG("Receiving reply to skip failed on sock %d for size %zu with ret %d", + rsock->sock.fd, rsock->bytes_to_ignore_on_recv, ret); + } + } + DBG("Discarded %zu bytes on sock %d", rsock->bytes_to_ignore_on_recv, rsock->sock.fd); + rsock->bytes_to_ignore_on_recv = 0; + } + DBG3("Relayd closing socket %d", rsock->sock.fd); if (rsock->sock.ops) { @@ -603,23 +795,16 @@ int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_ goto error; } - /* Receive response */ - ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); + /* + * Discard response since we do not really care for it and that TCP + * guarantee in-order delivery. As for error handling, there is not much + * to do at this point (closing). + **/ + ret = recv_reply_ignore(rsock, sizeof(reply)); if (ret < 0) { goto error; } - reply.ret_code = be32toh(reply.ret_code); - - /* Return session id or negative ret code. */ - if (reply.ret_code != LTTNG_OK) { - ret = -1; - ERR("Relayd close stream replied error %d", reply.ret_code); - } else { - /* Success */ - ret = 0; - } - DBG("Relayd close stream id %" PRIu64 " successfully", stream_id); error: @@ -865,23 +1050,18 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock, goto error; } - /* Receive response */ - ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); + /* + * Ignore the response. TCP guarantee in-order arrival and the overall + * protocol do not rely on hard ordering between the control and data + * socket for index. + * Indexes are sent either at the end of the buffer consumption or + * during the live timer. + */ + ret = recv_reply_ignore(rsock, sizeof(reply)); if (ret < 0) { goto error; } - reply.ret_code = be32toh(reply.ret_code); - - /* Return session id or negative ret code. */ - if (reply.ret_code != LTTNG_OK) { - ret = -1; - ERR("Relayd send index replied error %d", reply.ret_code); - } else { - /* Success */ - ret = 0; - } - error: return ret; }