X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=a179eeffd1f72f5cde0d1fcb013e70ae2ac4b7e2;hp=d79dee3cafd99c1232f2458a985561fb0e7a1875;hb=9bbfb88c5a3b6b581c81d2c9a9db14246f84675a;hpb=9363801e2d2069022a05e67066d8f527538946d0 diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index d79dee3ca..a179eeffd 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -1,5 +1,6 @@ /* * Copyright (C) 2012 - David Goulet + * 2018 - Jérémie Galarneau * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -15,7 +16,7 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -28,9 +29,10 @@ #include #include #include +#include #include "consumer.h" -#include "health.h" +#include "health-sessiond.h" #include "ust-app.h" #include "utils.h" @@ -62,8 +64,8 @@ int consumer_socket_send(struct consumer_socket *socket, void *msg, size_t len) /* The above call will print a PERROR on error. */ DBG("Error when sending data to consumer on sock %d", fd); /* - * At this point, the socket is not usable anymore thus flagging it - * invalid and closing it. + * At this point, the socket is not usable anymore thus closing it and + * setting the file descriptor to -1 so it is not reused. */ /* This call will PERROR on error. */ @@ -106,8 +108,8 @@ int consumer_socket_recv(struct consumer_socket *socket, void *msg, size_t len) /* The above call will print a PERROR on error. */ DBG("Error when receiving data from the consumer socket %d", fd); /* - * At this point, the socket is not usable anymore thus flagging it - * invalid and closing it. + * At this point, the socket is not usable anymore thus closing it and + * setting the file descriptor to -1 so it is not reused. */ /* This call will PERROR on error. */ @@ -141,7 +143,7 @@ int consumer_recv_status_reply(struct consumer_socket *sock) goto end; } - if (reply.ret_code == LTTNG_OK) { + if (reply.ret_code == LTTCOMM_CONSUMERD_SUCCESS) { /* All good. */ ret = 0; } else { @@ -177,13 +179,14 @@ int consumer_recv_status_channel(struct consumer_socket *sock, } /* An error is possible so don't touch the key and stream_count. */ - if (reply.ret_code != LTTNG_OK) { + if (reply.ret_code != LTTCOMM_CONSUMERD_SUCCESS) { ret = -1; goto end; } *key = reply.key; *stream_count = reply.stream_count; + ret = 0; end: return ret; @@ -205,6 +208,7 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock, DBG2("Sending destroy relayd command to consumer sock %d", *sock->fd_ptr); + memset(&msg, 0, sizeof(msg)); msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD; msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index; @@ -469,6 +473,7 @@ struct consumer_output *consumer_create_output(enum consumer_dst_type type) output->enabled = 1; output->type = type; output->net_seq_index = (uint64_t) -1ULL; + urcu_ref_init(&output->ref); output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); @@ -503,11 +508,10 @@ void consumer_destroy_output_sockets(struct consumer_output *obj) * * Should *NOT* be called with RCU read-side lock held. */ -void consumer_destroy_output(struct consumer_output *obj) +static void consumer_release_output(struct urcu_ref *ref) { - if (obj == NULL) { - return; - } + struct consumer_output *obj = + caa_container_of(ref, struct consumer_output, ref); consumer_destroy_output_sockets(obj); @@ -519,6 +523,27 @@ void consumer_destroy_output(struct consumer_output *obj) free(obj); } +/* + * Get the consumer_output object. + */ +void consumer_output_get(struct consumer_output *obj) +{ + urcu_ref_get(&obj->ref); +} + +/* + * Put the consumer_output object. + * + * Should *NOT* be called with RCU read-side lock held. + */ +void consumer_output_put(struct consumer_output *obj) +{ + if (!obj) { + return; + } + urcu_ref_put(&obj->ref, consumer_release_output); +} + /* * Copy consumer output and returned the newly allocated copy. * @@ -527,33 +552,30 @@ void consumer_destroy_output(struct consumer_output *obj) struct consumer_output *consumer_copy_output(struct consumer_output *obj) { int ret; - struct lttng_ht *tmp_ht_ptr; struct consumer_output *output; assert(obj); output = consumer_create_output(obj->type); if (output == NULL) { - goto error; + goto end; } - /* Avoid losing the HT reference after the memcpy() */ - tmp_ht_ptr = output->socks; - - memcpy(output, obj, sizeof(struct consumer_output)); - - /* Putting back the HT pointer and start copying socket(s). */ - output->socks = tmp_ht_ptr; - + output->enabled = obj->enabled; + output->net_seq_index = obj->net_seq_index; + memcpy(output->subdir, obj->subdir, sizeof(output->subdir)); + output->snapshot = obj->snapshot; + output->relay_major_version = obj->relay_major_version; + output->relay_minor_version = obj->relay_minor_version; + memcpy(&output->dst, &obj->dst, sizeof(output->dst)); ret = consumer_copy_sockets(output, obj); if (ret < 0) { - goto malloc_error; + goto error_put; } - -error: +end: return output; -malloc_error: - consumer_destroy_output(output); +error_put: + consumer_output_put(output); return NULL; } @@ -694,8 +716,12 @@ int consumer_set_network_uri(struct consumer_output *obj, goto error; } - strncpy(obj->subdir, tmp_path, sizeof(obj->subdir)); - DBG3("Consumer set network uri subdir path %s", tmp_path); + if (lttng_strncpy(obj->dst.net.base_dir, tmp_path, + sizeof(obj->dst.net.base_dir))) { + ret = -LTTNG_ERR_INVALID; + goto error; + } + DBG3("Consumer set network uri base_dir path %s", tmp_path); } return 0; @@ -707,14 +733,18 @@ error: /* * Send file descriptor to consumer via sock. + * + * The consumer socket lock must be held by the caller. */ -int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd) +int consumer_send_fds(struct consumer_socket *sock, const int *fds, + size_t nb_fd) { int ret; assert(fds); assert(sock); assert(nb_fd > 0); + assert(pthread_mutex_trylock(sock->lock) == EBUSY); ret = lttcomm_send_fds_unix_sock(*sock->fd_ptr, fds, nb_fd); if (ret < 0) { @@ -724,13 +754,14 @@ int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd) } ret = consumer_recv_status_reply(sock); - error: return ret; } /* * Consumer send communication message structure to consumer. + * + * The consumer socket lock must be held by the caller. */ int consumer_send_msg(struct consumer_socket *sock, struct lttcomm_consumer_msg *msg) @@ -739,6 +770,7 @@ int consumer_send_msg(struct consumer_socket *sock, assert(msg); assert(sock); + assert(pthread_mutex_trylock(sock->lock) == EBUSY); ret = consumer_socket_send(sock, msg, sizeof(struct lttcomm_consumer_msg)); if (ret < 0) { @@ -753,6 +785,8 @@ error: /* * Consumer send channel communication message structure to consumer. + * + * The consumer socket lock must be held by the caller. */ int consumer_send_channel(struct consumer_socket *sock, struct lttcomm_consumer_msg *msg) @@ -781,6 +815,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, int overwrite, unsigned int switch_timer_interval, unsigned int read_timer_interval, + unsigned int live_timer_interval, + unsigned int monitor_timer_interval, int output, int type, uint64_t session_id, @@ -796,7 +832,11 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t tracefile_count, uint64_t session_id_per_pid, unsigned int monitor, - uint32_t ust_app_uid) + uint32_t ust_app_uid, + int64_t blocking_timeout, + const char *root_shm_path, + const char *shm_path, + uint64_t trace_archive_id) { assert(msg); @@ -809,6 +849,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.overwrite = overwrite; msg->u.ask_channel.switch_timer_interval = switch_timer_interval; msg->u.ask_channel.read_timer_interval = read_timer_interval; + msg->u.ask_channel.live_timer_interval = live_timer_interval; + msg->u.ask_channel.monitor_timer_interval = monitor_timer_interval; msg->u.ask_channel.output = output; msg->u.ask_channel.type = type; msg->u.ask_channel.session_id = session_id; @@ -822,6 +864,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.tracefile_count = tracefile_count; msg->u.ask_channel.monitor = monitor; msg->u.ask_channel.ust_app_uid = ust_app_uid; + msg->u.ask_channel.blocking_timeout = blocking_timeout; + msg->u.ask_channel.trace_archive_id = trace_archive_id; memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid)); @@ -833,13 +877,23 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, strncpy(msg->u.ask_channel.name, name, sizeof(msg->u.ask_channel.name)); msg->u.ask_channel.name[sizeof(msg->u.ask_channel.name) - 1] = '\0'; + + if (root_shm_path) { + strncpy(msg->u.ask_channel.root_shm_path, root_shm_path, + sizeof(msg->u.ask_channel.root_shm_path)); + msg->u.ask_channel.root_shm_path[sizeof(msg->u.ask_channel.root_shm_path) - 1] = '\0'; + } + if (shm_path) { + strncpy(msg->u.ask_channel.shm_path, shm_path, + sizeof(msg->u.ask_channel.shm_path)); + msg->u.ask_channel.shm_path[sizeof(msg->u.ask_channel.shm_path) - 1] = '\0'; + } } /* * Init channel communication message structure. */ -void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, - enum lttng_consumer_command cmd, +void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t channel_key, uint64_t session_id, const char *pathname, @@ -852,7 +906,9 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, int type, uint64_t tracefile_size, uint64_t tracefile_count, - unsigned int monitor) + unsigned int monitor, + unsigned int live_timer_interval, + unsigned int monitor_timer_interval) { assert(msg); @@ -860,7 +916,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); /* Send channel */ - msg->cmd_type = cmd; + msg->cmd_type = LTTNG_CONSUMER_ADD_CHANNEL; msg->u.channel.channel_key = channel_key; msg->u.channel.session_id = session_id; msg->u.channel.uid = uid; @@ -872,6 +928,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.channel.tracefile_size = tracefile_size; msg->u.channel.tracefile_count = tracefile_count; msg->u.channel.monitor = monitor; + msg->u.channel.live_timer_interval = live_timer_interval; + msg->u.channel.monitor_timer_interval = monitor_timer_interval; strncpy(msg->u.channel.pathname, pathname, sizeof(msg->u.channel.pathname)); @@ -884,20 +942,34 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, /* * Init stream communication message structure. */ -void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, - enum lttng_consumer_command cmd, +void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t channel_key, uint64_t stream_key, - int cpu) + int32_t cpu, + uint64_t trace_archive_id) { assert(msg); memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); - msg->cmd_type = cmd; + msg->cmd_type = LTTNG_CONSUMER_ADD_STREAM; msg->u.stream.channel_key = channel_key; msg->u.stream.stream_key = stream_key; msg->u.stream.cpu = cpu; + msg->u.stream.trace_archive_id = trace_archive_id; +} + +void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, + enum lttng_consumer_command cmd, + uint64_t channel_key, uint64_t net_seq_idx) +{ + assert(msg); + + memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); + + msg->cmd_type = cmd; + msg->u.sent_streams.channel_key = channel_key; + msg->u.sent_streams.net_seq_idx = net_seq_idx; } /* @@ -905,7 +977,7 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, */ int consumer_send_stream(struct consumer_socket *sock, struct consumer_output *dst, struct lttcomm_consumer_msg *msg, - int *fds, size_t nb_fd) + const int *fds, size_t nb_fd) { int ret; @@ -931,11 +1003,14 @@ error: /* * Send relayd socket to consumer associated with a session name. * + * The consumer socket lock must be held by the caller. + * * On success return positive value. On error, negative value. */ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer, - enum lttng_stream_type type, uint64_t session_id) + enum lttng_stream_type type, uint64_t session_id, + char *session_name, char *hostname, int session_live_timer) { int ret; struct lttcomm_consumer_msg msg; @@ -945,12 +1020,25 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, assert(consumer); assert(consumer_sock); + memset(&msg, 0, sizeof(msg)); /* Bail out if consumer is disabled */ if (!consumer->enabled) { ret = LTTNG_OK; goto error; } + if (type == LTTNG_STREAM_CONTROL) { + ret = relayd_create_session(rsock, + &msg.u.relayd_sock.relayd_session_id, + session_name, hostname, session_live_timer, + consumer->snapshot); + if (ret < 0) { + /* Close the control socket. */ + (void) relayd_close(rsock); + goto error; + } + } + msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; /* * Assign network consumer output index using the temporary consumer since @@ -980,6 +1068,59 @@ error: return ret; } +static +int consumer_send_pipe(struct consumer_socket *consumer_sock, + enum lttng_consumer_command cmd, int pipe) +{ + int ret; + struct lttcomm_consumer_msg msg; + const char *pipe_name; + const char *command_name; + + switch (cmd) { + case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE: + pipe_name = "channel monitor"; + command_name = "SET_CHANNEL_MONITOR_PIPE"; + break; + default: + ERR("Unexpected command received in %s (cmd = %d)", __func__, + (int) cmd); + abort(); + } + + /* Code flow error. Safety net. */ + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = cmd; + + pthread_mutex_lock(consumer_sock->lock); + DBG3("Sending %s command to consumer", command_name); + ret = consumer_send_msg(consumer_sock, &msg); + if (ret < 0) { + goto error; + } + + DBG3("Sending %s pipe %d to consumer on socket %d", + pipe_name, + pipe, *consumer_sock->fd_ptr); + ret = consumer_send_fds(consumer_sock, &pipe, 1); + if (ret < 0) { + goto error; + } + + DBG2("%s pipe successfully sent", pipe_name); +error: + pthread_mutex_unlock(consumer_sock->lock); + return ret; +} + +int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, + int pipe) +{ + return consumer_send_pipe(consumer_sock, + LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, pipe); +} + /* * Set consumer subdirectory using the session name and a generated datetime if * needed. This is appended to the current subdirectory. @@ -1021,7 +1162,11 @@ int consumer_set_subdir(struct consumer_output *consumer, goto error; } - strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir)); + if (lttng_strncpy(consumer->subdir, tmp_path, + sizeof(consumer->subdir))) { + ret = -EINVAL; + goto error; + } DBG2("Consumer subdir set to %s", consumer->subdir); error: @@ -1029,11 +1174,8 @@ error: } /* - * Ask the consumer if the data is ready to read (NOT pending) for the specific - * session id. - * - * This function has a different behavior with the consumer i.e. that it waits - * for a reply from the consumer if yes or no the data is pending. + * Ask the consumer if the data is pending for the specific session id. + * Returns 1 if data is pending, 0 otherwise, or < 0 on error. */ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consumer) @@ -1046,12 +1188,12 @@ int consumer_is_data_pending(uint64_t session_id, assert(consumer); - msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING; + DBG3("Consumer data pending for id %" PRIu64, session_id); + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING; msg.u.data_pending.session_id = session_id; - DBG3("Consumer data pending for id %" PRIu64, session_id); - /* Send command for each consumer */ rcu_read_lock(); cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, @@ -1104,6 +1246,7 @@ int consumer_flush_channel(struct consumer_socket *socket, uint64_t key) DBG2("Consumer flush channel key %" PRIu64, key); + memset(&msg, 0, sizeof(msg)); msg.cmd_type = LTTNG_CONSUMER_FLUSH_CHANNEL; msg.u.flush_channel.key = key; @@ -1122,7 +1265,40 @@ end: } /* - * Send a close metdata command to consumer using the given channel key. + * Send a clear quiescent command to consumer using the given channel key. + * + * Return 0 on success else a negative value. + */ +int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + + DBG2("Consumer clear quiescent channel key %" PRIu64, key); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL; + msg.u.clear_quiescent_channel.key = key; + + pthread_mutex_lock(socket->lock); + health_code_update(); + + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto end; + } + +end: + health_code_update(); + pthread_mutex_unlock(socket->lock); + return ret; +} + +/* + * Send a close metadata command to consumer using the given channel key. + * Called with registry lock held. * * Return 0 on success else a negative value. */ @@ -1136,6 +1312,7 @@ int consumer_close_metadata(struct consumer_socket *socket, DBG2("Consumer close metadata channel key %" PRIu64, metadata_key); + memset(&msg, 0, sizeof(msg)); msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA; msg.u.close_metadata.key = metadata_key; @@ -1168,6 +1345,7 @@ int consumer_setup_metadata(struct consumer_socket *socket, DBG2("Consumer setup metadata channel key %" PRIu64, metadata_key); + memset(&msg, 0, sizeof(msg)); msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA; msg.u.setup_metadata.key = metadata_key; @@ -1186,13 +1364,14 @@ end: } /* - * Send metadata string to consumer. Socket lock MUST be acquired. + * Send metadata string to consumer. + * RCU read-side lock must be held to guarantee existence of socket. * * Return 0 on success else a negative value. */ int consumer_push_metadata(struct consumer_socket *socket, uint64_t metadata_key, char *metadata_str, size_t len, - size_t target_offset) + size_t target_offset, uint64_t version) { int ret; struct lttcomm_consumer_msg msg; @@ -1201,10 +1380,14 @@ int consumer_push_metadata(struct consumer_socket *socket, DBG2("Consumer push metadata to consumer socket %d", *socket->fd_ptr); + pthread_mutex_lock(socket->lock); + + memset(&msg, 0, sizeof(msg)); msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA; msg.u.push_metadata.key = metadata_key; msg.u.push_metadata.target_offset = target_offset; msg.u.push_metadata.len = len; + msg.u.push_metadata.version = version; health_code_update(); ret = consumer_send_msg(socket, &msg); @@ -1227,6 +1410,7 @@ int consumer_push_metadata(struct consumer_socket *socket, } end: + pthread_mutex_unlock(socket->lock); health_code_update(); return ret; } @@ -1238,7 +1422,8 @@ end: */ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, struct snapshot_output *output, int metadata, uid_t uid, gid_t gid, - const char *session_path, int wait, int max_stream_size) + const char *session_path, int wait, uint64_t nb_packets_per_stream, + uint64_t trace_archive_id) { int ret; struct lttcomm_consumer_msg msg; @@ -1252,47 +1437,459 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, memset(&msg, 0, sizeof(msg)); msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL; msg.u.snapshot_channel.key = key; - msg.u.snapshot_channel.max_stream_size = max_stream_size; + msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream; msg.u.snapshot_channel.metadata = metadata; + msg.u.snapshot_channel.trace_archive_id = trace_archive_id; if (output->consumer->type == CONSUMER_DST_NET) { msg.u.snapshot_channel.relayd_id = output->consumer->net_seq_index; msg.u.snapshot_channel.use_relayd = 1; ret = snprintf(msg.u.snapshot_channel.pathname, sizeof(msg.u.snapshot_channel.pathname), - "%s/%s-%s-%" PRIu64 "%s", output->consumer->subdir, - output->name, output->datetime, output->nb_snapshot, + "%s/%s/%s-%s-%" PRIu64 "%s", + output->consumer->dst.net.base_dir, + output->consumer->subdir, + output->name, output->datetime, + output->nb_snapshot, session_path); if (ret < 0) { ret = -LTTNG_ERR_NOMEM; goto error; + } else if (ret >= sizeof(msg.u.snapshot_channel.pathname)) { + ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%i bytes required) with path \"%s/%s/%s-%s-%" PRIu64 "%s\"", + sizeof(msg.u.snapshot_channel.pathname), + ret, output->consumer->dst.net.base_dir, + output->consumer->subdir, + output->name, output->datetime, + output->nb_snapshot, + session_path); + ret = -LTTNG_ERR_SNAPSHOT_FAIL; + goto error; } } else { ret = snprintf(msg.u.snapshot_channel.pathname, sizeof(msg.u.snapshot_channel.pathname), - "%s/%s-%s-%" PRIu64 "%s", output->consumer->dst.trace_path, - output->name, output->datetime, output->nb_snapshot, + "%s/%s-%s-%" PRIu64 "%s", + output->consumer->dst.session_root_path, + output->name, output->datetime, + output->nb_snapshot, session_path); if (ret < 0) { ret = -LTTNG_ERR_NOMEM; goto error; + } else if (ret >= sizeof(msg.u.snapshot_channel.pathname)) { + ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%i bytes required) with path \"%s/%s-%s-%" PRIu64 "%s\"", + sizeof(msg.u.snapshot_channel.pathname), + ret, output->consumer->dst.session_root_path, + output->name, output->datetime, output->nb_snapshot, + session_path); + ret = -LTTNG_ERR_SNAPSHOT_FAIL; + goto error; } + msg.u.snapshot_channel.relayd_id = (uint64_t) -1ULL; /* Create directory. Ignore if exist. */ ret = run_as_mkdir_recursive(msg.u.snapshot_channel.pathname, S_IRWXU | S_IRWXG, uid, gid); if (ret < 0) { - if (ret != -EEXIST) { + if (errno != EEXIST) { ERR("Trace directory creation error"); goto error; } } } + health_code_update(); + pthread_mutex_lock(socket->lock); + ret = consumer_send_msg(socket, &msg); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + switch (-ret) { + case LTTCOMM_CONSUMERD_CHAN_NOT_FOUND: + ret = -LTTNG_ERR_CHAN_NOT_FOUND; + break; + default: + ret = -LTTNG_ERR_SNAPSHOT_FAIL; + break; + } + goto error; + } + +error: + health_code_update(); + return ret; +} + +/* + * Ask the consumer the number of discarded events for a channel. + */ +int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, + struct consumer_output *consumer, uint64_t *discarded) +{ + int ret; + struct consumer_socket *socket; + struct lttng_ht_iter iter; + struct lttcomm_consumer_msg msg; + + assert(consumer); + + DBG3("Consumer discarded events id %" PRIu64, session_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_DISCARDED_EVENTS; + msg.u.discarded_events.session_id = session_id; + msg.u.discarded_events.channel_key = channel_key; + + *discarded = 0; + + /* Send command for each consumer */ + rcu_read_lock(); + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, + node.node) { + uint64_t consumer_discarded = 0; + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_discarded, + sizeof(consumer_discarded)); + if (ret < 0) { + ERR("get discarded events"); + pthread_mutex_unlock(socket->lock); + goto end; + } + pthread_mutex_unlock(socket->lock); + *discarded += consumer_discarded; + } + ret = 0; + DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, + *discarded, session_id); + +end: + rcu_read_unlock(); + return ret; +} + +/* + * Ask the consumer the number of lost packets for a channel. + */ +int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, + struct consumer_output *consumer, uint64_t *lost) +{ + int ret; + struct consumer_socket *socket; + struct lttng_ht_iter iter; + struct lttcomm_consumer_msg msg; + + assert(consumer); + + DBG3("Consumer lost packets id %" PRIu64, session_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_LOST_PACKETS; + msg.u.lost_packets.session_id = session_id; + msg.u.lost_packets.channel_key = channel_key; + + *lost = 0; + + /* Send command for each consumer */ + rcu_read_lock(); + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, + node.node) { + uint64_t consumer_lost = 0; + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_lost, + sizeof(consumer_lost)); + if (ret < 0) { + ERR("get lost packets"); + pthread_mutex_unlock(socket->lock); + goto end; + } + pthread_mutex_unlock(socket->lock); + *lost += consumer_lost; + } + ret = 0; + DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, + *lost, session_id); + +end: + rcu_read_unlock(); + return ret; +} + +/* + * Ask the consumer to rotate a channel. + * domain_path contains "/kernel" for kernel or the complete path for UST + * (ex: /ust/uid/1000/64-bit); + * + * The new_chunk_id is the session->rotate_count that has been incremented + * when the rotation started. On the relay, this allows to keep track in which + * chunk each stream is currently writing to (for the rotate_pending operation). + */ +int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, + uid_t uid, gid_t gid, struct consumer_output *output, + char *domain_path, bool is_metadata_channel, + uint64_t new_chunk_id) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + + DBG("Consumer rotate channel key %" PRIu64, key); + + pthread_mutex_lock(socket->lock); + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_ROTATE_CHANNEL; + msg.u.rotate_channel.key = key; + msg.u.rotate_channel.metadata = !!is_metadata_channel; + msg.u.rotate_channel.new_chunk_id = new_chunk_id; + + if (output->type == CONSUMER_DST_NET) { + msg.u.rotate_channel.relayd_id = output->net_seq_index; + ret = snprintf(msg.u.rotate_channel.pathname, + sizeof(msg.u.rotate_channel.pathname), "%s%s%s", + output->dst.net.base_dir, + output->chunk_path, domain_path); + if (ret < 0 || ret == sizeof(msg.u.rotate_channel.pathname)) { + ERR("Failed to format channel path name when asking consumer to rotate channel"); + ret = -LTTNG_ERR_INVALID; + goto error; + } + } else { + msg.u.rotate_channel.relayd_id = (uint64_t) -1ULL; + ret = snprintf(msg.u.rotate_channel.pathname, + sizeof(msg.u.rotate_channel.pathname), "%s%s%s", + output->dst.session_root_path, + output->chunk_path, domain_path); + if (ret < 0 || ret == sizeof(msg.u.rotate_channel.pathname)) { + ERR("Failed to format channel path name when asking consumer to rotate channel"); + ret = -LTTNG_ERR_INVALID; + goto error; + } + } + + health_code_update(); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + switch (-ret) { + case LTTCOMM_CONSUMERD_CHAN_NOT_FOUND: + ret = -LTTNG_ERR_CHAN_NOT_FOUND; + break; + default: + ret = -LTTNG_ERR_ROTATION_FAIL_CONSUMER; + break; + } + goto error; + } +error: + pthread_mutex_unlock(socket->lock); + health_code_update(); + return ret; +} + +int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id, + const struct consumer_output *output, const char *old_path, + const char *new_path, uid_t uid, gid_t gid) +{ + int ret; + struct lttcomm_consumer_msg msg; + size_t old_path_length, new_path_length; + + assert(socket); + assert(old_path); + assert(new_path); + + DBG("Consumer rotate rename session %" PRIu64 ", old path = \"%s\", new_path = \"%s\"", + session_id, old_path, new_path); + + old_path_length = strlen(old_path); + if (old_path_length >= sizeof(msg.u.rotate_rename.old_path)) { + ERR("consumer_rotate_rename: old path length (%zu bytes) exceeds the maximal length allowed by the consumer protocol (%zu bytes)", + old_path_length + 1, sizeof(msg.u.rotate_rename.old_path)); + ret = -LTTNG_ERR_INVALID; + goto error; + } + + new_path_length = strlen(new_path); + if (new_path_length >= sizeof(msg.u.rotate_rename.new_path)) { + ERR("consumer_rotate_rename: new path length (%zu bytes) exceeds the maximal length allowed by the consumer protocol (%zu bytes)", + new_path_length + 1, sizeof(msg.u.rotate_rename.new_path)); + ret = -LTTNG_ERR_INVALID; + goto error; + } + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_ROTATE_RENAME; + msg.u.rotate_rename.session_id = session_id; + msg.u.rotate_rename.uid = uid; + msg.u.rotate_rename.gid = gid; + strcpy(msg.u.rotate_rename.old_path, old_path); + strcpy(msg.u.rotate_rename.new_path, new_path); + + if (output->type == CONSUMER_DST_NET) { + msg.u.rotate_rename.relayd_id = output->net_seq_index; + } else { + msg.u.rotate_rename.relayd_id = -1ULL; + } + + health_code_update(); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + ret = -LTTNG_ERR_ROTATE_RENAME_FAIL_CONSUMER; + goto error; + } + +error: + health_code_update(); + return ret; +} + +/* + * Ask the consumer if a rotation is locally pending. Must be called with the + * socket lock held. + * + * Return 1 if the rotation is still pending, 0 if finished, a negative value + * on error. + */ +int consumer_check_rotation_pending_local(struct consumer_socket *socket, + uint64_t session_id, uint64_t chunk_id) +{ + int ret; + struct lttcomm_consumer_msg msg; + uint32_t pending = 0; + + assert(socket); + + DBG("Asking consumer to locally check for pending rotation for session %" PRIu64 ", chunk id %" PRIu64, + session_id, chunk_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL; + msg.u.check_rotation_pending_local.session_id = session_id; + msg.u.check_rotation_pending_local.chunk_id = chunk_id; + + health_code_update(); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + ret = -LTTNG_ERR_ROTATION_PENDING_LOCAL_FAIL_CONSUMER; + goto error; + } + + ret = consumer_socket_recv(socket, &pending, sizeof(pending)); + if (ret < 0) { + goto error; + } + + ret = pending; + +error: + health_code_update(); + return ret; +} + +/* + * Ask the consumer if a rotation is pending on the relayd. Must be called with + * the socket lock held. + * + * Return 1 if the rotation is still pending, 0 if finished, a negative value + * on error. + */ +int consumer_check_rotation_pending_relay(struct consumer_socket *socket, + const struct consumer_output *output, uint64_t session_id, + uint64_t chunk_id) +{ + int ret; + struct lttcomm_consumer_msg msg; + uint32_t pending = 0; + + assert(socket); + + DBG("Asking consumer to check for pending rotation on relay for session %" PRIu64 ", chunk id %" PRIu64, + session_id, chunk_id); + assert(output->type == CONSUMER_DST_NET); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY; + msg.u.check_rotation_pending_relay.session_id = session_id; + msg.u.check_rotation_pending_relay.relayd_id = output->net_seq_index; + msg.u.check_rotation_pending_relay.chunk_id = chunk_id; + + health_code_update(); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + ret = -LTTNG_ERR_ROTATION_PENDING_RELAY_FAIL_CONSUMER; + goto error; + } + + ret = consumer_socket_recv(socket, &pending, sizeof(pending)); + if (ret < 0) { + goto error; + } + + ret = pending; + +error: + health_code_update(); + return ret; +} + +/* + * Ask the consumer to create a directory. + * + * Called with the consumer socket lock held. + */ +int consumer_mkdir(struct consumer_socket *socket, uint64_t session_id, + const struct consumer_output *output, const char *path, + uid_t uid, gid_t gid) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + + DBG("Consumer mkdir %s in session %" PRIu64, path, session_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_MKDIR; + msg.u.mkdir.session_id = session_id; + msg.u.mkdir.uid = uid; + msg.u.mkdir.gid = gid; + ret = snprintf(msg.u.mkdir.path, sizeof(msg.u.mkdir.path), "%s", path); + if (ret < 0 || ret >= sizeof(msg.u.mkdir.path)) { + ERR("Format path"); + ret = -LTTNG_ERR_INVALID; + goto error; + } + + if (output->type == CONSUMER_DST_NET) { + msg.u.mkdir.relayd_id = output->net_seq_index; + } else { + msg.u.mkdir.relayd_id = -1ULL; + } + health_code_update(); ret = consumer_send_msg(socket, &msg); if (ret < 0) { + ret = -LTTNG_ERR_MKDIR_FAIL_CONSUMER; goto error; }