X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=70a1948c328cc690e39e4add4d67f4a2a8f2fe44;hp=1bdef652eb64d7ad60ffb3b582ca540f24a62361;hb=cef0f7d51b8025d3ba04e6496242c1cca1641aa6;hpb=87b576ecd158f43c99a16b9c7c2092f7291aee00 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 1bdef652e..70a1948c3 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -293,12 +293,10 @@ void cleanup(void) static int notify_thread_pipe(int wpipe) { - int ret; + ssize_t ret; - do { - ret = write(wpipe, "!", 1); - } while (ret < 0 && errno == EINTR); - if (ret < 0 || ret != 1) { + ret = lttng_write(wpipe, "!", 1); + if (ret < 1) { PERROR("write poll pipe"); } @@ -307,12 +305,10 @@ int notify_thread_pipe(int wpipe) static void notify_health_quit_pipe(int *pipe) { - int ret; + ssize_t ret; - do { - ret = write(pipe[1], "4", 1); - } while (ret < 0 && errno == EINTR); - if (ret < 0 || ret != 1) { + ret = lttng_write(pipe[1], "4", 1); + if (ret < 1) { PERROR("write relay health quit"); } } @@ -707,7 +703,8 @@ error_sock_control: static void *relay_thread_dispatcher(void *data) { - int ret, err = -1; + int err = -1; + ssize_t ret; struct cds_wfq_node *node; struct relay_command *relay_cmd = NULL; @@ -742,12 +739,10 @@ void *relay_thread_dispatcher(void *data) * call is blocking so we can be assured that the data will be read * at some point in time or wait to the end of the world :) */ - do { - ret = write(relay_cmd_pipe[1], relay_cmd, - sizeof(struct relay_command)); - } while (ret < 0 && errno == EINTR); + ret = lttng_write(relay_cmd_pipe[1], relay_cmd, + sizeof(struct relay_command)); free(relay_cmd); - if (ret < 0 || ret != sizeof(struct relay_command)) { + if (ret < sizeof(struct relay_command)) { PERROR("write cmd pipe"); goto error; } @@ -854,7 +849,10 @@ static void destroy_stream(struct relay_stream *stream) * lookup failure on the live thread side of a stream indicates * that the viewer stream index received value should be used. */ + pthread_mutex_lock(&stream->viewer_stream_rotation_lock); vstream->total_index_received = stream->total_index_received; + vstream->close_write_flag = 1; + pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); } /* Cleanup index of that stream. */ @@ -1248,7 +1246,7 @@ int relay_start(struct lttcomm_relayd_hdr *recv_hdr, */ static int write_padding_to_file(int fd, uint32_t size) { - int ret = 0; + ssize_t ret = 0; char *zeros; if (size == 0) { @@ -1262,10 +1260,8 @@ static int write_padding_to_file(int fd, uint32_t size) goto end; } - do { - ret = write(fd, zeros, size); - } while (ret < 0 && errno == EINTR); - if (ret < 0 || ret != size) { + ret = lttng_write(fd, zeros, size); + if (ret < size) { PERROR("write padding to file"); } @@ -1283,6 +1279,7 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, struct relay_command *cmd) { int ret = htobe32(LTTNG_OK); + ssize_t size_ret; struct relay_session *session = cmd->session; struct lttcomm_relayd_metadata_payload *metadata_struct; struct relay_stream *metadata_stream; @@ -1339,11 +1336,9 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, goto end_unlock; } - do { - ret = write(metadata_stream->fd, metadata_struct->payload, - payload_size); - } while (ret < 0 && errno == EINTR); - if (ret < 0 || ret != payload_size) { + size_ret = lttng_write(metadata_stream->fd, metadata_struct->payload, + payload_size); + if (size_ret < payload_size) { ERR("Relay error writing metadata on file"); ret = -1; goto end_unlock; @@ -1984,6 +1979,7 @@ static int relay_process_data(struct relay_command *cmd) { int ret = 0, rotate_index = 0; + ssize_t size_ret; struct relay_stream *stream; struct lttcomm_relayd_data_hdr data_hdr; uint64_t stream_id; @@ -2046,10 +2042,54 @@ int relay_process_data(struct relay_command *cmd) if (stream->tracefile_size > 0 && (stream->tracefile_size_current + data_size) > stream->tracefile_size) { + struct relay_viewer_stream *vstream; + uint64_t new_id; + + new_id = (stream->tracefile_count_current + 1) % + stream->tracefile_count; + /* + * When we wrap-around back to 0, we start overwriting old + * trace data. + */ + if (!stream->tracefile_overwrite && new_id == 0) { + stream->tracefile_overwrite = 1; + } + pthread_mutex_lock(&stream->viewer_stream_rotation_lock); + if (stream->tracefile_overwrite) { + stream->oldest_tracefile_id = + (stream->oldest_tracefile_id + 1) % + stream->tracefile_count; + } + vstream = live_find_viewer_stream_by_id(stream->stream_handle); + if (vstream) { + /* + * The viewer is reading a file about to be + * overwritten. Close the FDs it is + * currently using and let it handle the fault. + */ + if (vstream->tracefile_count_current == new_id) { + pthread_mutex_lock(&vstream->overwrite_lock); + vstream->abort_flag = 1; + pthread_mutex_unlock(&vstream->overwrite_lock); + DBG("Streaming side setting abort_flag on stream %s_%lu\n", + stream->channel_name, new_id); + } else if (vstream->tracefile_count_current == + stream->tracefile_count_current) { + /* + * The reader and writer were in the + * same trace file, inform the viewer + * that no new index will ever be added + * to this file. + */ + vstream->close_write_flag = 1; + } + } ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, stream->tracefile_size, stream->tracefile_count, relayd_uid, relayd_gid, stream->fd, &(stream->tracefile_count_current), &stream->fd); + stream->total_index_received = 0; + pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); if (ret < 0) { ERR("Rotating stream output file"); goto end_rcu_unlock; @@ -2071,10 +2111,8 @@ int relay_process_data(struct relay_command *cmd) } /* Write data to stream output fd. */ - do { - ret = write(stream->fd, data_buffer, data_size); - } while (ret < 0 && errno == EINTR); - if (ret < 0 || ret != data_size) { + size_ret = lttng_write(stream->fd, data_buffer, data_size); + if (size_ret < data_size) { ERR("Relay error writing data to file"); ret = -1; goto end_rcu_unlock; @@ -2120,17 +2158,15 @@ int relay_add_connection(int fd, struct lttng_poll_event *events, struct lttng_ht *relay_connections_ht) { struct relay_command *relay_connection; - int ret; + ssize_t ret; relay_connection = zmalloc(sizeof(struct relay_command)); if (relay_connection == NULL) { PERROR("Relay command zmalloc"); goto error; } - do { - ret = read(fd, relay_connection, sizeof(struct relay_command)); - } while (ret < 0 && errno == EINTR); - if (ret < 0 || ret < sizeof(struct relay_command)) { + ret = lttng_read(fd, relay_connection, sizeof(struct relay_command)); + if (ret < sizeof(struct relay_command)) { PERROR("read relay cmd pipe"); goto error_read; }