X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=a66f305cd02e433f41bf31d0591c0e545d524a1a;hp=4d1e7f155abd0af1474b32eeb9fdc49905127f4c;hb=0dd01979d6f26886199ef746377640b57260421c;hpb=0f6fd3631d4ef411b09f3136ca6929b31014d27e diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 4d1e7f155..a66f305cd 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -767,7 +767,54 @@ static int flush_channel(uint64_t chan_key) health_code_update(); - ustctl_flush_buffer(stream->ustream, 1); + pthread_mutex_lock(&stream->lock); + if (!stream->quiescent) { + ustctl_flush_buffer(stream->ustream, 0); + stream->quiescent = true; + } + pthread_mutex_unlock(&stream->lock); + } +error: + rcu_read_unlock(); + return ret; +} + +/* + * Clear quiescent state from channel's streams using the given key to + * retrieve the channel. + * + * Return 0 on success else an LTTng error code. + */ +static int clear_quiescent_channel(uint64_t chan_key) +{ + int ret = 0; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht *ht; + struct lttng_ht_iter iter; + + DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key); + + rcu_read_lock(); + channel = consumer_find_channel(chan_key); + if (!channel) { + ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key); + ret = LTTNG_ERR_UST_CHAN_NOT_FOUND; + goto error; + } + + ht = consumer_data.stream_per_chan_id_ht; + + /* For each stream of the channel id, clear quiescent state. */ + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, + &channel->key, &iter.iter, stream, node_channel_id.node) { + + health_code_update(); + + pthread_mutex_lock(&stream->lock); + stream->quiescent = false; + pthread_mutex_unlock(&stream->lock); } error: rcu_read_unlock(); @@ -1582,6 +1629,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_msg_sessiond; } + case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL: + { + int ret; + + ret = clear_quiescent_channel( + msg.u.clear_quiescent_channel.key); + if (ret != 0) { + ret_code = ret; + } + + goto end_msg_sessiond; + } case LTTNG_CONSUMER_PUSH_METADATA: { int ret; @@ -1945,14 +2004,19 @@ int lttng_ustconsumer_get_sequence_number( } /* - * Called when the stream signal the consumer that it has hang up. + * Called when the stream signals the consumer that it has hung up. */ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream) { assert(stream); assert(stream->ustream); - ustctl_flush_buffer(stream->ustream, 0); + pthread_mutex_lock(&stream->lock); + if (!stream->quiescent) { + ustctl_flush_buffer(stream->ustream, 0); + stream->quiescent = true; + } + pthread_mutex_unlock(&stream->lock); stream->hangup_flush_done = 1; }