consumer_stream_destroy(stream, metadata_ht);
}
-struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+struct lttng_consumer_stream *consumer_allocate_stream(
+ struct lttng_consumer_channel *channel,
+ uint64_t channel_key,
uint64_t stream_key,
enum lttng_consumer_stream_state state,
const char *channel_name,
}
rcu_read_lock();
-
+ stream->chan = channel;
stream->key = stream_key;
stream->out_fd = -1;
stream->out_fd_offset = 0;
*/
int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll);
-struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+struct lttng_consumer_stream *consumer_allocate_stream(
+ struct lttng_consumer_channel *channel,
+ uint64_t channel_key,
uint64_t stream_key,
enum lttng_consumer_stream_state state,
const char *channel_name,
health_code_update();
- new_stream = consumer_allocate_stream(channel->key,
+ pthread_mutex_lock(&channel->lock);
+ new_stream = consumer_allocate_stream(
+ channel,
+ channel->key,
fd,
LTTNG_CONSUMER_ACTIVE_STREAM,
channel->name,
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
break;
}
+ pthread_mutex_unlock(&channel->lock);
goto end_nosignal;
}
- new_stream->chan = channel;
new_stream->wait_fd = fd;
switch (channel->output) {
case CONSUMER_CHANNEL_SPLICE:
"relayd id %" PRIu64, new_stream->name,
new_stream->relayd_id);
cds_list_add(&new_stream->send_node, &channel->streams.head);
+ pthread_mutex_unlock(&channel->lock);
break;
}
ret = consumer_send_relayd_stream(new_stream,
new_stream->chan->pathname);
if (ret < 0) {
+ pthread_mutex_unlock(&channel->lock);
consumer_stream_free(new_stream);
goto end_nosignal;
}
ret = consumer_send_relayd_streams_sent(
new_stream->relayd_id);
if (ret < 0) {
+ pthread_mutex_unlock(&channel->lock);
goto end_nosignal;
}
}
}
+ pthread_mutex_unlock(&channel->lock);
/* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
assert(channel);
assert(ctx);
- stream = consumer_allocate_stream(channel->key,
+ stream = consumer_allocate_stream(
+ channel,
+ channel->key,
key,
LTTNG_CONSUMER_ACTIVE_STREAM,
channel->name,
goto error;
}
- stream->chan = channel;
-
error:
if (_alloc_ret) {
*_alloc_ret = alloc_ret;