X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=cbad61144b1d1ec391cf1649a85672751d1311be;hb=194dfca0b76448fb6fa1daa28462031a68c138d8;hp=6030ca182fd19e5c3078473611ffe1e1ab1c6f8d;hpb=02d02e31d47c091a38154c9c188c08387902d97b;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 6030ca182..cbad61144 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -540,11 +540,11 @@ error: } /* - * Send channel to sessiond. + * Send channel to sessiond and relayd if applicable. * * Return 0 on success or else a negative value. */ -static int send_sessiond_channel(int sock, +static int send_channel_to_sessiond_and_relayd(int sock, struct lttng_consumer_channel *channel, struct lttng_consumer_local_data *ctx, int *relayd_error) { @@ -564,6 +564,8 @@ static int send_sessiond_channel(int sock, health_code_update(); /* Try to send the stream to the relayd if one is available. */ + DBG("Sending stream %" PRIu64 " of channel \"%s\" to relayd", + stream->key, channel->name); ret = consumer_send_relayd_stream(stream, stream->chan->pathname); if (ret < 0) { /* @@ -1577,8 +1579,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); - /* Send everything to sessiond. */ - ret = send_sessiond_channel(sock, channel, ctx, &relayd_err); + /* Send the channel to sessiond (and relayd, if applicable). */ + ret = send_channel_to_sessiond_and_relayd(sock, channel, ctx, + &relayd_err); if (ret < 0) { if (relayd_err) { /* @@ -1960,6 +1963,45 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } break; } + case LTTNG_CONSUMER_ROTATE_CHANNEL: + { + /* + * Sample the rotate position of all the streams in this channel. + */ + ret = lttng_consumer_rotate_channel(msg.u.rotate_channel.key, + msg.u.rotate_channel.pathname, + msg.u.rotate_channel.relayd_id, + msg.u.rotate_channel.metadata, + msg.u.rotate_channel.new_chunk_id, + ctx); + if (ret < 0) { + ERR("Rotate channel failed"); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + + /* + * Rotate the streams that are ready right now. + * FIXME: this is a second consecutive iteration over the + * streams in a channel, there is probably a better way to + * handle this, but it needs to be after the + * consumer_send_status_msg() call. + */ + ret = lttng_consumer_rotate_ready_streams( + msg.u.rotate_channel.key, ctx); + if (ret < 0) { + ERR("Rotate channel failed"); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + break; + } case LTTNG_CONSUMER_ROTATE_RENAME: { DBG("Consumer rename session %" PRIu64 " after rotation", @@ -1971,7 +2013,31 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.rotate_rename.relayd_id); if (ret < 0) { ERR("Rotate rename failed"); - ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; + } + + health_code_update(); + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + break; + } + case LTTNG_CONSUMER_ROTATE_PENDING_RELAY: + { + uint32_t pending; + + DBG("Consumer rotate pending on relay for session %" PRIu64, + msg.u.rotate_pending_relay.session_id); + pending = lttng_consumer_rotate_pending_relay( + msg.u.rotate_pending_relay.session_id, + msg.u.rotate_pending_relay.relayd_id, + msg.u.rotate_pending_relay.chunk_id); + if (pending < 0) { + ERR("Rotate pending relay failed"); + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } health_code_update(); @@ -1981,6 +2047,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Somehow, the session daemon is not responding anymore. */ goto end_nosignal; } + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &pending, sizeof(pending)); + if (ret < 0) { + PERROR("send data pending ret code"); + goto error_fatal; + } break; } case LTTNG_CONSUMER_MKDIR: @@ -1994,7 +2067,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.mkdir.relayd_id); if (ret < 0) { ERR("consumer mkdir failed"); - ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } health_code_update();