X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=86428f0663141fa250905e269d7ceea22c5cb43d;hb=2bba9e532ca1910822005ff7f67400a2e871467c;hp=2cf9ac1a894bfe7d35f31b038fa44e3343ce8e4a;hpb=0f907de1f20c91a2bd1a08ee4d50332d1958754b;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 2cf9ac1a8..86428f066 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -82,6 +83,11 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, return ret; } +/* + * Receive command from session daemon and process it. + * + * Return 1 on success else a negative value or 0. + */ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { @@ -92,6 +98,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); + if (ret > 0) { + ret = -1; + } return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { @@ -121,21 +130,22 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; + int ret_recv; /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto error_fatal; } - DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, msg.u.channel.session_id, msg.u.channel.pathname, msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid, msg.u.channel.relayd_id, msg.u.channel.output, msg.u.channel.tracefile_size, - msg.u.channel.tracefile_count); + msg.u.channel.tracefile_count, + msg.u.channel.monitor); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; @@ -154,20 +164,31 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, }; if (ctx->on_recv_channel != NULL) { - ret = ctx->on_recv_channel(new_channel); - if (ret == 0) { - consumer_add_channel(new_channel, ctx); - } else if (ret < 0) { + ret_recv = ctx->on_recv_channel(new_channel); + if (ret_recv == 0) { + ret = consumer_add_channel(new_channel, ctx); + } else if (ret_recv < 0) { goto end_nosignal; } } else { - consumer_add_channel(new_channel, ctx); + ret = consumer_add_channel(new_channel, ctx); } + + /* If we received an error in add_channel, we need to report it. */ + if (ret < 0) { + ret = consumer_send_status_msg(sock, ret); + if (ret < 0) { + goto error_fatal; + } + goto end_nosignal; + } + goto end_nosignal; } case LTTNG_CONSUMER_ADD_STREAM: { - int fd, stream_pipe; + int fd; + struct lttng_pipe *stream_pipe; struct consumer_relayd_sock_pair *relayd = NULL; struct lttng_consumer_stream *new_stream; struct lttng_consumer_channel *channel; @@ -189,10 +210,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0 || ret_code != LTTNG_OK) { + if (ret < 0) { /* - * Somehow, the session daemon is not responding anymore or the - * channel was not found. + * Somehow, the session daemon is not responding + * anymore. + */ + goto error_fatal; + } + if (ret_code != LTTNG_OK) { + /* + * Channel was not found. */ goto end_nosignal; } @@ -246,6 +273,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_stream->chan = channel; new_stream->wait_fd = fd; + /* + * We've just assigned the channel to the stream so increment the + * refcount right now. + */ + uatomic_inc(&new_stream->chan->refcount); + /* * The buffer flush is done on the session daemon side for the kernel * so no need for the stream "hangup_flush_done" variable to be @@ -285,20 +318,26 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } + /* Do not monitor this stream. */ + if (!channel->monitor) { + DBG("Kernel consumer add stream %s in no monitor mode with" + "relayd id %" PRIu64, new_stream->name, + new_stream->relayd_stream_id); + break; + } + /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { - stream_pipe = ctx->consumer_metadata_pipe[1]; + stream_pipe = ctx->consumer_metadata_pipe; } else { - stream_pipe = ctx->consumer_data_pipe[1]; + stream_pipe = ctx->consumer_data_pipe; } - do { - ret = write(stream_pipe, &new_stream, sizeof(new_stream)); - } while (ret < 0 && errno == EINTR); + ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); if (ret < 0) { - PERROR("Consumer write %s stream to pipe %d", + ERR("Consumer write %s stream to pipe %d", new_stream->metadata_flag ? "metadata" : "data", - stream_pipe); + lttng_pipe_get_writefd(stream_pipe)); consumer_del_stream(new_stream, NULL); goto end_nosignal; } @@ -343,7 +382,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto error_fatal; } goto end_nosignal; @@ -361,6 +400,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); if (ret < 0) { PERROR("send data pending ret code"); + goto error_fatal; } /* @@ -369,6 +409,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ break; } + case LTTNG_CONSUMER_SNAPSHOT_CHANNEL: + { + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + break; + } default: goto end_nosignal; } @@ -381,6 +430,11 @@ end_nosignal: * shutdown during the recv() or send() call. */ return 1; + +error_fatal: + rcu_read_unlock(); + /* This will issue a consumer stop. */ + return -1; } /* @@ -509,8 +563,11 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) assert(stream); - /* Don't create anything if this is set for streaming. */ - if (stream->net_seq_idx == (uint64_t) -1ULL) { + /* + * Don't create anything if this is set for streaming or should not be + * monitored. + */ + if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) { ret = utils_create_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, stream->uid, stream->gid);