X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=a66f305cd02e433f41bf31d0591c0e545d524a1a;hb=0dd01979d6f26886199ef746377640b57260421c;hp=fe7445b7f133818353fb96b1df10ed0171db88bf;hpb=93ec662e687dc15a3601704a1e0c96c51ad228c9;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index fe7445b7f..a66f305cd 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -767,7 +767,54 @@ static int flush_channel(uint64_t chan_key) health_code_update(); - ustctl_flush_buffer(stream->ustream, 1); + pthread_mutex_lock(&stream->lock); + if (!stream->quiescent) { + ustctl_flush_buffer(stream->ustream, 0); + stream->quiescent = true; + } + pthread_mutex_unlock(&stream->lock); + } +error: + rcu_read_unlock(); + return ret; +} + +/* + * Clear quiescent state from channel's streams using the given key to + * retrieve the channel. + * + * Return 0 on success else an LTTng error code. + */ +static int clear_quiescent_channel(uint64_t chan_key) +{ + int ret = 0; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht *ht; + struct lttng_ht_iter iter; + + DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key); + + rcu_read_lock(); + channel = consumer_find_channel(chan_key); + if (!channel) { + ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key); + ret = LTTNG_ERR_UST_CHAN_NOT_FOUND; + goto error; + } + + ht = consumer_data.stream_per_chan_id_ht; + + /* For each stream of the channel id, clear quiescent state. */ + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, + &channel->key, &iter.iter, stream, node_channel_id.node) { + + health_code_update(); + + pthread_mutex_lock(&stream->lock); + stream->quiescent = false; + pthread_mutex_unlock(&stream->lock); } error: rcu_read_unlock(); @@ -1582,6 +1629,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_msg_sessiond; } + case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL: + { + int ret; + + ret = clear_quiescent_channel( + msg.u.clear_quiescent_channel.key); + if (ret != 0) { + ret_code = ret; + } + + goto end_msg_sessiond; + } case LTTNG_CONSUMER_PUSH_METADATA: { int ret; @@ -1692,7 +1751,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_DISCARDED_EVENTS: { - uint64_t ret; + int ret = 0; + uint64_t discarded_events; struct lttng_ht_iter iter; struct lttng_ht *ht; struct lttng_consumer_stream *stream; @@ -1713,13 +1773,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * found (no events are dropped if the channel is not yet in * use). */ - ret = 0; + discarded_events = 0; cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct, &id, &iter.iter, stream, node_session_id.node) { if (stream->chan->key == key) { - ret = stream->chan->discarded_events; + discarded_events = stream->chan->discarded_events; break; } } @@ -1732,7 +1792,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); /* Send back returned value to session daemon */ - ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + ret = lttcomm_send_unix_sock(sock, &discarded_events, sizeof(discarded_events)); if (ret < 0) { PERROR("send discarded events"); goto error_fatal; @@ -1944,14 +2004,19 @@ int lttng_ustconsumer_get_sequence_number( } /* - * Called when the stream signal the consumer that it has hang up. + * Called when the stream signals the consumer that it has hung up. */ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream) { assert(stream); assert(stream->ustream); - ustctl_flush_buffer(stream->ustream, 0); + pthread_mutex_lock(&stream->lock); + if (!stream->quiescent) { + ustctl_flush_buffer(stream->ustream, 0); + stream->quiescent = true; + } + pthread_mutex_unlock(&stream->lock); stream->hangup_flush_done = 1; } @@ -1985,11 +2050,6 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) } } } - /* Try to rmdir all directories under shm_path root. */ - if (chan->root_shm_path[0]) { - (void) run_as_recursive_rmdir(chan->root_shm_path, - chan->uid, chan->gid); - } } void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan) @@ -1999,6 +2059,11 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan) consumer_metadata_cache_destroy(chan); ustctl_destroy_channel(chan->uchan); + /* Try to rmdir all directories under shm_path root. */ + if (chan->root_shm_path[0]) { + (void) run_as_recursive_rmdir(chan->root_shm_path, + chan->uid, chan->gid); + } free(chan->stream_fds); }