static int send_stream_to_thread(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
- int ret, stream_pipe;
+ int ret;
+ struct lttng_pipe *stream_pipe;
/* Get the right pipe where the stream will be sent. */
if (stream->metadata_flag) {
- stream_pipe = lttng_pipe_get_writefd(ctx->consumer_metadata_pipe);
+ stream_pipe = ctx->consumer_metadata_pipe;
} else {
- stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
+ stream_pipe = ctx->consumer_data_pipe;
}
- do {
- ret = write(stream_pipe, &stream, sizeof(stream));
- } while (ret < 0 && errno == EINTR);
+ ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
if (ret < 0) {
- PERROR("Consumer write %s stream to pipe %d",
- stream->metadata_flag ? "metadata" : "data", stream_pipe);
+ ERR("Consumer write %s stream to pipe %d",
+ stream->metadata_flag ? "metadata" : "data",
+ lttng_pipe_get_writefd(stream_pipe));
}
return ret;
DBG("UST consumer writing metadata to channel %s", metadata->name);
+ if (!metadata->metadata_stream) {
+ ret = 0;
+ goto error;
+ }
+
assert(target_offset <= metadata->metadata_cache->max_offset);
ret = ustctl_write_metadata_to_channel(metadata->uchan,
metadata_str + target_offset, len);
}
pthread_mutex_lock(&consumer_data.lock);
- if (!cds_lfht_is_node_deleted(&channel->node.node)) {
- if (channel->switch_timer_enabled == 1) {
- DBG("Deleting timer on metadata channel");
- consumer_timer_switch_stop(channel);
- }
+
+ if (cds_lfht_is_node_deleted(&channel->node.node)) {
+ goto error_unlock;
+ }
+
+ if (channel->switch_timer_enabled == 1) {
+ DBG("Deleting timer on metadata channel");
+ consumer_timer_switch_stop(channel);
+ }
+
+ if (channel->metadata_stream) {
ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
if (ret < 0) {
ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
goto end_free;
}
+ /*
+ * XXX: The consumer data lock is acquired before calling metadata cache
+ * write which calls push metadata that MUST be protected by the consumer
+ * lock in order to be able to check the validity of the metadata stream of
+ * the channel.
+ *
+ * Note that this will be subject to change to better fine grained locking
+ * and ultimately try to get rid of this global consumer data lock.
+ */
+ pthread_mutex_lock(&consumer_data.lock);
+
pthread_mutex_lock(&channel->metadata_cache->lock);
ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
if (ret < 0) {
ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
while (consumer_metadata_cache_flushed(channel, offset + len)) {
DBG("Waiting for metadata to be flushed");