X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=28e52696e568760c898cf7c49a8a79068f5a3add;hp=7b385b49f5264e2ace8823ffbc0f8fe4747630a5;hb=36d2e35df61339e4394e84ad9790b984d259e0f0;hpb=a44ca2ca85e4b64729f7b88b1919fd6737dfff8a diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 7b385b49f..28e52696e 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -864,10 +864,7 @@ restart: goto exit; } - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("socket poll error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { /* * A new connection is requested, therefore a * sessiond/consumerd connection is allocated in @@ -919,6 +916,12 @@ restart: * exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&relay_conn_queue.futex); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("socket poll error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -1065,12 +1068,12 @@ static int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, int ret = 0, send_ret; struct relay_session *session; struct lttcomm_relayd_status_session reply; - char session_name[NAME_MAX]; + char session_name[LTTNG_NAME_MAX]; char hostname[HOST_NAME_MAX]; uint32_t live_timer = 0; bool snapshot = false; - memset(session_name, 0, NAME_MAX); + memset(session_name, 0, LTTNG_NAME_MAX); memset(hostname, 0, HOST_NAME_MAX); memset(&reply, 0, sizeof(reply)); @@ -1266,9 +1269,24 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, ret = -1; goto end; } + + /* + * Set last_net_seq_num before the close flag. Required by data + * pending check. + */ pthread_mutex_lock(&stream->lock); - stream->closed = true; stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num); + pthread_mutex_unlock(&stream->lock); + + /* + * This is one of the conditions which may trigger a stream close + * with the others being: + * 1) A close command is received for a stream + * 2) The control connection owning the stream is closed + * 3) We have received all of the stream's data _after_ a close + * request. + */ + try_stream_close(stream); if (stream->is_metadata) { struct relay_viewer_stream *vstream; @@ -1287,7 +1305,6 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, viewer_stream_put(vstream); } } - pthread_mutex_unlock(&stream->lock); stream_put(stream); end: @@ -1388,7 +1405,7 @@ end: static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { - int ret = htobe32(LTTNG_OK); + int ret = 0; ssize_t size_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_metadata_payload *metadata_struct; @@ -1425,9 +1442,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, } memset(data_buffer, 0, data_size); DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size); - ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); - if (ret < 0 || ret != data_size) { - if (ret == 0) { + size_ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); + if (size_ret < 0 || size_ret != data_size) { + if (size_ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d did an orderly shutdown", conn->sock->fd); } else { @@ -1454,9 +1471,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, goto end_put; } - ret = write_padding_to_file(metadata_stream->stream_fd->fd, + size_ret = write_padding_to_file(metadata_stream->stream_fd->fd, be32toh(metadata_struct->padding_size)); - if (ret < 0) { + if (size_ret < 0) { goto end_put; } @@ -1468,7 +1485,6 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, end_put: pthread_mutex_unlock(&metadata_stream->lock); stream_put(metadata_stream); - end: return ret; } @@ -2068,8 +2084,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, /* Get data offset because we are about to update the index. */ data_offset = htobe64(stream->tracefile_size_current); - DBG("handle_index_data: stream %" PRIu64 " data offset %" PRIu64, - stream->stream_handle, stream->tracefile_size_current); + DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64, + stream->stream_handle, net_seq_num, stream->tracefile_size_current); /* * Lookup for an existing index for that stream id/sequence @@ -2149,7 +2165,7 @@ static int relay_process_data(struct relay_connection *conn) uint64_t net_seq_num; uint32_t data_size; struct relay_session *session; - bool new_stream = false; + bool new_stream = false, close_requested = false; ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); @@ -2167,6 +2183,7 @@ static int relay_process_data(struct relay_connection *conn) stream_id = be64toh(data_hdr.stream_id); stream = stream_get_by_id(stream_id); if (!stream) { + ERR("relay_process_data: Cannot find stream %" PRIu64, stream_id); ret = -1; goto end; } @@ -2196,6 +2213,8 @@ static int relay_process_data(struct relay_connection *conn) if (ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Socket %d error %d", conn->sock->fd, ret); } ret = -1; goto end_stream_put; @@ -2239,6 +2258,8 @@ static int relay_process_data(struct relay_connection *conn) if (session->minor >= 4 && !session->snapshot) { ret = handle_index_data(stream, net_seq_num, rotate_index); if (ret < 0) { + ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", + stream->stream_handle, net_seq_num, ret); goto end_stream_unlock; } } @@ -2257,6 +2278,8 @@ static int relay_process_data(struct relay_connection *conn) ret = write_padding_to_file(stream->stream_fd->fd, be32toh(data_hdr.padding_size)); if (ret < 0) { + ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", + stream->stream_handle, net_seq_num, ret); goto end_stream_unlock; } stream->tracefile_size_current += @@ -2268,7 +2291,12 @@ static int relay_process_data(struct relay_connection *conn) stream->prev_seq = net_seq_num; end_stream_unlock: + close_requested = stream->close_requested; pthread_mutex_unlock(&stream->lock); + if (close_requested) { + try_stream_close(stream); + } + if (new_stream) { pthread_mutex_lock(&session->lock); uatomic_set(&session->new_streams, 1); @@ -2411,10 +2439,7 @@ restart: /* Inspect the relay conn pipe for new connection */ if (pollfd == relay_conn_pipe[0]) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Relay connection pipe error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { struct relay_connection *conn; ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn)); @@ -2425,6 +2450,12 @@ restart: LPOLLIN | LPOLLRDHUP); connection_ht_add(relay_connections_ht, conn); DBG("Connection socket %d added", conn->sock->fd); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Relay connection pipe error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } else { struct relay_connection *ctrl_conn; @@ -2433,29 +2464,8 @@ restart: /* If not found, there is a synchronization issue. */ assert(ctrl_conn); - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - relay_thread_close_connection(&events, pollfd, ctrl_conn); - if (last_seen_data_fd == pollfd) { - last_seen_data_fd = last_notdel_data_fd; - } - } else if (revents & LPOLLIN) { - if (ctrl_conn->type == RELAY_CONTROL) { - ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock, &recv_hdr, - sizeof(recv_hdr), 0); - if (ret <= 0) { - /* Connection closed */ - relay_thread_close_connection(&events, pollfd, - ctrl_conn); - } else { - ret = relay_process_control(&recv_hdr, ctrl_conn); - if (ret < 0) { - /* Clear the session on error. */ - relay_thread_close_connection(&events, pollfd, - ctrl_conn); - } - seen_control = 1; - } - } else { + if (ctrl_conn->type == RELAY_DATA) { + if (revents & LPOLLIN) { /* * Flag the last seen data fd not deleted. It will be * used as the last seen fd if any fd gets deleted in @@ -2463,9 +2473,39 @@ restart: */ last_notdel_data_fd = pollfd; } + goto put_ctrl_connection; + } + assert(ctrl_conn->type == RELAY_CONTROL); + + if (revents & LPOLLIN) { + ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock, + &recv_hdr, sizeof(recv_hdr), 0); + if (ret <= 0) { + /* Connection closed */ + relay_thread_close_connection(&events, pollfd, + ctrl_conn); + } else { + ret = relay_process_control(&recv_hdr, ctrl_conn); + if (ret < 0) { + /* Clear the session on error. */ + relay_thread_close_connection(&events, + pollfd, ctrl_conn); + } + seen_control = 1; + } + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + relay_thread_close_connection(&events, + pollfd, ctrl_conn); + if (last_seen_data_fd == pollfd) { + last_seen_data_fd = last_notdel_data_fd; + } } else { - ERR("Unknown poll events %u for sock %d", revents, pollfd); + ERR("Unexpected poll events %u for control sock %d", + revents, pollfd); + connection_put(ctrl_conn); + goto error; } + put_ctrl_connection: connection_put(ctrl_conn); } } @@ -2515,17 +2555,17 @@ restart: /* Skip it. Might be removed before. */ continue; } + if (data_conn->type == RELAY_CONTROL) { + goto put_data_connection; + } + assert(data_conn->type == RELAY_DATA); if (revents & LPOLLIN) { - if (data_conn->type != RELAY_DATA) { - goto put_connection; - } - ret = relay_process_data(data_conn); /* Connection closed */ if (ret < 0) { relay_thread_close_connection(&events, pollfd, - data_conn); + data_conn); /* * Every goto restart call sets the last seen fd where * here we don't really care since we gracefully @@ -2537,8 +2577,14 @@ restart: connection_put(data_conn); goto restart; } + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + relay_thread_close_connection(&events, pollfd, + data_conn); + } else { + ERR("Unknown poll events %u for data sock %d", + revents, pollfd); } - put_connection: + put_data_connection: connection_put(data_conn); } last_seen_data_fd = -1;