X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=e0a756a5697c40c7cbbd1ff0887a5c01f61464e3;hb=d7b75ec82e5affcfec20e4bc6b208648aa0f58ba;hp=4f83639d5d8ded41abaa03cb0c649f907bd0df1a;hpb=cd2b09ed75c28ef5e82698972582c99e6b423134;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 4f83639d5..e0a756a56 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -243,7 +243,7 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) lttng_ht_lookup(relayd_session_id_ht, (void *)((unsigned long) relayd->sessiond_session_id), &iter); node = lttng_ht_iter_get_node_ulong(&iter); - if (node != NULL) { + if (node == NULL) { /* We assume the relayd is being or is destroyed */ return; } @@ -782,6 +782,13 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, data_hdr.stream_id = htobe64(stream->relayd_stream_id); data_hdr.data_size = htobe32(data_size); data_hdr.padding_size = htobe32(padding); + /* + * Note that net_seq_num below is assigned with the *current* value of + * next_net_seq_num and only after that the next_net_seq_num will be + * increment. This is why when issuing a command on the relayd using + * this next value, 1 should always be substracted in order to compare + * the last seen sequence number on the relayd side to the last sent. + */ data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++); /* Other fields are zeroed previously */ @@ -1318,7 +1325,15 @@ static int write_relayd_metadata_id(int fd, ret = write(fd, (void *) &hdr, sizeof(hdr)); } while (ret < 0 && errno == EINTR); if (ret < 0) { - PERROR("write metadata stream id"); + /* + * This error means that the fd's end is closed so ignore the perror + * not to clubber the error output since this can happen in a normal + * code path. + */ + if (errno != EPIPE) { + PERROR("write metadata stream id"); + } + DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno); goto end; } DBG("Metadata stream id %" PRIu64 " with padding %lu written before data", @@ -1435,7 +1450,13 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } while (ret < 0 && errno == EINTR); DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); if (ret < 0) { - PERROR("Error in file write"); + /* + * This is possible if the fd is closed on the other side (outfd) + * or any write problem. It can be verbose a bit for a normal + * execution if for instance the relayd is stopped abruptly. This + * can happen so set this to a DBG statement. + */ + DBG("Error in file write mmap"); if (written == 0) { written = ret; } @@ -2101,17 +2122,13 @@ void *consumer_thread_metadata_poll(void *data) DBG("Metadata main loop started"); while (1) { - lttng_poll_reset(&events); - - nb_fd = LTTNG_POLL_GETNB(&events); - /* Only the metadata pipe is set */ - if (nb_fd == 0 && consumer_quit == 1) { + if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { goto end; } restart: - DBG("Metadata poll wait with %d fd(s)", nb_fd); + DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); ret = lttng_poll_wait(&events, -1); DBG("Metadata event catched in thread"); if (ret < 0) { @@ -2122,6 +2139,8 @@ restart: goto error; } + nb_fd = ret; + /* From here, the event is a metadata wait fd */ for (i = 0; i < nb_fd; i++) { revents = LTTNG_POLL_GETEV(&events, i); @@ -2366,6 +2385,11 @@ void *consumer_thread_data_poll(void *data) pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream, sizeof(new_stream)); } while (pipe_readlen == -1 && errno == EINTR); + if (pipe_readlen < 0) { + PERROR("read consumer data pipe"); + /* Continue so we can at least handle the current stream(s). */ + continue; + } /* * If the stream is NULL, just ignore it. It's also possible that @@ -3008,10 +3032,12 @@ int consumer_data_pending(uint64_t id) if (relayd) { pthread_mutex_lock(&relayd->ctrl_sock_mutex); if (stream->metadata_flag) { - ret = relayd_quiescent_control(&relayd->control_sock); + ret = relayd_quiescent_control(&relayd->control_sock, + stream->relayd_stream_id); } else { ret = relayd_data_pending(&relayd->control_sock, - stream->relayd_stream_id, stream->next_net_seq_num); + stream->relayd_stream_id, + stream->next_net_seq_num - 1); } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret == 1) { @@ -3030,10 +3056,12 @@ int consumer_data_pending(uint64_t id) ret = relayd_end_data_pending(&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0 || !is_data_inflight) { - /* On error or if NO data inflight, no data is pending. */ + if (ret < 0) { goto data_not_pending; } + if (is_data_inflight) { + goto data_pending; + } } /*