X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=70bd140672e8ddb7aa5d4c3c39835148aee85ec9;hp=775cb1766f62e4a356cf49ca739351c4cedc7dac;hb=3e25d926e1c03011c68430e3a358d1e9a10ca9ab;hpb=fc6d7a51ee49245f1c24cc33f05a4a8983028f8c diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 775cb1766..70bd14067 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -32,6 +32,7 @@ #include #include #include +#include #include #include @@ -406,9 +407,7 @@ error_shm_open: return -1; } -static int open_ust_stream_fd(struct lttng_consumer_channel *channel, - struct ustctl_consumer_channel_attr *attr, - int cpu) +static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu) { char shm_path[PATH_MAX]; int ret; @@ -463,7 +462,7 @@ static int create_ust_channel(struct lttng_consumer_channel *channel, goto error_alloc; } for (i = 0; i < nr_stream_fds; i++) { - stream_fds[i] = open_ust_stream_fd(channel, attr, i); + stream_fds[i] = open_ust_stream_fd(channel, i); if (stream_fds[i] < 0) { ret = -1; goto error_open; @@ -539,11 +538,11 @@ error: } /* - * Send channel to sessiond. + * Send channel to sessiond and relayd if applicable. * * Return 0 on success or else a negative value. */ -static int send_sessiond_channel(int sock, +static int send_channel_to_sessiond_and_relayd(int sock, struct lttng_consumer_channel *channel, struct lttng_consumer_local_data *ctx, int *relayd_error) { @@ -563,6 +562,8 @@ static int send_sessiond_channel(int sock, health_code_update(); /* Try to send the stream to the relayd if one is available. */ + DBG("Sending stream %" PRIu64 " of channel \"%s\" to relayd", + stream->key, channel->name); ret = consumer_send_relayd_stream(stream, stream->chan->pathname); if (ret < 0) { /* @@ -638,7 +639,7 @@ error: * Return 0 on success or else, a negative value is returned and the channel * MUST be destroyed by consumer_del_channel(). */ -static int ask_channel(struct lttng_consumer_local_data *ctx, int sock, +static int ask_channel(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *channel, struct ustctl_consumer_channel_attr *attr) { @@ -1491,7 +1492,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); - ret = ask_channel(ctx, sock, channel, &attr); + ret = ask_channel(ctx, channel, &attr); if (ret < 0) { goto end_channel_error; } @@ -1576,8 +1577,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); - /* Send everything to sessiond. */ - ret = send_sessiond_channel(sock, channel, ctx, &relayd_err); + /* Send the channel to sessiond (and relayd, if applicable). */ + ret = send_channel_to_sessiond_and_relayd(sock, channel, ctx, + &relayd_err); if (ret < 0) { if (relayd_err) { /* @@ -1918,6 +1920,86 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } goto end_msg_sessiond; } + case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE: + { + int channel_rotate_pipe; + int flags; + + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + /* Successfully received the command's type. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + goto error_fatal; + } + + ret = lttcomm_recv_fds_unix_sock(sock, &channel_rotate_pipe, 1); + if (ret != sizeof(channel_rotate_pipe)) { + ERR("Failed to receive channel rotate pipe"); + goto error_fatal; + } + + DBG("Received channel rotate pipe (%d)", channel_rotate_pipe); + ctx->channel_rotate_pipe = channel_rotate_pipe; + /* Set the pipe as non-blocking. */ + ret = fcntl(channel_rotate_pipe, F_GETFL, 0); + if (ret == -1) { + PERROR("fcntl get flags of the channel rotate pipe"); + goto error_fatal; + } + flags = ret; + + ret = fcntl(channel_rotate_pipe, F_SETFL, flags | O_NONBLOCK); + if (ret == -1) { + PERROR("fcntl set O_NONBLOCK flag of the channel rotate pipe"); + goto error_fatal; + } + DBG("Channel rotate pipe set as non-blocking"); + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + goto error_fatal; + } + break; + } + case LTTNG_CONSUMER_ROTATE_CHANNEL: + { + /* + * Sample the rotate position of all the streams in this channel. + */ + ret = lttng_consumer_rotate_channel(msg.u.rotate_channel.key, + msg.u.rotate_channel.pathname, + msg.u.rotate_channel.relayd_id, + msg.u.rotate_channel.metadata, + msg.u.rotate_channel.new_chunk_id, + ctx); + if (ret < 0) { + ERR("Rotate channel failed"); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + + /* + * Rotate the streams that are ready right now. + * FIXME: this is a second consecutive iteration over the + * streams in a channel, there is probably a better way to + * handle this, but it needs to be after the + * consumer_send_status_msg() call. + */ + ret = lttng_consumer_rotate_ready_streams( + msg.u.rotate_channel.key, ctx); + if (ret < 0) { + ERR("Rotate channel failed"); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + break; + } case LTTNG_CONSUMER_ROTATE_RENAME: { DBG("Consumer rename session %" PRIu64 " after rotation", @@ -1929,7 +2011,31 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.rotate_rename.relayd_id); if (ret < 0) { ERR("Rotate rename failed"); - ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + break; + } + case LTTNG_CONSUMER_ROTATE_PENDING_RELAY: + { + uint32_t pending; + + DBG("Consumer rotate pending on relay for session %" PRIu64, + msg.u.rotate_pending_relay.session_id); + pending = lttng_consumer_rotate_pending_relay( + msg.u.rotate_pending_relay.session_id, + msg.u.rotate_pending_relay.relayd_id, + msg.u.rotate_pending_relay.chunk_id); + if (pending < 0) { + ERR("Rotate pending relay failed"); + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } health_code_update(); @@ -1939,6 +2045,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Somehow, the session daemon is not responding anymore. */ goto end_nosignal; } + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &pending, sizeof(pending)); + if (ret < 0) { + PERROR("send data pending ret code"); + goto error_fatal; + } break; } case LTTNG_CONSUMER_MKDIR: @@ -1952,7 +2065,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.mkdir.relayd_id); if (ret < 0) { ERR("consumer mkdir failed"); - ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } health_code_update(); @@ -2553,10 +2666,10 @@ end: * Return 0 on success else a negative value. */ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx, bool *rotated) { unsigned long len, subbuf_size, padding; - int err, write_index = 1; + int err, write_index = 1, rotation_ret; long ret = 0; struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; @@ -2588,7 +2701,21 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, readlen = lttng_read(stream->wait_fd, &dummy, 1); if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { ret = readlen; - goto end; + goto error; + } + } + + /* + * If the stream was flagged to be ready for rotation before we extract the + * next packet, rotate it now. + */ + if (stream->rotate_ready) { + DBG("Rotate stream before extracting data"); + rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + if (rotation_ret < 0) { + ERR("Stream rotation error"); + ret = -1; + goto error; } } @@ -2603,7 +2730,7 @@ retry: if (stream->metadata_flag) { ret = commit_one_metadata_packet(stream); if (ret <= 0) { - goto end; + goto error; } ustctl_flush_buffer(stream->ustream, 1); goto retry; @@ -2618,7 +2745,7 @@ retry: */ DBG("Reserving sub buffer failed (everything is normal, " "it is due to concurrency) [ret: %d]", err); - goto end; + goto error; } assert(stream->chan->output == CONSUMER_CHANNEL_MMAP); @@ -2628,7 +2755,7 @@ retry: if (ret < 0) { err = ustctl_put_subbuf(ustream); assert(err == 0); - goto end; + goto error; } /* Update the stream's sequence and discarded events count. */ @@ -2637,7 +2764,7 @@ retry: PERROR("kernctl_get_events_discarded"); err = ustctl_put_subbuf(ustream); assert(err == 0); - goto end; + goto error; } } else { write_index = 0; @@ -2655,6 +2782,7 @@ retry: assert(len >= subbuf_size); padding = len - subbuf_size; + /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index); /* @@ -2686,13 +2814,13 @@ retry: if (!stream->metadata_flag) { ret = notify_if_more_data(stream, ctx); if (ret < 0) { - goto end; + goto error; } } /* Write index if needed. */ if (!write_index) { - goto end; + goto rotate; } if (stream->chan->live_timer_interval && !stream->metadata_flag) { @@ -2717,17 +2845,35 @@ retry: } if (err < 0) { - goto end; + goto error; } } assert(!stream->metadata_flag); err = consumer_stream_write_index(stream, &index); if (err < 0) { - goto end; + goto error; } -end: +rotate: + /* + * After extracting the packet, we check if the stream is now ready to be + * rotated and perform the action immediately. + */ + rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); + if (rotation_ret == 1) { + rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + if (rotation_ret < 0) { + ERR("Stream rotation error"); + ret = -1; + goto error; + } + } else if (rotation_ret < 0) { + ERR("Checking if stream is ready to rotate"); + ret = -1; + goto error; + } +error: return ret; } @@ -2972,7 +3118,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, request.key = channel->key; DBG("Sending metadata request to sessiond, session id %" PRIu64 - ", per-pid %" PRIu64 ", app UID %u and channek key %" PRIu64, + ", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64, request.session_id, request.session_id_per_pid, request.uid, request.key);