X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=e4ded2b8655d17c5f270bd67cfffca65697766f8;hb=234cd6367843a2106a4cb10f8fb99443208516df;hp=e769daf88fe99b6c0e01d2006861ea8c6a804957;hpb=bda7c7b9b4c633de16f3d8bf109f9d21fdd9a5fb;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index e769daf88..e4ded2b86 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -18,7 +18,6 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include #include @@ -40,7 +39,6 @@ #include #include #include -#include #include #include @@ -56,7 +54,7 @@ #include #include #include -#include +#include #include #include "cmd.h" @@ -307,7 +305,7 @@ end: /* * config_entry_handler_cb used to handle options read from a config file. - * See config_entry_handler_cb comment in common/config/config.h for the + * See config_entry_handler_cb comment in common/config/session-config.h for the * return value conventions. */ static int config_entry_handler(const struct config_entry *entry, void *unused) @@ -864,10 +862,7 @@ restart: goto exit; } - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("socket poll error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { /* * A new connection is requested, therefore a * sessiond/consumerd connection is allocated in @@ -919,6 +914,12 @@ restart: * exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&relay_conn_queue.futex); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("socket poll error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -1035,7 +1036,8 @@ error_testpoint: * Set index data from the control port to a given index object. */ static int set_index_control_data(struct relay_index *index, - struct lttcomm_relayd_index *data) + struct lttcomm_relayd_index *data, + struct relay_connection *conn) { struct ctf_packet_index index_data; @@ -1051,6 +1053,12 @@ static int set_index_control_data(struct relay_index *index, index_data.timestamp_end = data->timestamp_end; index_data.events_discarded = data->events_discarded; index_data.stream_id = data->stream_id; + + if (conn->minor >= 8) { + index->index_data.stream_instance_id = data->stream_instance_id; + index->index_data.packet_seq_num = data->packet_seq_num; + } + return relay_index_set_data(index, &index_data); } @@ -1065,13 +1073,13 @@ static int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, int ret = 0, send_ret; struct relay_session *session; struct lttcomm_relayd_status_session reply; - char session_name[NAME_MAX]; - char hostname[HOST_NAME_MAX]; + char session_name[LTTNG_NAME_MAX]; + char hostname[LTTNG_HOST_NAME_MAX]; uint32_t live_timer = 0; bool snapshot = false; - memset(session_name, 0, NAME_MAX); - memset(hostname, 0, HOST_NAME_MAX); + memset(session_name, 0, LTTNG_NAME_MAX); + memset(hostname, 0, LTTNG_HOST_NAME_MAX); memset(&reply, 0, sizeof(reply)); @@ -1402,7 +1410,7 @@ end: static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { - int ret = htobe32(LTTNG_OK); + int ret = 0; ssize_t size_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_metadata_payload *metadata_struct; @@ -1439,9 +1447,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, } memset(data_buffer, 0, data_size); DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size); - ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); - if (ret < 0 || ret != data_size) { - if (ret == 0) { + size_ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); + if (size_ret < 0 || size_ret != data_size) { + if (size_ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d did an orderly shutdown", conn->sock->fd); } else { @@ -1468,9 +1476,9 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, goto end_put; } - ret = write_padding_to_file(metadata_stream->stream_fd->fd, + size_ret = write_padding_to_file(metadata_stream->stream_fd->fd, be32toh(metadata_struct->padding_size)); - if (ret < 0) { + if (size_ret < 0) { goto end_put; } @@ -1482,7 +1490,6 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, end_put: pthread_mutex_unlock(&metadata_stream->lock); stream_put(metadata_stream); - end: return ret; } @@ -1925,7 +1932,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, ERR("relay_index_get_by_id_or_create index NULL"); goto end_stream_put; } - if (set_index_control_data(index, &index_info)) { + if (set_index_control_data(index, &index_info, conn)) { ERR("set_index_control_data error"); relay_index_put(index); ret = -1; @@ -2082,8 +2089,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, /* Get data offset because we are about to update the index. */ data_offset = htobe64(stream->tracefile_size_current); - DBG("handle_index_data: stream %" PRIu64 " data offset %" PRIu64, - stream->stream_handle, stream->tracefile_size_current); + DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64, + stream->stream_handle, net_seq_num, stream->tracefile_size_current); /* * Lookup for an existing index for that stream id/sequence @@ -2181,6 +2188,7 @@ static int relay_process_data(struct relay_connection *conn) stream_id = be64toh(data_hdr.stream_id); stream = stream_get_by_id(stream_id); if (!stream) { + ERR("relay_process_data: Cannot find stream %" PRIu64, stream_id); ret = -1; goto end; } @@ -2210,6 +2218,8 @@ static int relay_process_data(struct relay_connection *conn) if (ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Socket %d error %d", conn->sock->fd, ret); } ret = -1; goto end_stream_put; @@ -2253,6 +2263,8 @@ static int relay_process_data(struct relay_connection *conn) if (session->minor >= 4 && !session->snapshot) { ret = handle_index_data(stream, net_seq_num, rotate_index); if (ret < 0) { + ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", + stream->stream_handle, net_seq_num, ret); goto end_stream_unlock; } } @@ -2271,6 +2283,8 @@ static int relay_process_data(struct relay_connection *conn) ret = write_padding_to_file(stream->stream_fd->fd, be32toh(data_hdr.padding_size)); if (ret < 0) { + ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", + stream->stream_handle, net_seq_num, ret); goto end_stream_unlock; } stream->tracefile_size_current += @@ -2430,10 +2444,7 @@ restart: /* Inspect the relay conn pipe for new connection */ if (pollfd == relay_conn_pipe[0]) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Relay connection pipe error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { struct relay_connection *conn; ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn)); @@ -2444,6 +2455,12 @@ restart: LPOLLIN | LPOLLRDHUP); connection_ht_add(relay_connections_ht, conn); DBG("Connection socket %d added", conn->sock->fd); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Relay connection pipe error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } else { struct relay_connection *ctrl_conn; @@ -2452,29 +2469,8 @@ restart: /* If not found, there is a synchronization issue. */ assert(ctrl_conn); - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - relay_thread_close_connection(&events, pollfd, ctrl_conn); - if (last_seen_data_fd == pollfd) { - last_seen_data_fd = last_notdel_data_fd; - } - } else if (revents & LPOLLIN) { - if (ctrl_conn->type == RELAY_CONTROL) { - ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock, &recv_hdr, - sizeof(recv_hdr), 0); - if (ret <= 0) { - /* Connection closed */ - relay_thread_close_connection(&events, pollfd, - ctrl_conn); - } else { - ret = relay_process_control(&recv_hdr, ctrl_conn); - if (ret < 0) { - /* Clear the session on error. */ - relay_thread_close_connection(&events, pollfd, - ctrl_conn); - } - seen_control = 1; - } - } else { + if (ctrl_conn->type == RELAY_DATA) { + if (revents & LPOLLIN) { /* * Flag the last seen data fd not deleted. It will be * used as the last seen fd if any fd gets deleted in @@ -2482,9 +2478,39 @@ restart: */ last_notdel_data_fd = pollfd; } + goto put_ctrl_connection; + } + assert(ctrl_conn->type == RELAY_CONTROL); + + if (revents & LPOLLIN) { + ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock, + &recv_hdr, sizeof(recv_hdr), 0); + if (ret <= 0) { + /* Connection closed */ + relay_thread_close_connection(&events, pollfd, + ctrl_conn); + } else { + ret = relay_process_control(&recv_hdr, ctrl_conn); + if (ret < 0) { + /* Clear the session on error. */ + relay_thread_close_connection(&events, + pollfd, ctrl_conn); + } + seen_control = 1; + } + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + relay_thread_close_connection(&events, + pollfd, ctrl_conn); + if (last_seen_data_fd == pollfd) { + last_seen_data_fd = last_notdel_data_fd; + } } else { - ERR("Unknown poll events %u for sock %d", revents, pollfd); + ERR("Unexpected poll events %u for control sock %d", + revents, pollfd); + connection_put(ctrl_conn); + goto error; } + put_ctrl_connection: connection_put(ctrl_conn); } } @@ -2534,17 +2560,17 @@ restart: /* Skip it. Might be removed before. */ continue; } + if (data_conn->type == RELAY_CONTROL) { + goto put_data_connection; + } + assert(data_conn->type == RELAY_DATA); if (revents & LPOLLIN) { - if (data_conn->type != RELAY_DATA) { - goto put_connection; - } - ret = relay_process_data(data_conn); /* Connection closed */ if (ret < 0) { relay_thread_close_connection(&events, pollfd, - data_conn); + data_conn); /* * Every goto restart call sets the last seen fd where * here we don't really care since we gracefully @@ -2556,8 +2582,14 @@ restart: connection_put(data_conn); goto restart; } + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + relay_thread_close_connection(&events, pollfd, + data_conn); + } else { + ERR("Unknown poll events %u for data sock %d", + revents, pollfd); } - put_connection: + put_data_connection: connection_put(data_conn); } last_seen_data_fd = -1;