#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/utils.h>
+#include <common/time.h>
#include <common/compat/poll.h>
#include <common/compat/endian.h>
#include <common/index/index.h>
#include <common/consumer/consumer-testpoint.h>
#include <common/align.h>
#include <common/consumer/consumer-metadata-cache.h>
+#include <common/trace-chunk.h>
+#include <common/trace-chunk-registry.h>
+#include <common/string-utils/format.h>
+#include <common/dynamic-array.h>
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
*/
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
- int ret;
struct lttng_ht_iter iter;
DBG("Consumer delete channel key %" PRIu64, channel->key);
goto end;
}
- rcu_read_lock();
- iter.iter.node = &channel->node.node;
- ret = lttng_ht_del(consumer_data.channel_ht, &iter);
- assert(!ret);
- rcu_read_unlock();
+ lttng_trace_chunk_put(channel->trace_chunk);
+ channel->trace_chunk = NULL;
+
+ if (channel->is_published) {
+ int ret;
+ rcu_read_lock();
+ iter.iter.node = &channel->node.node;
+ ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+ assert(!ret);
+
+ iter.iter.node = &channel->channels_by_session_id_ht_node.node;
+ ret = lttng_ht_del(consumer_data.channels_by_session_id_ht,
+ &iter);
+ assert(!ret);
+ rcu_read_unlock();
+ }
+
+ channel->is_deleted = true;
call_rcu(&channel->node.head, free_channel_rcu);
end:
pthread_mutex_unlock(&channel->lock);
{
stream->channel_read_only_attributes.tracefile_size =
channel->tracefile_size;
- memcpy(stream->channel_read_only_attributes.path, channel->pathname,
- sizeof(stream->channel_read_only_attributes.path));
}
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
uint64_t stream_key,
- enum lttng_consumer_stream_state state,
const char *channel_name,
- uid_t uid,
- gid_t gid,
uint64_t relayd_id,
uint64_t session_id,
+ struct lttng_trace_chunk *trace_chunk,
int cpu,
int *alloc_ret,
enum consumer_channel_type type,
- unsigned int monitor,
- uint64_t trace_archive_id)
+ unsigned int monitor)
{
int ret;
struct lttng_consumer_stream *stream;
goto end;
}
- rcu_read_lock();
+ if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
+ ERR("Failed to acquire trace chunk reference during the creation of a stream");
+ ret = -1;
+ goto error;
+ }
+ rcu_read_lock();
stream->key = stream_key;
+ stream->trace_chunk = trace_chunk;
stream->out_fd = -1;
stream->out_fd_offset = 0;
stream->output_written = 0;
- stream->state = state;
- stream->uid = uid;
- stream->gid = gid;
stream->net_seq_idx = relayd_id;
stream->session_id = session_id;
stream->monitor = monitor;
stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
stream->index_file = NULL;
stream->last_sequence_number = -1ULL;
- stream->trace_archive_id = trace_archive_id;
pthread_mutex_init(&stream->lock, NULL);
pthread_mutex_init(&stream->metadata_timer_lock, NULL);
error:
rcu_read_unlock();
+ lttng_trace_chunk_put(stream->trace_chunk);
free(stream);
end:
if (alloc_ret) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_add_stream(&relayd->control_sock, stream->name,
path, &stream->relayd_stream_id,
- stream->chan->tracefile_size, stream->chan->tracefile_count,
- stream->trace_archive_id);
+ stream->chan->tracefile_size,
+ stream->chan->tracefile_count,
+ stream->trace_chunk);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
data_hdr.data_size = htobe32(data_size);
data_hdr.padding_size = htobe32(padding);
+
/*
* Note that net_seq_num below is assigned with the *current* value of
* next_net_seq_num and only after that the next_net_seq_num will be
return outfd;
}
+/*
+ * Trigger a dump of the metadata content. Following/during the succesful
+ * completion of this call, the metadata poll thread will start receiving
+ * metadata packets to consume.
+ *
+ * The caller must hold the channel and stream locks.
+ */
+static
+int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ ASSERT_LOCKED(stream->chan->lock);
+ ASSERT_LOCKED(stream->lock);
+ assert(stream->metadata_flag);
+ assert(stream->chan->trace_chunk);
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ /*
+ * Reset the position of what has been read from the
+ * metadata cache to 0 so we can dump it again.
+ */
+ ret = kernctl_metadata_cache_dump(stream->wait_fd);
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /*
+ * Reset the position pushed from the metadata cache so it
+ * will write from the beginning on the next push.
+ */
+ stream->ust_metadata_pushed = 0;
+ ret = consumer_metadata_wakeup_pipe(stream->chan);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ abort();
+ }
+ if (ret < 0) {
+ ERR("Failed to dump the metadata cache");
+ }
+ return ret;
+}
+
+static
+int lttng_consumer_channel_set_trace_chunk(
+ struct lttng_consumer_channel *channel,
+ struct lttng_trace_chunk *new_trace_chunk)
+{
+ int ret = 0;
+ const bool is_local_trace = channel->relayd_id == -1ULL;
+ bool update_stream_trace_chunk;
+ struct cds_lfht_iter iter;
+ struct lttng_consumer_stream *stream;
+ unsigned long channel_hash;
+
+ pthread_mutex_lock(&channel->lock);
+ if (channel->is_deleted) {
+ /*
+ * The channel has been logically deleted and should no longer
+ * be used. It has released its reference to its current trace
+ * chunk and should not acquire a new one.
+ *
+ * Return success as there is nothing for the caller to do.
+ */
+ goto end;
+ }
+ /*
+ * A stream can transition to a state where it and its channel
+ * no longer belong to a trace chunk. For instance, this happens when
+ * a session is rotated while it is inactive. After the rotation
+ * of an inactive session completes, the channel and its streams no
+ * longer belong to a trace chunk.
+ *
+ * However, if a session is stopped, rotated, and started again,
+ * the session daemon will create a new chunk and send it to its peers.
+ * In that case, the streams' transition to a new chunk can be performed
+ * immediately.
+ *
+ * This trace chunk transition could also be performed lazily when
+ * a buffer is consumed. However, creating the files here allows the
+ * consumer daemon to report any creation error to the session daemon
+ * and cause the start of the tracing session to fail.
+ */
+ update_stream_trace_chunk = !channel->trace_chunk && new_trace_chunk;
+
+ /*
+ * The acquisition of the reference cannot fail (barring
+ * a severe internal error) since a reference to the published
+ * chunk is already held by the caller.
+ */
+ if (new_trace_chunk) {
+ const bool acquired_reference = lttng_trace_chunk_get(
+ new_trace_chunk);
+
+ assert(acquired_reference);
+ }
+
+ lttng_trace_chunk_put(channel->trace_chunk);
+ channel->trace_chunk = new_trace_chunk;
+ if (!is_local_trace || !new_trace_chunk) {
+ /* Not an error. */
+ goto end;
+ }
+
+ if (!update_stream_trace_chunk) {
+ goto end;
+ }
+
+ channel_hash = consumer_data.stream_per_chan_id_ht->hash_fct(
+ &channel->key, lttng_ht_seed);
+ rcu_read_lock();
+ cds_lfht_for_each_entry_duplicate(consumer_data.stream_per_chan_id_ht->ht,
+ channel_hash,
+ consumer_data.stream_per_chan_id_ht->match_fct,
+ &channel->key, &iter, stream, node_channel_id.node) {
+ bool acquired_reference, should_regenerate_metadata = false;
+
+ acquired_reference = lttng_trace_chunk_get(channel->trace_chunk);
+ assert(acquired_reference);
+
+ pthread_mutex_lock(&stream->lock);
+
+ /*
+ * On a transition from "no-chunk" to a new chunk, a metadata
+ * stream's content must be entirely dumped. This must occcur
+ * _after_ the creation of the metadata stream's output files
+ * as the consumption thread (not necessarily the one executing
+ * this) may start to consume during the call to
+ * consumer_metadata_stream_dump().
+ */
+ should_regenerate_metadata =
+ stream->metadata_flag &&
+ !stream->trace_chunk && channel->trace_chunk;
+ stream->trace_chunk = channel->trace_chunk;
+ ret = consumer_stream_create_output_files(stream, true);
+ if (ret) {
+ pthread_mutex_unlock(&stream->lock);
+ goto end_rcu_unlock;
+ }
+ if (should_regenerate_metadata) {
+ ret = consumer_metadata_stream_dump(stream);
+ }
+ pthread_mutex_unlock(&stream->lock);
+ if (ret) {
+ goto end_rcu_unlock;
+ }
+ }
+end_rcu_unlock:
+ rcu_read_unlock();
+end:
+ pthread_mutex_unlock(&channel->lock);
+ return ret;
+}
+
/*
* Allocate and return a new lttng_consumer_channel object using the given key
* to initialize the hash table node.
*/
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
uint64_t session_id,
+ const uint64_t *chunk_id,
const char *pathname,
const char *name,
- uid_t uid,
- gid_t gid,
uint64_t relayd_id,
enum lttng_event_output output,
uint64_t tracefile_size,
const char *root_shm_path,
const char *shm_path)
{
- struct lttng_consumer_channel *channel;
+ struct lttng_consumer_channel *channel = NULL;
+ struct lttng_trace_chunk *trace_chunk = NULL;
+
+ if (chunk_id) {
+ trace_chunk = lttng_trace_chunk_registry_find_chunk(
+ consumer_data.chunk_registry, session_id,
+ *chunk_id);
+ if (!trace_chunk) {
+ ERR("Failed to find trace chunk reference during creation of channel");
+ goto end;
+ }
+ }
channel = zmalloc(sizeof(*channel));
if (channel == NULL) {
channel->refcount = 0;
channel->session_id = session_id;
channel->session_id_per_pid = session_id_per_pid;
- channel->uid = uid;
- channel->gid = gid;
channel->relayd_id = relayd_id;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
}
lttng_ht_node_init_u64(&channel->node, channel->key);
+ lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node,
+ channel->session_id);
channel->wait_fd = -1;
-
CDS_INIT_LIST_HEAD(&channel->streams.head);
+ if (trace_chunk) {
+ int ret = lttng_consumer_channel_set_trace_chunk(channel,
+ trace_chunk);
+ if (ret) {
+ goto error;
+ }
+ }
+
DBG("Allocated channel (key %" PRIu64 ")", channel->key);
end:
+ lttng_trace_chunk_put(trace_chunk);
return channel;
+error:
+ consumer_del_channel(channel);
+ channel = NULL;
+ goto end;
}
/*
rcu_read_lock();
lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
+ lttng_ht_add_u64(consumer_data.channels_by_session_id_ht,
+ &channel->channels_by_session_id_ht_node);
rcu_read_unlock();
+ channel->is_published = true;
pthread_mutex_unlock(&channel->timer_lock);
pthread_mutex_unlock(&channel->lock);
* closed by the polling thread after a wakeup on the data_pipe or
* metadata_pipe.
*/
- if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
- stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
(*nb_inactive_fd)++;
continue;
}
/*
* This clobbers way too much the debug output. Uncomment that if you
* need it for debugging purposes.
- *
- * DBG("Active FD %d", stream->wait_fd);
*/
(*pollfd)[i].fd = stream->wait_fd;
(*pollfd)[i].events = POLLIN | POLLPRI;
{
struct lttng_ht_iter iter;
struct lttng_consumer_channel *channel;
+ unsigned int trace_chunks_left;
rcu_read_lock();
rcu_read_unlock();
lttng_ht_destroy(consumer_data.channel_ht);
+ lttng_ht_destroy(consumer_data.channels_by_session_id_ht);
cleanup_relayd_ht();
* it.
*/
lttng_ht_destroy(consumer_data.stream_list_ht);
+
+ /*
+ * Trace chunks in the registry may still exist if the session
+ * daemon has encountered an internal error and could not
+ * tear down its sessions and/or trace chunks properly.
+ *
+ * Release the session daemon's implicit reference to any remaining
+ * trace chunk and print an error if any trace chunk was found. Note
+ * that there are _no_ legitimate cases for trace chunks to be left,
+ * it is a leak. However, it can happen following a crash of the
+ * session daemon and not emptying the registry would cause an assertion
+ * to hit.
+ */
+ trace_chunks_left = lttng_trace_chunk_registry_put_each_chunk(
+ consumer_data.chunk_registry);
+ if (trace_chunks_left) {
+ ERR("%u trace chunks are leaked by lttng-consumerd. "
+ "This can be caused by an internal error of the session daemon.",
+ trace_chunks_left);
+ }
+ /* Run all callbacks freeing each chunk. */
+ rcu_barrier();
+ lttng_trace_chunk_registry_destroy(consumer_data.chunk_registry);
}
/*
* core function for writing trace buffers to either the local filesystem or
* the network.
*
- * It must be called with the stream lock held.
+ * It must be called with the stream and the channel lock held.
*
* Careful review MUST be put if any changes occur!
*
/* RCU lock for the relayd pointer */
rcu_read_lock();
+ assert(stream->net_seq_idx != (uint64_t) -1ULL ||
+ stream->trace_chunk);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != (uint64_t) -1ULL) {
if (stream->chan->tracefile_size > 0 &&
(stream->tracefile_size_current + len) >
stream->chan->tracefile_size) {
- ret = utils_rotate_stream_file(stream->chan->pathname,
- stream->name, stream->chan->tracefile_size,
- stream->chan->tracefile_count, stream->uid, stream->gid,
- stream->out_fd, &(stream->tracefile_count_current),
- &stream->out_fd);
- if (ret < 0) {
- ERR("Rotating output file");
+ ret = consumer_stream_rotate_output_files(stream);
+ if (ret) {
goto end;
}
outfd = stream->out_fd;
-
- if (stream->index_file) {
- lttng_index_file_put(stream->index_file);
- stream->index_file = lttng_index_file_create(stream->chan->pathname,
- stream->name, stream->uid, stream->gid,
- stream->chan->tracefile_size,
- stream->tracefile_count_current,
- CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
- if (!stream->index_file) {
- goto end;
- }
- }
-
- /* Reset current size because we just perform a rotation. */
- stream->tracefile_size_current = 0;
- stream->out_fd_offset = 0;
orig_offset = 0;
}
stream->tracefile_size_current += len;
relayd_hang_up = 1;
/* Socket operation failed. We consider the relayd dead */
- if (errno == EPIPE || errno == EINVAL || errno == EBADF) {
+ if (errno == EPIPE) {
/*
* This is possible if the fd is closed on the other side
* (outfd) or any write problem. It can be verbose a bit for a
if (stream->chan->tracefile_size > 0 &&
(stream->tracefile_size_current + len) >
stream->chan->tracefile_size) {
- ret = utils_rotate_stream_file(stream->chan->pathname,
- stream->name, stream->chan->tracefile_size,
- stream->chan->tracefile_count, stream->uid, stream->gid,
- stream->out_fd, &(stream->tracefile_count_current),
- &stream->out_fd);
+ ret = consumer_stream_rotate_output_files(stream);
if (ret < 0) {
written = ret;
- ERR("Rotating output file");
goto end;
}
outfd = stream->out_fd;
-
- if (stream->index_file) {
- lttng_index_file_put(stream->index_file);
- stream->index_file = lttng_index_file_create(stream->chan->pathname,
- stream->name, stream->uid, stream->gid,
- stream->chan->tracefile_size,
- stream->tracefile_count_current,
- CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
- if (!stream->index_file) {
- goto end;
- }
- }
-
- /* Reset current size because we just perform a rotation. */
- stream->tracefile_size_current = 0;
- stream->out_fd_offset = 0;
orig_offset = 0;
}
stream->tracefile_size_current += len;
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht)
{
- struct lttng_consumer_channel *free_chan = NULL;
+ struct lttng_consumer_channel *channel = NULL;
+ bool free_channel = false;
assert(stream);
/*
DBG3("Consumer delete metadata stream %d", stream->wait_fd);
pthread_mutex_lock(&consumer_data.lock);
- pthread_mutex_lock(&stream->chan->lock);
+ /*
+ * Note that this assumes that a stream's channel is never changed and
+ * that the stream's lock doesn't need to be taken to sample its
+ * channel.
+ */
+ channel = stream->chan;
+ pthread_mutex_lock(&channel->lock);
pthread_mutex_lock(&stream->lock);
- if (stream->chan->metadata_cache) {
+ if (channel->metadata_cache) {
/* Only applicable to userspace consumers. */
- pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+ pthread_mutex_lock(&channel->metadata_cache->lock);
}
/* Remove any reference to that stream. */
consumer_stream_destroy_buffers(stream);
/* Atomically decrement channel refcount since other threads can use it. */
- if (!uatomic_sub_return(&stream->chan->refcount, 1)
- && !uatomic_read(&stream->chan->nb_init_stream_left)) {
+ if (!uatomic_sub_return(&channel->refcount, 1)
+ && !uatomic_read(&channel->nb_init_stream_left)) {
/* Go for channel deletion! */
- free_chan = stream->chan;
+ free_channel = true;
}
+ stream->chan = NULL;
/*
* Nullify the stream reference so it is not used after deletion. The
* channel lock MUST be acquired before being able to check for a NULL
* pointer value.
*/
- stream->chan->metadata_stream = NULL;
+ channel->metadata_stream = NULL;
- if (stream->chan->metadata_cache) {
- pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
+ if (channel->metadata_cache) {
+ pthread_mutex_unlock(&channel->metadata_cache->lock);
}
pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&stream->chan->lock);
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
- if (free_chan) {
- consumer_del_channel(free_chan);
+ if (free_channel) {
+ consumer_del_channel(channel);
}
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
consumer_stream_free(stream);
}
rcu_read_unlock();
}
-static
-int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
- uint64_t key)
-{
- ssize_t ret;
-
- do {
- ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
- } while (ret == -1 && errno == EINTR);
- if (ret == -1) {
- PERROR("Failed to write to the channel rotation pipe");
- } else {
- DBG("Sent channel rotation notification for channel key %"
- PRIu64, key);
- ret = 0;
- }
-
- return (int) ret;
-}
-
-/*
- * Perform operations that need to be done after a stream has
- * rotated and released the stream lock.
- *
- * Multiple rotations cannot occur simultaneously, so we know the state of the
- * "rotated" stream flag cannot change.
- *
- * This MUST be called WITHOUT the stream lock held.
- */
-static
-int consumer_post_rotation(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx)
-{
- int ret = 0;
-
- pthread_mutex_lock(&stream->chan->lock);
-
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- /*
- * The ust_metadata_pushed counter has been reset to 0, so now
- * we can wakeup the metadata thread so it dumps the metadata
- * cache to the new file.
- */
- if (stream->metadata_flag) {
- consumer_metadata_wakeup_pipe(stream->chan);
- }
- break;
- default:
- ERR("Unknown consumer_data type");
- abort();
- }
-
- if (--stream->chan->nr_stream_rotate_pending == 0) {
- DBG("Rotation of channel \"%s\" completed, notifying the session daemon",
- stream->chan->name);
- ret = rotate_notify_sessiond(ctx, stream->chan->key);
- }
- pthread_mutex_unlock(&stream->chan->lock);
-
- return ret;
-}
-
/*
* Thread polls on metadata file descriptor and write them on disk or on the
* network.
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
if (revents & LPOLLIN) {
ssize_t pipe_len;
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
if (pollfd == ctx->consumer_channel_pipe[0]) {
if (revents & LPOLLIN) {
enum consumer_channel_action action;
struct lttng_consumer_local_data *ctx)
{
ssize_t ret;
- int rotate_ret;
- bool rotated = false;
+ pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
if (stream->metadata_flag) {
pthread_mutex_lock(&stream->metadata_rdv_lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- ret = lttng_kconsumer_read_subbuffer(stream, ctx, &rotated);
+ ret = lttng_kconsumer_read_subbuffer(stream, ctx);
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- ret = lttng_ustconsumer_read_subbuffer(stream, ctx, &rotated);
+ ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
break;
default:
ERR("Unknown consumer_data type");
pthread_mutex_unlock(&stream->metadata_rdv_lock);
}
pthread_mutex_unlock(&stream->lock);
- if (rotated) {
- rotate_ret = consumer_post_rotation(stream, ctx);
- if (rotate_ret < 0) {
- ERR("Failed after a rotation");
- ret = -1;
- }
- }
+ pthread_mutex_unlock(&stream->chan->lock);
return ret;
}
goto error;
}
+ consumer_data.channels_by_session_id_ht =
+ lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!consumer_data.channels_by_session_id_ht) {
+ goto error;
+ }
+
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!consumer_data.relayd_ht) {
goto error;
goto error;
}
+ consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
+ if (!consumer_data.chunk_registry) {
+ goto error;
+ }
+
return 0;
error:
/* Assign new file descriptor */
relayd->control_sock.sock.fd = fd;
- fd = -1; /* For error path */
/* Assign version values. */
relayd->control_sock.major = relayd_sock->major;
relayd->control_sock.minor = relayd_sock->minor;
/* Assign new file descriptor */
relayd->data_sock.sock.fd = fd;
- fd = -1; /* for eventual error paths */
/* Assign version values. */
relayd->data_sock.major = relayd_sock->major;
relayd->data_sock.minor = relayd_sock->minor;
DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
relayd->net_seq_idx, fd);
+ /*
+ * We gave the ownership of the fd to the relayd structure. Set the
+ * fd to -1 so we don't call close() on it in the error path below.
+ */
+ fd = -1;
/* We successfully added the socket. Send status back. */
ret = consumer_send_status_msg(sock, ret_code);
/* Ease our life a bit */
ht = consumer_data.stream_list_ht;
- relayd = find_relayd_by_session_id(id);
- if (relayd) {
- /* Send init command for data pending. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_begin_data_pending(&relayd->control_sock,
- relayd->relayd_session_id);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0) {
- /* Communication error thus the relayd so no data pending. */
- ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
- lttng_consumer_cleanup_relayd(relayd);
- goto data_not_pending;
- }
- }
-
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&id, lttng_ht_seed),
ht->match_fct, &id,
}
}
- /* Relayd check */
- if (relayd) {
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+ relayd = find_relayd_by_session_id(id);
+ if (relayd) {
+ unsigned int is_data_inflight = 0;
+
+ /* Send init command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_begin_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id);
+ if (ret < 0) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ /* Communication error thus the relayd so no data pending. */
+ goto data_not_pending;
+ }
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct, &id,
+ &iter.iter, stream, node_session_id.node) {
if (stream->metadata_flag) {
ret = relayd_quiescent_control(&relayd->control_sock,
stream->relayd_stream_id);
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
}
- if (ret < 0) {
+
+ if (ret == 1) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ goto data_pending;
+ } else if (ret < 0) {
ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- pthread_mutex_unlock(&stream->lock);
goto data_not_pending;
}
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret == 1) {
- pthread_mutex_unlock(&stream->lock);
- goto data_pending;
- }
}
- pthread_mutex_unlock(&stream->lock);
- }
- if (relayd) {
- unsigned int is_data_inflight = 0;
-
- /* Send init command for data pending. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ /* Send end command for data pending. */
ret = relayd_end_data_pending(&relayd->control_sock,
relayd->relayd_session_id, &is_data_inflight);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
* is already at the rotate position (produced == consumed), we flag it as
* ready for rotation. The rotation of ready streams occurs after we have
* replied to the session daemon that we have finished sampling the positions.
+ * Must be called with RCU read-side lock held to ensure existence of channel.
*
* Returns 0 on success, < 0 on error
*/
-int lttng_consumer_rotate_channel(uint64_t key, const char *path,
- uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id,
+int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
+ uint64_t key, uint64_t relayd_id, uint32_t metadata,
struct lttng_consumer_local_data *ctx)
{
int ret;
- struct lttng_consumer_channel *channel;
struct lttng_consumer_stream *stream;
struct lttng_ht_iter iter;
struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+ struct lttng_dynamic_array stream_rotation_positions;
+ uint64_t next_chunk_id, stream_count = 0;
+ enum lttng_trace_chunk_status chunk_status;
+ const bool is_local_trace = relayd_id == -1ULL;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ bool rotating_to_new_chunk = true;
DBG("Consumer sample rotate position for channel %" PRIu64, key);
- rcu_read_lock();
+ lttng_dynamic_array_init(&stream_rotation_positions,
+ sizeof(struct relayd_stream_rotation_position), NULL);
- channel = consumer_find_channel(key);
- if (!channel) {
- ERR("No channel found for key %" PRIu64, key);
- ret = -1;
- goto end;
- }
+ rcu_read_lock();
pthread_mutex_lock(&channel->lock);
- channel->current_chunk_id = new_chunk_id;
-
- ret = lttng_strncpy(channel->pathname, path, sizeof(channel->pathname));
- if (ret) {
- ERR("Failed to copy new path to channel during channel rotation");
+ assert(channel->trace_chunk);
+ chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
+ &next_chunk_id);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
goto end_unlock_channel;
}
- if (relayd_id == -1ULL) {
- /*
- * The domain path (/ust or /kernel) has been created before, we
- * now need to create the last part of the path: the application/user
- * specific section (uid/1000/64-bit).
- */
- ret = utils_mkdir_recursive(channel->pathname, S_IRWXU | S_IRWXG,
- channel->uid, channel->gid);
- if (ret < 0) {
- ERR("Failed to create trace directory at %s during rotation",
- channel->pathname);
- ret = -1;
- goto end_unlock_channel;
- }
- }
-
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct, &channel->key, &iter.iter,
*/
pthread_mutex_lock(&stream->lock);
- ret = lttng_strncpy(stream->channel_read_only_attributes.path,
- channel->pathname,
- sizeof(stream->channel_read_only_attributes.path));
- if (ret) {
- ERR("Failed to sample channel path name during channel rotation");
- goto end_unlock_stream;
+ if (stream->trace_chunk == stream->chan->trace_chunk) {
+ rotating_to_new_chunk = false;
}
+
ret = lttng_consumer_sample_snapshot_positions(stream);
if (ret < 0) {
ERR("Failed to sample snapshot position during channel rotation");
if (consumed_pos == stream->rotate_position) {
stream->rotate_ready = true;
}
- channel->nr_stream_rotate_pending++;
+ /*
+ * Active flush; has no effect if the production position
+ * is at a packet boundary.
+ */
ret = consumer_flush_buffer(stream, 1);
if (ret < 0) {
ERR("Failed to flush stream %" PRIu64 " during channel rotation",
goto end_unlock_stream;
}
+ if (!is_local_trace) {
+ /*
+ * The relay daemon control protocol expects a rotation
+ * position as "the sequence number of the first packet
+ * _after_ the current trace chunk.
+ *
+ * At the moment when the positions of the buffers are
+ * sampled, the production position does not necessarily
+ * sit at a packet boundary. The 'active' flush
+ * operation above will push the production position to
+ * the next packet boundary _if_ it is not already
+ * sitting at such a boundary.
+ *
+ * Assuming a current production position that is not
+ * on the bound of a packet, the 'target' sequence
+ * number is
+ * (consumed_pos / subbuffer_size) + 1
+ * Note the '+ 1' to ensure the current packet is
+ * part of the current trace chunk.
+ *
+ * However, if the production position is already at
+ * a packet boundary, the '+ 1' is not necessary as the
+ * last packet of the current chunk is already
+ * 'complete'.
+ */
+ const struct relayd_stream_rotation_position position = {
+ .stream_id = stream->relayd_stream_id,
+ .rotate_at_seq_num = (stream->rotate_position / stream->max_sb_size) +
+ !!(stream->rotate_position % stream->max_sb_size),
+ };
+
+ ret = lttng_dynamic_array_add_element(
+ &stream_rotation_positions,
+ &position);
+ if (ret) {
+ ERR("Failed to allocate stream rotation position");
+ goto end_unlock_stream;
+ }
+ stream_count++;
+ }
pthread_mutex_unlock(&stream->lock);
}
+ stream = NULL;
pthread_mutex_unlock(&channel->lock);
+ if (is_local_trace) {
+ ret = 0;
+ goto end;
+ }
+
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, relayd_id);
+ ret = -1;
+ goto end;
+ }
+
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+ rotating_to_new_chunk ? &next_chunk_id : NULL,
+ (const struct relayd_stream_rotation_position *)
+ stream_rotation_positions.buffer.data);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+ relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ goto end;
+ }
+
ret = 0;
goto end;
pthread_mutex_unlock(&channel->lock);
end:
rcu_read_unlock();
+ lttng_dynamic_array_reset(&stream_rotation_positions);
return ret;
}
/*
* Perform the rotation a local stream file.
*/
+static
int rotate_local_stream(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream)
{
- int ret;
+ int ret = 0;
- DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64 " at path %s",
+ DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
stream->key,
- stream->chan->key,
- stream->channel_read_only_attributes.path);
-
- ret = close(stream->out_fd);
- if (ret < 0) {
- PERROR("Closing trace file (fd %d), stream %" PRIu64,
- stream->out_fd, stream->key);
- assert(0);
- goto error;
- }
-
- ret = utils_create_stream_file(
- stream->channel_read_only_attributes.path,
- stream->name,
- stream->channel_read_only_attributes.tracefile_size,
- stream->tracefile_count_current,
- stream->uid, stream->gid, NULL);
- if (ret < 0) {
- ERR("Rotate create stream file");
- goto error;
- }
- stream->out_fd = ret;
+ stream->chan->key);
stream->tracefile_size_current = 0;
+ stream->tracefile_count_current = 0;
- if (!stream->metadata_flag) {
- struct lttng_index_file *index_file;
-
- lttng_index_file_put(stream->index_file);
-
- index_file = lttng_index_file_create(
- stream->channel_read_only_attributes.path,
- stream->name, stream->uid, stream->gid,
- stream->channel_read_only_attributes.tracefile_size,
- stream->tracefile_count_current,
- CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
- if (!index_file) {
- ERR("Create index file during rotation");
- goto error;
+ if (stream->out_fd >= 0) {
+ ret = close(stream->out_fd);
+ if (ret) {
+ PERROR("Failed to close stream out_fd of channel \"%s\"",
+ stream->chan->name);
}
- stream->index_file = index_file;
- stream->out_fd_offset = 0;
+ stream->out_fd = -1;
}
- ret = 0;
- goto end;
-
-error:
- ret = -1;
-end:
- return ret;
-
-}
-
-/*
- * Perform the rotation a stream file on the relay.
- */
-int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream)
-{
- int ret;
- struct consumer_relayd_sock_pair *relayd;
-
- DBG("Rotate relay stream");
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (!relayd) {
- ERR("Failed to find relayd");
- ret = -1;
- goto end;
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
}
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_rotate_stream(&relayd->control_sock,
- stream->relayd_stream_id,
- stream->channel_read_only_attributes.path,
- stream->chan->current_chunk_id,
- stream->last_sequence_number);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0) {
- ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
- lttng_consumer_cleanup_relayd(relayd);
- }
- if (ret) {
- ERR("Rotate relay stream");
+ if (!stream->trace_chunk) {
+ goto end;
}
+ ret = consumer_stream_create_output_files(stream, true);
end:
return ret;
}
/*
* Performs the stream rotation for the rotate session feature if needed.
- * It must be called with the stream lock held.
+ * It must be called with the channel and stream locks held.
*
* Return 0 on success, a negative number of error.
*/
int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, bool *rotated)
+ struct lttng_consumer_stream *stream)
{
int ret;
DBG("Consumer rotate stream %" PRIu64, stream->key);
- if (stream->net_seq_idx != (uint64_t) -1ULL) {
- ret = rotate_relay_stream(ctx, stream);
- } else {
- ret = rotate_local_stream(ctx, stream);
- }
- if (ret < 0) {
- ERR("Rotate stream");
+ /*
+ * Update the stream's 'current' chunk to the session's (channel)
+ * now-current chunk.
+ */
+ lttng_trace_chunk_put(stream->trace_chunk);
+ if (stream->chan->trace_chunk == stream->trace_chunk) {
+ /*
+ * A channel can be rotated and not have a "next" chunk
+ * to transition to. In that case, the channel's "current chunk"
+ * has not been closed yet, but it has not been updated to
+ * a "next" trace chunk either. Hence, the stream, like its
+ * parent channel, becomes part of no chunk and can't output
+ * anything until a new trace chunk is created.
+ */
+ stream->trace_chunk = NULL;
+ } else if (stream->chan->trace_chunk &&
+ !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
+ ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
+ ret = -1;
goto error;
+ } else {
+ /*
+ * Update the stream's trace chunk to its parent channel's
+ * current trace chunk.
+ */
+ stream->trace_chunk = stream->chan->trace_chunk;
}
- if (stream->metadata_flag) {
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- /*
- * Reset the position of what has been read from the metadata
- * cache to 0 so we can dump it again.
- */
- ret = kernctl_metadata_cache_dump(stream->wait_fd);
- if (ret < 0) {
- ERR("Failed to dump the kernel metadata cache after rotation");
- goto error;
- }
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- /*
- * Reset the position pushed from the metadata cache so it
- * will write from the beginning on the next push.
- */
- stream->ust_metadata_pushed = 0;
- break;
- default:
- ERR("Unknown consumer_data type");
- abort();
+ if (stream->net_seq_idx == (uint64_t) -1ULL) {
+ ret = rotate_local_stream(ctx, stream);
+ if (ret < 0) {
+ ERR("Failed to rotate stream, ret = %i", ret);
+ goto error;
}
}
- lttng_consumer_reset_stream_rotate_state(stream);
- if (rotated) {
- *rotated = true;
+ if (stream->metadata_flag && stream->trace_chunk) {
+ /*
+ * If the stream has transitioned to a new trace
+ * chunk, the metadata should be re-dumped to the
+ * newest chunk.
+ *
+ * However, it is possible for a stream to transition to
+ * a "no-chunk" state. This can happen if a rotation
+ * occurs on an inactive session. In such cases, the metadata
+ * regeneration will happen when the next trace chunk is
+ * created.
+ */
+ ret = consumer_metadata_stream_dump(stream);
+ if (ret) {
+ goto error;
+ }
}
+ lttng_consumer_reset_stream_rotate_state(stream);
ret = 0;
* This is especially important for low throughput streams that have already
* been consumed, we cannot wait for their next packet to perform the
* rotation.
+ * Need to be called with RCU read-side lock held to ensure existence of
+ * channel.
*
* Returns 0 on success, < 0 on error
*/
-int lttng_consumer_rotate_ready_streams(uint64_t key,
- struct lttng_consumer_local_data *ctx)
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
+ uint64_t key, struct lttng_consumer_local_data *ctx)
{
int ret;
- struct lttng_consumer_channel *channel;
struct lttng_consumer_stream *stream;
struct lttng_ht_iter iter;
struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
DBG("Consumer rotate ready streams in channel %" PRIu64, key);
- channel = consumer_find_channel(key);
- if (!channel) {
- ERR("No channel found for key %" PRIu64, key);
- ret = -1;
- goto end;
- }
-
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct, &channel->key, &iter.iter,
stream, node_channel_id.node) {
health_code_update();
+ pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
if (!stream->rotate_ready) {
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
continue;
}
DBG("Consumer rotate ready stream %" PRIu64, stream->key);
- ret = lttng_consumer_rotate_stream(ctx, stream, NULL);
+ ret = lttng_consumer_rotate_stream(ctx, stream);
pthread_mutex_unlock(&stream->lock);
- if (ret) {
- goto end;
- }
-
- ret = consumer_post_rotation(stream, ctx);
+ pthread_mutex_unlock(&stream->chan->lock);
if (ret) {
goto end;
}
return ret;
}
-static
-int rotate_rename_local(const char *old_path, const char *new_path,
- uid_t uid, gid_t gid)
+enum lttcomm_return_code lttng_consumer_init_command(
+ struct lttng_consumer_local_data *ctx,
+ const lttng_uuid sessiond_uuid)
{
- int ret;
-
- assert(old_path);
- assert(new_path);
+ enum lttcomm_return_code ret;
+ char uuid_str[UUID_STR_LEN];
- ret = utils_mkdir_recursive(new_path, S_IRWXU | S_IRWXG, uid, gid);
- if (ret < 0) {
- ERR("Create directory on rotate");
- goto end;
- }
-
- ret = rename(old_path, new_path);
- if (ret < 0 && errno != ENOENT) {
- PERROR("Rename completed rotation chunk");
+ if (ctx->sessiond_uuid.is_set) {
+ ret = LTTCOMM_CONSUMERD_ALREADY_SET;
goto end;
}
- ret = 0;
+ ctx->sessiond_uuid.is_set = true;
+ memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid));
+ ret = LTTCOMM_CONSUMERD_SUCCESS;
+ lttng_uuid_to_str(sessiond_uuid, uuid_str);
+ DBG("Received session daemon UUID: %s", uuid_str);
end:
return ret;
}
-static
-int rotate_rename_relay(const char *old_path, const char *new_path,
- uint64_t relayd_id)
+enum lttcomm_return_code lttng_consumer_create_trace_chunk(
+ const uint64_t *relayd_id, uint64_t session_id,
+ uint64_t chunk_id,
+ time_t chunk_creation_timestamp,
+ const char *chunk_override_name,
+ const struct lttng_credentials *credentials,
+ struct lttng_directory_handle *chunk_directory_handle)
{
int ret;
- struct consumer_relayd_sock_pair *relayd;
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ struct lttng_trace_chunk *created_chunk, *published_chunk;
+ enum lttng_trace_chunk_status chunk_status;
+ char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
+ char creation_timestamp_buffer[ISO8601_STR_LEN];
+ const char *relayd_id_str = "(none)";
+ const char *creation_timestamp_str;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_channel *channel;
- relayd = consumer_find_relayd(relayd_id);
- if (!relayd) {
- ERR("Failed to find relayd while running rotate_rename_relay command");
- ret = -1;
+ if (relayd_id) {
+ /* Only used for logging purposes. */
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+ "%" PRIu64, *relayd_id);
+ if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
+ relayd_id_str = relayd_id_buffer;
+ } else {
+ relayd_id_str = "(formatting error)";
+ }
+ }
+
+ /* Local protocol error. */
+ assert(chunk_creation_timestamp);
+ ret = time_to_iso8601_str(chunk_creation_timestamp,
+ creation_timestamp_buffer,
+ sizeof(creation_timestamp_buffer));
+ creation_timestamp_str = !ret ? creation_timestamp_buffer :
+ "(formatting error)";
+
+ DBG("Consumer create trace chunk command: relay_id = %s"
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+ ", chunk_override_name = %s"
+ ", chunk_creation_timestamp = %s",
+ relayd_id_str, session_id, chunk_id,
+ chunk_override_name ? : "(none)",
+ creation_timestamp_str);
+
+ /*
+ * The trace chunk registry, as used by the consumer daemon, implicitly
+ * owns the trace chunks. This is only needed in the consumer since
+ * the consumer has no notion of a session beyond session IDs being
+ * used to identify other objects.
+ *
+ * The lttng_trace_chunk_registry_publish() call below provides a
+ * reference which is not released; it implicitly becomes the session
+ * daemon's reference to the chunk in the consumer daemon.
+ *
+ * The lifetime of trace chunks in the consumer daemon is managed by
+ * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
+ * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
+ */
+ created_chunk = lttng_trace_chunk_create(chunk_id,
+ chunk_creation_timestamp);
+ if (!created_chunk) {
+ ERR("Failed to create trace chunk");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
goto end;
}
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path);
- if (ret < 0) {
- ERR("Relayd rotate rename failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
- lttng_consumer_cleanup_relayd(relayd);
+ if (chunk_override_name) {
+ chunk_status = lttng_trace_chunk_override_name(created_chunk,
+ chunk_override_name);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
}
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-end:
- return ret;
-}
-int lttng_consumer_rotate_rename(const char *old_path, const char *new_path,
- uid_t uid, gid_t gid, uint64_t relayd_id)
-{
- if (relayd_id != -1ULL) {
- return rotate_rename_relay(old_path, new_path, relayd_id);
- } else {
- return rotate_rename_local(old_path, new_path, uid, gid);
+ if (chunk_directory_handle) {
+ chunk_status = lttng_trace_chunk_set_credentials(created_chunk,
+ credentials);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to set trace chunk credentials");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
+ /*
+ * The consumer daemon has no ownership of the chunk output
+ * directory.
+ */
+ chunk_status = lttng_trace_chunk_set_as_user(created_chunk,
+ chunk_directory_handle);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to set trace chunk's directory handle");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
}
-}
-
-int lttng_consumer_rotate_pending_relay(uint64_t session_id,
- uint64_t relayd_id, uint64_t chunk_id)
-{
- int ret;
- struct consumer_relayd_sock_pair *relayd;
- relayd = consumer_find_relayd(relayd_id);
- if (!relayd) {
- ERR("Failed to find relayd");
- ret = -1;
+ published_chunk = lttng_trace_chunk_registry_publish_chunk(
+ consumer_data.chunk_registry, session_id,
+ created_chunk);
+ lttng_trace_chunk_put(created_chunk);
+ created_chunk = NULL;
+ if (!published_chunk) {
+ ERR("Failed to publish trace chunk");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
goto end;
}
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_rotate_pending(&relayd->control_sock, chunk_id);
- if (ret < 0) {
- ERR("Relayd rotate pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
- lttng_consumer_cleanup_relayd(relayd);
+ rcu_read_lock();
+ cds_lfht_for_each_entry_duplicate(consumer_data.channels_by_session_id_ht->ht,
+ consumer_data.channels_by_session_id_ht->hash_fct(
+ &session_id, lttng_ht_seed),
+ consumer_data.channels_by_session_id_ht->match_fct,
+ &session_id, &iter.iter, channel,
+ channels_by_session_id_ht_node.node) {
+ ret = lttng_consumer_channel_set_trace_chunk(channel,
+ published_chunk);
+ if (ret) {
+ /*
+ * Roll-back the creation of this chunk.
+ *
+ * This is important since the session daemon will
+ * assume that the creation of this chunk failed and
+ * will never ask for it to be closed, resulting
+ * in a leak and an inconsistent state for some
+ * channels.
+ */
+ enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
+
+ DBG("Failed to set new trace chunk on existing channels, rolling back");
+ close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+ session_id, chunk_id,
+ chunk_creation_timestamp, NULL,
+ path);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ session_id, chunk_id);
+ }
+
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ break;
+ }
}
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (relayd_id) {
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(*relayd_id);
+ if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_create_trace_chunk(
+ &relayd->control_sock, published_chunk);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ } else {
+ ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
+ }
+
+ if (!relayd || ret) {
+ enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
+
+ close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+ session_id,
+ chunk_id,
+ chunk_creation_timestamp,
+ NULL, path);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
+ }
+
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+ }
+error:
+ rcu_read_unlock();
+ /* Release the reference returned by the "publish" operation. */
+ lttng_trace_chunk_put(published_chunk);
end:
- return ret;
+ return ret_code;
}
-static
-int mkdir_local(const char *path, uid_t uid, gid_t gid)
+enum lttcomm_return_code lttng_consumer_close_trace_chunk(
+ const uint64_t *relayd_id, uint64_t session_id,
+ uint64_t chunk_id, time_t chunk_close_timestamp,
+ const enum lttng_trace_chunk_command_type *close_command,
+ char *path)
{
- int ret;
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ struct lttng_trace_chunk *chunk;
+ char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
+ const char *relayd_id_str = "(none)";
+ const char *close_command_name = "none";
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_channel *channel;
+ enum lttng_trace_chunk_status chunk_status;
- ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, uid, gid);
- if (ret < 0) {
- /* utils_mkdir_recursive logs an error. */
+ if (relayd_id) {
+ int ret;
+
+ /* Only used for logging purposes. */
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+ "%" PRIu64, *relayd_id);
+ if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
+ relayd_id_str = relayd_id_buffer;
+ } else {
+ relayd_id_str = "(formatting error)";
+ }
+ }
+ if (close_command) {
+ close_command_name = lttng_trace_chunk_command_type_get_name(
+ *close_command);
+ }
+
+ DBG("Consumer close trace chunk command: relayd_id = %s"
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+ ", close command = %s",
+ relayd_id_str, session_id, chunk_id,
+ close_command_name);
+
+ chunk = lttng_trace_chunk_registry_find_chunk(
+ consumer_data.chunk_registry, session_id, chunk_id);
+ if (!chunk) {
+ ERR("Failed to find chunk: session_id = %" PRIu64
+ ", chunk_id = %" PRIu64,
+ session_id, chunk_id);
+ ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
goto end;
}
- ret = 0;
+ chunk_status = lttng_trace_chunk_set_close_timestamp(chunk,
+ chunk_close_timestamp);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
+
+ if (close_command) {
+ chunk_status = lttng_trace_chunk_set_close_command(
+ chunk, *close_command);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
+ }
+
+ /*
+ * chunk is now invalid to access as we no longer hold a reference to
+ * it; it is only kept around to compare it (by address) to the
+ * current chunk found in the session's channels.
+ */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter,
+ channel, node.node) {
+ int ret;
+
+ /*
+ * Only change the channel's chunk to NULL if it still
+ * references the chunk being closed. The channel may
+ * reference a newer channel in the case of a session
+ * rotation. When a session rotation occurs, the "next"
+ * chunk is created before the "current" chunk is closed.
+ */
+ if (channel->trace_chunk != chunk) {
+ continue;
+ }
+ ret = lttng_consumer_channel_set_trace_chunk(channel, NULL);
+ if (ret) {
+ /*
+ * Attempt to close the chunk on as many channels as
+ * possible.
+ */
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ }
+ }
+
+ if (relayd_id) {
+ int ret;
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(*relayd_id);
+ if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_close_trace_chunk(
+ &relayd->control_sock, chunk,
+ path);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ } else {
+ ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
+ *relayd_id);
+ }
+
+ if (!relayd || ret) {
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ goto error_unlock;
+ }
+ }
+error_unlock:
+ rcu_read_unlock();
end:
- return ret;
+ /*
+ * Release the reference returned by the "find" operation and
+ * the session daemon's implicit reference to the chunk.
+ */
+ lttng_trace_chunk_put(chunk);
+ lttng_trace_chunk_put(chunk);
+
+ return ret_code;
}
-static
-int mkdir_relay(const char *path, uint64_t relayd_id)
+enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
+ const uint64_t *relayd_id, uint64_t session_id,
+ uint64_t chunk_id)
{
int ret;
- struct consumer_relayd_sock_pair *relayd;
+ enum lttcomm_return_code ret_code;
+ struct lttng_trace_chunk *chunk;
+ char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
+ const char *relayd_id_str = "(none)";
+ const bool is_local_trace = !relayd_id;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ bool chunk_exists_remote;
- relayd = consumer_find_relayd(relayd_id);
- if (!relayd) {
- ERR("Failed to find relayd");
- ret = -1;
+ if (relayd_id) {
+ int ret;
+
+ /* Only used for logging purposes. */
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+ "%" PRIu64, *relayd_id);
+ if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
+ relayd_id_str = relayd_id_buffer;
+ } else {
+ relayd_id_str = "(formatting error)";
+ }
+ }
+
+ DBG("Consumer trace chunk exists command: relayd_id = %s"
+ ", chunk_id = %" PRIu64, relayd_id_str,
+ chunk_id);
+ chunk = lttng_trace_chunk_registry_find_chunk(
+ consumer_data.chunk_registry, session_id,
+ chunk_id);
+ DBG("Trace chunk %s locally", chunk ? "exists" : "does not exist");
+ if (chunk) {
+ ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
+ lttng_trace_chunk_put(chunk);
+ goto end;
+ } else if (is_local_trace) {
+ ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
goto end;
}
+ rcu_read_lock();
+ relayd = consumer_find_relayd(*relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, *relayd_id);
+ ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+ goto end_rcu_unlock;
+ }
+ DBG("Looking up existence of trace chunk on relay daemon");
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_mkdir(&relayd->control_sock, path);
+ ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id,
+ &chunk_exists_remote);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- ERR("Relayd mkdir failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
- lttng_consumer_cleanup_relayd(relayd);
+ ERR("Failed to look-up the existence of trace chunk on relay daemon");
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+ goto end_rcu_unlock;
}
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-end:
- return ret;
-
-}
+ ret_code = chunk_exists_remote ?
+ LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
+ LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+ DBG("Trace chunk %s on relay daemon",
+ chunk_exists_remote ? "exists" : "does not exist");
-int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid,
- uint64_t relayd_id)
-{
- if (relayd_id != -1ULL) {
- return mkdir_relay(path, relayd_id);
- } else {
- return mkdir_local(path, uid, gid);
- }
+end_rcu_unlock:
+ rcu_read_unlock();
+end:
+ return ret_code;
}