X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=28e52696e568760c898cf7c49a8a79068f5a3add;hp=057ac4046c01e67ae05fa938db49e89c9c913718;hb=36d2e35df61339e4394e84ad9790b984d259e0f0;hpb=7591bab11eceedc6a0d1e02fd6f85592267a63b5 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 057ac4046..28e52696e 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -71,6 +71,7 @@ #include "session.h" #include "stream.h" #include "connection.h" +#include "tracefile-array.h" /* command line options */ char *opt_output_path; @@ -863,10 +864,7 @@ restart: goto exit; } - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("socket poll error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { /* * A new connection is requested, therefore a * sessiond/consumerd connection is allocated in @@ -918,6 +916,12 @@ restart: * exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&relay_conn_queue.futex); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("socket poll error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -1064,12 +1068,12 @@ static int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, int ret = 0, send_ret; struct relay_session *session; struct lttcomm_relayd_status_session reply; - char session_name[NAME_MAX]; + char session_name[LTTNG_NAME_MAX]; char hostname[HOST_NAME_MAX]; uint32_t live_timer = 0; bool snapshot = false; - memset(session_name, 0, NAME_MAX); + memset(session_name, 0, LTTNG_NAME_MAX); memset(hostname, 0, HOST_NAME_MAX); memset(&reply, 0, sizeof(reply)); @@ -1265,9 +1269,24 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, ret = -1; goto end; } + + /* + * Set last_net_seq_num before the close flag. Required by data + * pending check. + */ pthread_mutex_lock(&stream->lock); - stream->closed = true; stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num); + pthread_mutex_unlock(&stream->lock); + + /* + * This is one of the conditions which may trigger a stream close + * with the others being: + * 1) A close command is received for a stream + * 2) The control connection owning the stream is closed + * 3) We have received all of the stream's data _after_ a close + * request. + */ + try_stream_close(stream); if (stream->is_metadata) { struct relay_viewer_stream *vstream; @@ -1286,7 +1305,6 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, viewer_stream_put(vstream); } } - pthread_mutex_unlock(&stream->lock); stream_put(stream); end: @@ -1387,7 +1405,7 @@ end: static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { - int ret = htobe32(LTTNG_OK); + int ret = 0; ssize_t size_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_metadata_payload *metadata_struct; @@ -1424,9 +1442,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, } memset(data_buffer, 0, data_size); DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size); - ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); - if (ret < 0 || ret != data_size) { - if (ret == 0) { + size_ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); + if (size_ret < 0 || size_ret != data_size) { + if (size_ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d did an orderly shutdown", conn->sock->fd); } else { @@ -1453,9 +1471,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, goto end_put; } - ret = write_padding_to_file(metadata_stream->stream_fd->fd, + size_ret = write_padding_to_file(metadata_stream->stream_fd->fd, be32toh(metadata_struct->padding_size)); - if (ret < 0) { + if (size_ret < 0) { goto end_put; } @@ -1467,7 +1485,6 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, end_put: pthread_mutex_unlock(&metadata_stream->lock); stream_put(metadata_stream); - end: return ret; } @@ -1890,7 +1907,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, * Only flag a stream inactive when it has already * received data and no indexes are in flight. */ - if (stream->total_index_received > 0 + if (stream->index_received_seqcount > 0 && stream->indexes_in_flight == 0) { stream->beacon_ts_end = be64toh(index_info.timestamp_end); @@ -1918,7 +1935,8 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, } ret = relay_index_try_flush(index); if (ret == 0) { - stream->total_index_received++; + tracefile_array_commit_seq(stream->tfa); + stream->index_received_seqcount++; } else if (ret > 0) { /* no flush. */ ret = 0; @@ -2066,8 +2084,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, /* Get data offset because we are about to update the index. */ data_offset = htobe64(stream->tracefile_size_current); - DBG("handle_index_data: stream %" PRIu64 " data offset %" PRIu64, - stream->stream_handle, stream->tracefile_size_current); + DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64, + stream->stream_handle, net_seq_num, stream->tracefile_size_current); /* * Lookup for an existing index for that stream id/sequence @@ -2091,7 +2109,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, fd = index_create_file(stream->path_name, stream->channel_name, -1, -1, stream->tracefile_size, - stream->current_tracefile_id); + tracefile_array_get_file_index_head(stream->tfa)); if (fd < 0) { ret = -1; /* Put self-ref for this index due to error. */ @@ -2120,7 +2138,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, ret = relay_index_try_flush(index); if (ret == 0) { - stream->total_index_received++; + tracefile_array_commit_seq(stream->tfa); + stream->index_received_seqcount++; } else if (ret > 0) { /* No flush. */ ret = 0; @@ -2146,6 +2165,7 @@ static int relay_process_data(struct relay_connection *conn) uint64_t net_seq_num; uint32_t data_size; struct relay_session *session; + bool new_stream = false, close_requested = false; ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); @@ -2163,6 +2183,7 @@ static int relay_process_data(struct relay_connection *conn) stream_id = be64toh(data_hdr.stream_id); stream = stream_get_by_id(stream_id); if (!stream) { + ERR("relay_process_data: Cannot find stream %" PRIu64, stream_id); ret = -1; goto end; } @@ -2192,6 +2213,8 @@ static int relay_process_data(struct relay_connection *conn) if (ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Socket %d error %d", conn->sock->fd, ret); } ret = -1; goto end_stream_put; @@ -2203,35 +2226,23 @@ static int relay_process_data(struct relay_connection *conn) if (stream->tracefile_size > 0 && (stream->tracefile_size_current + data_size) > stream->tracefile_size) { - uint64_t new_id; + uint64_t old_id, new_id; + + old_id = tracefile_array_get_file_index_head(stream->tfa); + tracefile_array_file_rotate(stream->tfa); + + /* new_id is updated by utils_rotate_stream_file. */ + new_id = old_id; - new_id = (stream->current_tracefile_id + 1) % - stream->tracefile_count; - /* - * Move viewer oldest available data position forward if - * we are overwriting a tracefile. - */ - if (new_id == stream->oldest_tracefile_id) { - stream->oldest_tracefile_id = - (stream->oldest_tracefile_id + 1) % - stream->tracefile_count; - } ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, stream->tracefile_size, stream->tracefile_count, -1, -1, stream->stream_fd->fd, - &stream->current_tracefile_id, - &stream->stream_fd->fd); + &new_id, &stream->stream_fd->fd); if (ret < 0) { ERR("Rotating stream output file"); goto end_stream_unlock; } - stream->current_tracefile_seq++; - if (stream->current_tracefile_seq - - stream->oldest_tracefile_seq >= - stream->tracefile_count) { - stream->oldest_tracefile_seq++; - } /* * Reset current size because we just performed a stream * rotation. @@ -2247,6 +2258,8 @@ static int relay_process_data(struct relay_connection *conn) if (session->minor >= 4 && !session->snapshot) { ret = handle_index_data(stream, net_seq_num, rotate_index); if (ret < 0) { + ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", + stream->stream_handle, net_seq_num, ret); goto end_stream_unlock; } } @@ -2265,14 +2278,30 @@ static int relay_process_data(struct relay_connection *conn) ret = write_padding_to_file(stream->stream_fd->fd, be32toh(data_hdr.padding_size)); if (ret < 0) { + ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", + stream->stream_handle, net_seq_num, ret); goto end_stream_unlock; } stream->tracefile_size_current += data_size + be32toh(data_hdr.padding_size); + if (stream->prev_seq == -1ULL) { + new_stream = true; + } + stream->prev_seq = net_seq_num; end_stream_unlock: + close_requested = stream->close_requested; pthread_mutex_unlock(&stream->lock); + if (close_requested) { + try_stream_close(stream); + } + + if (new_stream) { + pthread_mutex_lock(&session->lock); + uatomic_set(&session->new_streams, 1); + pthread_mutex_unlock(&session->lock); + } end_stream_put: stream_put(stream); end: @@ -2410,10 +2439,7 @@ restart: /* Inspect the relay conn pipe for new connection */ if (pollfd == relay_conn_pipe[0]) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Relay connection pipe error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { struct relay_connection *conn; ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn)); @@ -2424,6 +2450,12 @@ restart: LPOLLIN | LPOLLRDHUP); connection_ht_add(relay_connections_ht, conn); DBG("Connection socket %d added", conn->sock->fd); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Relay connection pipe error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } else { struct relay_connection *ctrl_conn; @@ -2432,29 +2464,8 @@ restart: /* If not found, there is a synchronization issue. */ assert(ctrl_conn); - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - relay_thread_close_connection(&events, pollfd, ctrl_conn); - if (last_seen_data_fd == pollfd) { - last_seen_data_fd = last_notdel_data_fd; - } - } else if (revents & LPOLLIN) { - if (ctrl_conn->type == RELAY_CONTROL) { - ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock, &recv_hdr, - sizeof(recv_hdr), 0); - if (ret <= 0) { - /* Connection closed */ - relay_thread_close_connection(&events, pollfd, - ctrl_conn); - } else { - ret = relay_process_control(&recv_hdr, ctrl_conn); - if (ret < 0) { - /* Clear the session on error. */ - relay_thread_close_connection(&events, pollfd, - ctrl_conn); - } - seen_control = 1; - } - } else { + if (ctrl_conn->type == RELAY_DATA) { + if (revents & LPOLLIN) { /* * Flag the last seen data fd not deleted. It will be * used as the last seen fd if any fd gets deleted in @@ -2462,9 +2473,39 @@ restart: */ last_notdel_data_fd = pollfd; } + goto put_ctrl_connection; + } + assert(ctrl_conn->type == RELAY_CONTROL); + + if (revents & LPOLLIN) { + ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock, + &recv_hdr, sizeof(recv_hdr), 0); + if (ret <= 0) { + /* Connection closed */ + relay_thread_close_connection(&events, pollfd, + ctrl_conn); + } else { + ret = relay_process_control(&recv_hdr, ctrl_conn); + if (ret < 0) { + /* Clear the session on error. */ + relay_thread_close_connection(&events, + pollfd, ctrl_conn); + } + seen_control = 1; + } + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + relay_thread_close_connection(&events, + pollfd, ctrl_conn); + if (last_seen_data_fd == pollfd) { + last_seen_data_fd = last_notdel_data_fd; + } } else { - ERR("Unknown poll events %u for sock %d", revents, pollfd); + ERR("Unexpected poll events %u for control sock %d", + revents, pollfd); + connection_put(ctrl_conn); + goto error; } + put_ctrl_connection: connection_put(ctrl_conn); } } @@ -2514,17 +2555,17 @@ restart: /* Skip it. Might be removed before. */ continue; } + if (data_conn->type == RELAY_CONTROL) { + goto put_data_connection; + } + assert(data_conn->type == RELAY_DATA); if (revents & LPOLLIN) { - if (data_conn->type != RELAY_DATA) { - goto put_connection; - } - ret = relay_process_data(data_conn); /* Connection closed */ if (ret < 0) { relay_thread_close_connection(&events, pollfd, - data_conn); + data_conn); /* * Every goto restart call sets the last seen fd where * here we don't really care since we gracefully @@ -2536,8 +2577,14 @@ restart: connection_put(data_conn); goto restart; } + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + relay_thread_close_connection(&events, pollfd, + data_conn); + } else { + ERR("Unknown poll events %u for data sock %d", + revents, pollfd); } - put_connection: + put_data_connection: connection_put(data_conn); } last_seen_data_fd = -1;