summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
fc64324)
In order to handle teardown caused by an unresponsive relayd, we need
to ensure that the streams can be cleaned up before the channel wakeup
pipe gets closed. We achieve this by making the channel management
thread hold a reference count on the channel object, so the channel is
only destroyed when all streams, _and_ the channel management thread,
have released their reference.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
- uatomic_dec(&stream->chan->refcount);
- if (!uatomic_read(&stream->chan->refcount)
+ if (!uatomic_sub_return(&stream->chan->refcount, 1)
&& !uatomic_read(&stream->chan->nb_init_stream_left)) {
free_chan = stream->chan;
}
&& !uatomic_read(&stream->chan->nb_init_stream_left)) {
free_chan = stream->chan;
}
* stream.
*/
if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
* stream.
*/
if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
+ /* Increment refcount before decrementing nb_init_stream_left */
+ cmm_smp_wmb();
uatomic_dec(&stream->chan->nb_init_stream_left);
}
uatomic_dec(&stream->chan->nb_init_stream_left);
}
rcu_read_unlock();
/* Atomically decrement channel refcount since other threads can use it. */
rcu_read_unlock();
/* Atomically decrement channel refcount since other threads can use it. */
- uatomic_dec(&stream->chan->refcount);
- if (!uatomic_read(&stream->chan->refcount)
+ if (!uatomic_sub_return(&stream->chan->refcount, 1)
&& !uatomic_read(&stream->chan->nb_init_stream_left)) {
/* Go for channel deletion! */
free_chan = stream->chan;
&& !uatomic_read(&stream->chan->nb_init_stream_left)) {
/* Go for channel deletion! */
free_chan = stream->chan;
* stream.
*/
if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
* stream.
*/
if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
+ /* Increment refcount before decrementing nb_init_stream_left */
+ cmm_smp_wmb();
uatomic_dec(&stream->chan->nb_init_stream_left);
}
uatomic_dec(&stream->chan->nb_init_stream_left);
}
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct, &channel->key,
&iter.iter, stream, node_channel_id.node) {
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct, &channel->key,
&iter.iter, stream, node_channel_id.node) {
+ /*
+ * Protect against teardown with mutex.
+ */
+ pthread_mutex_lock(&stream->lock);
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ goto next;
+ }
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
ERR("Unknown consumer_data type");
assert(0);
}
ERR("Unknown consumer_data type");
assert(0);
}
+ next:
+ pthread_mutex_unlock(&stream->lock);
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
consumer_close_channel_streams(chan);
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
consumer_close_channel_streams(chan);
+
+ /* Release our own refcount */
+ if (!uatomic_sub_return(&chan->refcount, 1)
+ && !uatomic_read(&chan->nb_init_stream_left)) {
+ consumer_del_channel(chan);
+ }
}
/* Release RCU lock for the channel looked up */
}
/* Release RCU lock for the channel looked up */
goto end_channel_error;
}
goto end_channel_error;
}
+ /*
+ * Set refcount to 1 for owner. Below, we will pass
+ * ownership to the consumer_thread_channel_poll()
+ * thread.
+ */
+ channel->refcount = 1;
+
/* Build channel attributes from received message. */
attr.subbuf_size = msg.u.ask_channel.subbuf_size;
attr.num_subbuf = msg.u.ask_channel.num_subbuf;
/* Build channel attributes from received message. */
attr.subbuf_size = msg.u.ask_channel.subbuf_size;
attr.num_subbuf = msg.u.ask_channel.num_subbuf;