* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#define _GNU_SOURCE
+#define _LGPL_SOURCE
#include <assert.h>
#include <inttypes.h>
#include <sys/mman.h>
#include <common/kernel-consumer/kernel-consumer.h>
#include <common/relayd/relayd.h>
#include <common/ust-consumer/ust-consumer.h>
+#include <common/utils.h>
#include "consumer-stream.h"
}
stream->wait_fd = -1;
}
+ if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
+ utils_close_pipe(stream->splice_pipe);
+ }
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
+ {
+ /*
+ * Special case for the metadata since the wait fd is an internal pipe
+ * polled in the metadata thread.
+ */
+ if (stream->metadata_flag && stream->chan->monitor) {
+ int rpipe = stream->ust_metadata_poll_pipe[0];
+
+ /*
+ * This will stop the channel timer if one and close the write side
+ * of the metadata poll pipe.
+ */
+ lttng_ustconsumer_close_metadata(stream->chan);
+ if (rpipe >= 0) {
+ ret = close(rpipe);
+ if (ret < 0) {
+ PERROR("closing metadata pipe read side");
+ }
+ stream->ust_metadata_poll_pipe[0] = -1;
+ }
+ }
break;
+ }
default:
ERR("Unknown consumer_data type");
assert(0);
rcu_read_unlock();
- /* Decrement the stream count of the global consumer data. */
- assert(consumer_data.stream_count > 0);
- consumer_data.stream_count--;
+ if (!stream->metadata_flag) {
+ /* Decrement the stream count of the global consumer data. */
+ assert(consumer_data.stream_count > 0);
+ consumer_data.stream_count--;
+ }
}
/*
rcu_read_lock();
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_send_index(&relayd->control_sock, index,
stream->relayd_stream_id, stream->next_net_seq_num - 1);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
} else {
ssize_t size_ret;
}
/*
- * Synchronize the metadata using a given session ID. A successful acquisition
- * of a metadata stream will trigger a request to the session daemon and a
- * snapshot so the metadata thread can consume it.
- *
- * This function call is a rendez-vous point between the metadata thread and
- * the data thread.
+ * Actually do the metadata sync using the given metadata stream.
*
- * Return 0 on success or else a negative value.
+ * Return 0 on success else a negative value. ENODATA can be returned also
+ * indicating that there is no metadata available for that stream.
*/
-int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
- uint64_t session_id)
+static int do_sync_metadata(struct lttng_consumer_stream *metadata,
+ struct lttng_consumer_local_data *ctx)
{
int ret;
- struct lttng_consumer_stream *metadata = NULL, *stream = NULL;
- struct lttng_ht_iter iter;
- struct lttng_ht *ht;
+ assert(metadata);
+ assert(metadata->metadata_flag);
assert(ctx);
- /* Ease our life a bit. */
- ht = consumer_data.stream_list_ht;
-
- rcu_read_lock();
-
- /* Search the metadata associated with the session id of the given stream. */
-
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
- &session_id, &iter.iter, stream, node_session_id.node) {
- if (stream->metadata_flag) {
- metadata = stream;
- break;
- }
- }
- if (!metadata) {
- ret = 0;
- goto end_unlock_rcu;
- }
-
/*
* In UST, since we have to write the metadata from the cache packet
* by packet, we might need to start this procedure multiple times
pthread_mutex_unlock(&metadata->metadata_rdv_lock);
} while (ret == EAGAIN);
- ret = 0;
- goto end_unlock_rcu;
+ /* Success */
+ return 0;
end_unlock_mutex:
pthread_mutex_unlock(&metadata->lock);
-end_unlock_rcu:
+ return ret;
+}
+
+/*
+ * Synchronize the metadata using a given session ID. A successful acquisition
+ * of a metadata stream will trigger a request to the session daemon and a
+ * snapshot so the metadata thread can consume it.
+ *
+ * This function call is a rendez-vous point between the metadata thread and
+ * the data thread.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
+ uint64_t session_id)
+{
+ int ret;
+ struct lttng_consumer_stream *stream = NULL;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht;
+
+ assert(ctx);
+
+ /* Ease our life a bit. */
+ ht = consumer_data.stream_list_ht;
+
+ rcu_read_lock();
+
+ /* Search the metadata associated with the session id of the given stream. */
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
+ &session_id, &iter.iter, stream, node_session_id.node) {
+ if (!stream->metadata_flag) {
+ continue;
+ }
+
+ ret = do_sync_metadata(stream, ctx);
+ if (ret < 0) {
+ goto end;
+ }
+ }
+
+ /*
+ * Force return code to 0 (success) since ret might be ENODATA for instance
+ * which is not an error but rather that we should come back.
+ */
+ ret = 0;
+
+end:
rcu_read_unlock();
return ret;
}