From f5a2cd657eb3a81a8788438d796e906148a1b3b1 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Wed, 12 Sep 2018 11:55:57 -0400 Subject: [PATCH 01/16] fd-tracker Fix: error path leads to null pointer dereference of handle Upstream status: pending review and upstream merge of the fd-tracker feature. Signed-off-by: Jonathan Rajotte --- src/common/fd-tracker/fd-tracker.c | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/src/common/fd-tracker/fd-tracker.c b/src/common/fd-tracker/fd-tracker.c index 8af38c29c..2cf26f723 100644 --- a/src/common/fd-tracker/fd-tracker.c +++ b/src/common/fd-tracker/fd-tracker.c @@ -500,18 +500,17 @@ struct fs_handle *fd_tracker_open_fs_handle(struct fd_tracker *tracker, if (tracker->count.suspendable.active > 0) { ret = fd_tracker_suspend_handles(tracker, 1); if (ret) { - goto error_destroy; + goto end; } } else { /* * There are not enough active suspendable file - * descriptors to open a new fd and still accomodate the - * tracker's capacity. + * descriptors to open a new fd and still accommodate + * the tracker's capacity. 
*/ WARN("Cannot open file system handle, too many unsuspendable file descriptors are opened (%u)", tracker->count.unsuspendable); - ret = -EMFILE; - goto error_destroy; + goto end; } } @@ -524,15 +523,13 @@ struct fs_handle *fd_tracker_open_fs_handle(struct fd_tracker *tracker, ret = pthread_mutex_init(&handle->lock, NULL); if (ret) { PERROR("Failed to initialize handle mutex while creating fs handle"); - free(handle); - goto error_free; + goto error_mutex_init; } handle->fd = open_from_properties(path, &properties); if (handle->fd < 0) { PERROR("Failed to open fs handle to %s, open() returned", path); - ret = -errno; - goto error_destroy; + goto error; } handle->properties = properties; @@ -542,28 +539,26 @@ struct fs_handle *fd_tracker_open_fs_handle(struct fd_tracker *tracker, if (!handle->inode) { ERR("Failed to get lttng_inode corresponding to file %s", path); - goto error_destroy; + goto error; } if (fstat(handle->fd, &fd_stat)) { PERROR("Failed to retrieve file descriptor inode while creating fs handle, fstat() returned"); - ret = -errno; - goto error_destroy; + goto error; } handle->ino = fd_stat.st_ino; fd_tracker_track(tracker, handle); - pthread_mutex_unlock(&tracker->lock); end: + pthread_mutex_unlock(&tracker->lock); return handle; -error_destroy: - pthread_mutex_destroy(&handle->lock); -error_free: +error: if (handle->inode) { lttng_inode_put(handle->inode); } + pthread_mutex_destroy(&handle->lock); +error_mutex_init: free(handle); - pthread_mutex_unlock(&tracker->lock); handle = NULL; goto end; } -- 2.34.1 From 9e3d760f2bf927be2c59d0ccb3b64e2663ae7263 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Sun, 5 Aug 2018 21:38:10 -0400 Subject: [PATCH 02/16] fd-tracker Fix: do not warn on index file not found Upstream status pending on fd-tracker merge Signed-off-by: Jonathan Rajotte --- src/bin/lttng-relayd/index-file.c | 10 ++++++++-- src/common/fd-tracker/fd-tracker.c | 18 +++++++++++++++--- 2 files changed, 23 insertions(+), 5 deletions(-) diff 
--git a/src/bin/lttng-relayd/index-file.c b/src/bin/lttng-relayd/index-file.c index 04dac4068..4a69f39bc 100644 --- a/src/bin/lttng-relayd/index-file.c +++ b/src/bin/lttng-relayd/index-file.c @@ -65,8 +65,9 @@ int unlink_through_handle(const char *path) DBG("Unlinking index at %s through a filesystem handle", path); handle = fd_tracker_open_fs_handle(the_fd_tracker, path, flags, NULL); if (!handle) { - /* There is nothing to do. */ - DBG("File %s does not exist, ignoring unlink", path); + if (errno == ENOENT) { + DBG("File %s does not exist, ignoring unlink", path); + } goto end; } @@ -151,12 +152,14 @@ struct relay_index_file *relay_index_file_create(const char *path_name, fs_handle = fd_tracker_open_fs_handle(the_fd_tracker, idx_file_path, flags, &mode); if (!fs_handle) { + PERROR("Failed to open index file at %s", idx_file_path); goto error; } index_file->handle = fs_handle; fd = fs_handle_get_fd(fs_handle); if (fd < 0) { + PERROR("Failed to get fd of index file at %s", idx_file_path); goto error; } @@ -302,6 +305,7 @@ int relay_index_file_write(const struct relay_index_file *index_file, fd = fs_handle_get_fd(index_file->handle); if (fd < 0) { + PERROR("Failed to get fd from handle"); ret = fd; goto end; } @@ -329,6 +333,7 @@ int relay_index_file_read(const struct relay_index_file *index_file, fd = fs_handle_get_fd(index_file->handle); if (fd < 0) { + PERROR("Failed to get fd of handle %p", index_file->handle); ret = fd; goto end; } @@ -352,6 +357,7 @@ int relay_index_file_seek_end(struct relay_index_file *index_file) fd = fs_handle_get_fd(index_file->handle); if (fd < 0) { + PERROR("Failed to get fd of handle %p", index_file->handle); ret = fd; goto end; } diff --git a/src/common/fd-tracker/fd-tracker.c b/src/common/fd-tracker/fd-tracker.c index 2cf26f723..eddf5ada6 100644 --- a/src/common/fd-tracker/fd-tracker.c +++ b/src/common/fd-tracker/fd-tracker.c @@ -305,9 +305,9 @@ int fs_handle_restore(struct fs_handle *handle) ret = open_from_properties(path, 
&handle->properties); if (ret < 0) { + errno = -ret; PERROR("Failed to restore filesystem handle to %s, open() failed", path); - ret = -errno; goto end; } fd = ret; @@ -483,10 +483,13 @@ end: return ret; } +/* + * If return NULL check errno for error. + */ struct fs_handle *fd_tracker_open_fs_handle(struct fd_tracker *tracker, const char *path, int flags, mode_t *mode) { - int ret; + int ret = 0; struct fs_handle *handle = NULL; struct stat fd_stat; struct open_properties properties = { @@ -500,6 +503,8 @@ struct fs_handle *fd_tracker_open_fs_handle(struct fd_tracker *tracker, if (tracker->count.suspendable.active > 0) { ret = fd_tracker_suspend_handles(tracker, 1); if (ret) { + ERR("Suspend handled failed"); + ret = EMFILE; goto end; } } else { @@ -510,12 +515,14 @@ struct fs_handle *fd_tracker_open_fs_handle(struct fd_tracker *tracker, */ WARN("Cannot open file system handle, too many unsuspendable file descriptors are opened (%u)", tracker->count.unsuspendable); + ret = EMFILE; goto end; } } handle = zmalloc(sizeof(*handle)); if (!handle) { + ret = ENOMEM; goto end; } handle->tracker = tracker; @@ -523,12 +530,14 @@ struct fs_handle *fd_tracker_open_fs_handle(struct fd_tracker *tracker, ret = pthread_mutex_init(&handle->lock, NULL); if (ret) { PERROR("Failed to initialize handle mutex while creating fs handle"); + ret = errno; goto error_mutex_init; } handle->fd = open_from_properties(path, &properties); if (handle->fd < 0) { - PERROR("Failed to open fs handle to %s, open() returned", path); + /* ret contains -errno on error. 
*/ + ret = -ret; goto error; } @@ -537,6 +546,7 @@ struct fs_handle *fd_tracker_open_fs_handle(struct fd_tracker *tracker, handle->inode = lttng_inode_registry_get_inode(tracker->inode_registry, handle->fd, path); if (!handle->inode) { + ret = errno; ERR("Failed to get lttng_inode corresponding to file %s", path); goto error; @@ -544,6 +554,7 @@ struct fs_handle *fd_tracker_open_fs_handle(struct fd_tracker *tracker, if (fstat(handle->fd, &fd_stat)) { PERROR("Failed to retrieve file descriptor inode while creating fs handle, fstat() returned"); + ret = errno; goto error; } handle->ino = fd_stat.st_ino; @@ -551,6 +562,7 @@ struct fs_handle *fd_tracker_open_fs_handle(struct fd_tracker *tracker, fd_tracker_track(tracker, handle); end: pthread_mutex_unlock(&tracker->lock); + errno = ret; return handle; error: if (handle->inode) { -- 2.34.1 From 86a047c71886157d4e4c292f295b09ae70391097 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Sun, 22 Jul 2018 23:38:34 -0400 Subject: [PATCH 03/16] Perform local data pending check then relayd Signed-off-by: Jonathan Rajotte --- src/common/consumer/consumer.c | 61 ++++++++++++++++------------------ 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 7b87a85fe..482d4a63b 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3604,21 +3604,6 @@ int consumer_data_pending(uint64_t id) /* Ease our life a bit */ ht = consumer_data.stream_list_ht; - relayd = find_relayd_by_session_id(id); - if (relayd) { - /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_begin_data_pending(&relayd->control_sock, - relayd->relayd_session_id); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - /* Communication error thus the relayd so no data pending. */ - ERR("Relayd begin data pending failed. 
Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); - lttng_consumer_cleanup_relayd(relayd); - goto data_not_pending; - } - } - cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct, &id, @@ -3641,9 +3626,29 @@ int consumer_data_pending(uint64_t id) } } - /* Relayd check */ - if (relayd) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + } + + relayd = find_relayd_by_session_id(id); + if (relayd) { + unsigned int is_data_inflight = 0; + + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_begin_data_pending(&relayd->control_sock, + relayd->relayd_session_id); + if (ret < 0) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + /* Communication error thus the relayd so no data pending. */ + ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + goto data_not_pending; + } + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, &id, + &iter.iter, stream, node_session_id.node) { if (stream->metadata_flag) { ret = relayd_quiescent_control(&relayd->control_sock, stream->relayd_stream_id); @@ -3652,27 +3657,19 @@ int consumer_data_pending(uint64_t id) stream->relayd_stream_id, stream->next_net_seq_num - 1); } - if (ret < 0) { - ERR("Relayd data pending failed. 
Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); - lttng_consumer_cleanup_relayd(relayd); + if (ret == 1) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); pthread_mutex_unlock(&stream->lock); - goto data_not_pending; + goto data_pending; } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret == 1) { + if (ret < 0) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); pthread_mutex_unlock(&stream->lock); - goto data_pending; + goto data_not_pending; } } - pthread_mutex_unlock(&stream->lock); - } - - if (relayd) { - unsigned int is_data_inflight = 0; - /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + /* Send end command for data pending. */ ret = relayd_end_data_pending(&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); -- 2.34.1 From 80db18da0437527e6cf3cc9e96f713a7ac21ae00 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Fri, 20 Jul 2018 18:41:49 -0400 Subject: [PATCH 04/16] Set consumer's verbosity to the max level on --verbose-consumer MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The consumer's verbosity is set to '1' when --verbose-consumer is used when launching the session daemon. This means that all DBG2/3() statements are ignored. This commit always sets the consumer's verbosity to the maximal level. 
Signed-off-by: Jérémie Galarneau --- src/bin/lttng-consumerd/lttng-consumerd.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin/lttng-consumerd/lttng-consumerd.c b/src/bin/lttng-consumerd/lttng-consumerd.c index 50e2f4648..78316056d 100644 --- a/src/bin/lttng-consumerd/lttng-consumerd.c +++ b/src/bin/lttng-consumerd/lttng-consumerd.c @@ -252,7 +252,7 @@ static int parse_args(int argc, char **argv) lttng_opt_quiet = 1; break; case 'v': - lttng_opt_verbose = 1; + lttng_opt_verbose = 3; break; case 'V': fprintf(stdout, "%s\n", VERSION); -- 2.34.1 From 9a951630b590d14966e19b8d54f078fe174c1f47 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Tue, 28 Aug 2018 21:19:53 -0400 Subject: [PATCH 05/16] Teardown relayd on communication error during data pending Signed-off-by: Jonathan Rajotte --- src/common/consumer/consumer.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 482d4a63b..292bd3938 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3638,8 +3638,8 @@ int consumer_data_pending(uint64_t id) ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id); if (ret < 0) { - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); /* Communication error thus the relayd so no data pending. */ + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); lttng_consumer_cleanup_relayd(relayd); goto data_not_pending; @@ -3663,6 +3663,8 @@ int consumer_data_pending(uint64_t id) goto data_pending; } if (ret < 0) { + ERR("Relayd data pending failed. 
Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); pthread_mutex_unlock(&stream->lock); goto data_not_pending; -- 2.34.1 From ac8d0a59d7cb787ef38a18c03f385b8be14f4d86 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Fri, 21 Sep 2018 04:57:16 -0400 Subject: [PATCH 06/16] EfficiOS backport 2.9 revision 6 Signed-off-by: Jonathan Rajotte --- version/extra_version_description | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version/extra_version_description b/version/extra_version_description index 97ee0a857..2cfc12a2e 100644 --- a/version/extra_version_description +++ b/version/extra_version_description @@ -1 +1 @@ -EfficiOS Revision 5 +EfficiOS Revision 6 -- 2.34.1 From 028ba70793261607f5e08f412326c843b45c1984 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Tue, 25 Jul 2017 16:26:25 -0400 Subject: [PATCH 07/16] Cleanup: remove dead assignment MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Neither call site uses the return value, and errors are already handled inside the called function. Signed-off-by: Jonathan Rajotte Signed-off-by: Jérémie Galarneau --- src/common/consumer/consumer.c | 6 ++---- src/common/consumer/consumer.h | 2 +- src/common/kernel-consumer/kernel-consumer.c | 4 ++-- src/common/ust-consumer/ust-consumer.c | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 292bd3938..b99a3f581 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3355,7 +3355,7 @@ error: * This will create a relayd socket pair and add it to the relayd hash table. * The caller MUST acquire a RCU read side lock before calling it. 
*/ -int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, + void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, @@ -3516,7 +3516,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, add_relayd(relayd); /* All good! */ - return 0; + return; error: if (consumer_send_status_msg(sock, ret_code) < 0) { @@ -3534,8 +3534,6 @@ error_nosignal: if (relayd_created) { free(relayd); } - - return ret; } /* diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 54b919d28..07197c598 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -719,7 +719,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx); int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); -int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, +void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, uint64_t relayd_session_id); diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index ccca69e0e..e87410366 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -459,10 +459,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { /* Session daemon status message are handled in the following call. 
*/ - ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, + consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, - msg.u.relayd_sock.relayd_session_id); + msg.u.relayd_sock.relayd_session_id); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index baa81d450..4277372af 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1368,7 +1368,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { /* Session daemon status message are handled in the following call. */ - ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, + consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, msg.u.relayd_sock.relayd_session_id); -- 2.34.1 From 6d40f8fa6e8256c448ab3b3d279069a8749c716d Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Wed, 25 Jul 2018 15:26:37 -0400 Subject: [PATCH 08/16] consumer: Rename net_seq_idx to relayd_id MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The consumer's streams refer to a 'net_seq_idx' of which the meaning must have been lost in the sands of time. It is a unique identifier of a given relay daemon. Hence, renaming it to 'relayd_id' appears sensible. 
Signed-off-by: Jérémie Galarneau Signed-off-by: Jonathan Rajotte --- src/common/consumer/consumer-stream.c | 14 ++-- src/common/consumer/consumer.c | 83 ++++++++++---------- src/common/consumer/consumer.h | 13 +-- src/common/kernel-consumer/kernel-consumer.c | 18 ++--- src/common/ust-consumer/ust-consumer.c | 20 ++--- 5 files changed, 75 insertions(+), 73 deletions(-) diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index f5ca6b71e..b2a5d41fd 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -72,7 +72,7 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, stream->relayd_stream_id, stream->next_net_seq_num - 1); if (ret < 0) { - ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); + ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->id); lttng_consumer_cleanup_relayd(relayd); } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); @@ -82,7 +82,7 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, uatomic_read(&relayd->destroy_flag)) { consumer_destroy_relayd(relayd); } - stream->net_seq_idx = (uint64_t) -1ULL; + stream->relayd_id = (uint64_t) -1ULL; stream->sent_to_relayd = 0; } @@ -166,7 +166,7 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) /* Check and cleanup relayd if needed. 
*/ rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); + relayd = consumer_find_relayd(stream->relayd_id); if (relayd != NULL) { consumer_stream_relayd_close(stream, relayd); } @@ -360,9 +360,9 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, assert(element); rcu_read_lock(); - if (stream->net_seq_idx != (uint64_t) -1ULL) { + if (stream->relayd_id != (uint64_t) -1ULL) { struct consumer_relayd_sock_pair *relayd; - relayd = consumer_find_relayd(stream->net_seq_idx); + relayd = consumer_find_relayd(stream->relayd_id); if (relayd) { pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_send_index(&relayd->control_sock, element, @@ -372,14 +372,14 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, * Communication error with lttng-relayd, * perform cleanup now */ - ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); + ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->id); lttng_consumer_cleanup_relayd(relayd); ret = -1; } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } else { ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.", - stream->key, stream->net_seq_idx); + stream->key, stream->relayd_id); ret = -1; } } else { diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index b99a3f581..b03bd39a9 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -417,25 +417,24 @@ static void cleanup_relayd_ht(void) } /* - * Update the end point status of all streams having the given network sequence - * index (relayd index). + * Update the end point status of all streams having the given relayd id. * * It's atomically set without having the stream mutex locked which is fine * because we handle the write/read race with a pipe wakeup for each thread. 
*/ -static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, +static void update_endpoint_status_by_netidx(uint64_t relayd_id, enum consumer_endpoint_status status) { struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; - DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx); + DBG("Consumer set delete flag on stream by idx %" PRIu64, relayd_id); rcu_read_lock(); /* Let's begin with metadata */ cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) { - if (stream->net_seq_idx == net_seq_idx) { + if (stream->relayd_id == relayd_id) { uatomic_set(&stream->endpoint_status, status); DBG("Delete flag set to metadata stream %d", stream->wait_fd); } @@ -443,7 +442,7 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, /* Follow up by the data streams */ cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) { - if (stream->net_seq_idx == net_seq_idx) { + if (stream->relayd_id == relayd_id) { uatomic_set(&stream->endpoint_status, status); DBG("Delete flag set to data stream %d", stream->wait_fd); } @@ -465,10 +464,10 @@ void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd) assert(relayd); - DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx); + DBG("Cleaning up relayd object ID %"PRIu64, relayd->id); /* Save the net sequence index before destroying the object */ - netidx = relayd->net_seq_idx; + netidx = relayd->id; /* * Delete the relayd from the relayd hash table, close the sockets and free @@ -565,7 +564,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->state = state; stream->uid = uid; stream->gid = gid; - stream->net_seq_idx = relayd_id; + stream->relayd_id = relayd_id; stream->session_id = session_id; stream->monitor = monitor; stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; @@ -604,7 +603,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, DBG3("Allocated stream %s (key %" 
PRIu64 ", chan_key %" PRIu64 " relayd_id %" PRIu64 ", session_id %" PRIu64, stream->name, stream->key, channel_key, - stream->net_seq_idx, stream->session_id); + stream->relayd_id, stream->session_id); rcu_read_unlock(); return stream; @@ -697,7 +696,7 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd) assert(relayd); lttng_ht_lookup(consumer_data.relayd_ht, - &relayd->net_seq_idx, &iter); + &relayd->id, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { goto end; @@ -712,12 +711,12 @@ end: * Allocate and return a consumer relayd socket. */ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( - uint64_t net_seq_idx) + uint64_t relayd_id) { struct consumer_relayd_sock_pair *obj = NULL; /* net sequence index of -1 is a failure */ - if (net_seq_idx == (uint64_t) -1ULL) { + if (relayd_id == (uint64_t) -1ULL) { goto error; } @@ -727,12 +726,12 @@ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( goto error; } - obj->net_seq_idx = net_seq_idx; + obj->id = relayd_id; obj->refcount = 0; obj->destroy_flag = 0; obj->control_sock.sock.fd = -1; obj->data_sock.sock.fd = -1; - lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx); + lttng_ht_node_init_u64(&obj->node, obj->id); pthread_mutex_init(&obj->ctrl_sock_mutex, NULL); error: @@ -780,12 +779,12 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, struct consumer_relayd_sock_pair *relayd; assert(stream); - assert(stream->net_seq_idx != -1ULL); + assert(stream->relayd_id != -1ULL); assert(path); /* The stream is not metadata. Get relayd reference if exists. 
*/ rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); + relayd = consumer_find_relayd(stream->relayd_id); if (relayd != NULL) { /* Add stream on the relayd */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); @@ -794,7 +793,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, stream->chan->tracefile_size, stream->chan->tracefile_count); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->id); lttng_consumer_cleanup_relayd(relayd); goto end; } @@ -803,13 +802,13 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, stream->sent_to_relayd = 1; } else { ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.", - stream->key, stream->net_seq_idx); + stream->key, stream->relayd_id); ret = -1; goto end; } DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64, - stream->name, stream->key, stream->net_seq_idx); + stream->name, stream->key, stream->relayd_id); end: rcu_read_unlock(); @@ -821,35 +820,35 @@ end: * * Returns 0 on success, < 0 on error */ -int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) +int consumer_send_relayd_streams_sent(uint64_t relayd_id) { int ret = 0; struct consumer_relayd_sock_pair *relayd; - assert(net_seq_idx != -1ULL); + assert(relayd_id != -1ULL); /* The stream is not metadata. Get relayd reference if exists. */ rcu_read_lock(); - relayd = consumer_find_relayd(net_seq_idx); + relayd = consumer_find_relayd(relayd_id); if (relayd != NULL) { /* Add stream on the relayd */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_streams_sent(&relayd->control_sock); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + ERR("Relayd streams sent failed. 
Cleaning up relayd %" PRIu64".", relayd->id); lttng_consumer_cleanup_relayd(relayd); goto end; } } else { ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", - net_seq_idx); + relayd_id); ret = -1; goto end; } ret = 0; - DBG("All streams sent relayd id %" PRIu64, net_seq_idx); + DBG("All streams sent relayd id %" PRIu64, relayd_id); end: rcu_read_unlock(); @@ -865,7 +864,7 @@ void close_relayd_stream(struct lttng_consumer_stream *stream) /* The stream is not metadata. Get relayd reference if exists. */ rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); + relayd = consumer_find_relayd(stream->relayd_id); if (relayd) { consumer_stream_relayd_close(stream, relayd); } @@ -1525,8 +1524,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( rcu_read_lock(); /* Flag that the current stream if set for network streaming. */ - if (stream->net_seq_idx != (uint64_t) -1ULL) { - relayd = consumer_find_relayd(stream->net_seq_idx); + if (stream->relayd_id != (uint64_t) -1ULL) { + relayd = consumer_find_relayd(stream->relayd_id); if (relayd == NULL) { ret = -EPIPE; goto end; @@ -1704,7 +1703,7 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->id); lttng_consumer_cleanup_relayd(relayd); } @@ -1757,8 +1756,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( rcu_read_lock(); /* Flag that the current stream if set for network streaming. */ - if (stream->net_seq_idx != (uint64_t) -1ULL) { - relayd = consumer_find_relayd(stream->net_seq_idx); + if (stream->relayd_id != (uint64_t) -1ULL) { + relayd = consumer_find_relayd(stream->relayd_id); if (relayd == NULL) { written = -ret; goto end; @@ -1931,7 +1930,7 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - ERR("Relayd hangup. 
Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->id); lttng_consumer_cleanup_relayd(relayd); /* Skip splice error so the consumer does not fail */ goto end; @@ -3355,7 +3354,7 @@ error: * This will create a relayd socket pair and add it to the relayd hash table. * The caller MUST acquire a RCU read side lock before calling it. */ - void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, + void consumer_add_relayd_socket(uint64_t relayd_id, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, @@ -3368,14 +3367,14 @@ error: assert(ctx); assert(relayd_sock); - DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx); + DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", relayd_id); /* Get relayd reference if exists. */ - relayd = consumer_find_relayd(net_seq_idx); + relayd = consumer_find_relayd(relayd_id); if (relayd == NULL) { assert(sock_type == LTTNG_STREAM_CONTROL); /* Not found. Allocate one. */ - relayd = consumer_allocate_relayd_sock_pair(net_seq_idx); + relayd = consumer_allocate_relayd_sock_pair(relayd_id); if (relayd == NULL) { ret = -ENOMEM; ret_code = LTTCOMM_CONSUMERD_ENOMEM; @@ -3498,7 +3497,7 @@ error: DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)", sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", - relayd->net_seq_idx, fd); + relayd->id, fd); /* We successfully added the socket. Send status back. */ ret = consumer_send_status_msg(sock, ret_code); @@ -3547,7 +3546,7 @@ static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) struct lttng_ht_iter iter; struct consumer_relayd_sock_pair *relayd = NULL; - /* Iterate over all relayd since they are indexed by net_seq_idx. */ + /* Iterate over all relayd since they are indexed by relayd_id. 
*/ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { /* @@ -3638,7 +3637,7 @@ int consumer_data_pending(uint64_t id) if (ret < 0) { /* Communication error thus the relayd so no data pending. */ pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->id); lttng_consumer_cleanup_relayd(relayd); goto data_not_pending; } @@ -3661,7 +3660,7 @@ int consumer_data_pending(uint64_t id) goto data_pending; } if (ret < 0) { - ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->id); lttng_consumer_cleanup_relayd(relayd); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); pthread_mutex_unlock(&stream->lock); @@ -3674,7 +3673,7 @@ int consumer_data_pending(uint64_t id) relayd->relayd_session_id, &is_data_inflight); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->id); lttng_consumer_cleanup_relayd(relayd); goto data_not_pending; } diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 07197c598..ab59e6e75 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -302,8 +302,11 @@ struct lttng_consumer_stream { /* UID/GID of the user owning the session to which stream belongs */ uid_t uid; gid_t gid; - /* Network sequence number. Indicating on which relayd socket it goes. */ - uint64_t net_seq_idx; + /* + * Relayd id, indicating on which relayd socket it goes. Set to -1ULL if + * not the stream is not associated to a relay daemon. + */ + uint64_t relayd_id; /* * Indicate if this stream was successfully sent to a relayd. 
This is set * after the refcount of the relayd is incremented and is checked when the @@ -421,7 +424,7 @@ struct lttng_consumer_stream { */ struct consumer_relayd_sock_pair { /* Network sequence number. */ - uint64_t net_seq_idx; + uint64_t id; /* Number of stream associated with this relayd */ int refcount; @@ -679,7 +682,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel); /* lttng-relayd consumer command */ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key); int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path); -int consumer_send_relayd_streams_sent(uint64_t net_seq_idx); +int consumer_send_relayd_streams_sent(uint64_t relayd_id); void close_relayd_stream(struct lttng_consumer_stream *stream); struct lttng_consumer_channel *consumer_find_channel(uint64_t key); int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, @@ -719,7 +722,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx); int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); -void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, +void consumer_add_relayd_socket(uint64_t relayd_id, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, uint64_t relayd_session_id); diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index e87410366..0a232dcfc 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -154,7 +154,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, * Assign the received relayd ID so we can use it for streaming. The streams * are not visible to anyone so this is OK to change it. 
*/ - stream->net_seq_idx = relayd_id; + stream->relayd_id = relayd_id; channel->relayd_id = relayd_id; if (relayd_id != (uint64_t) -1ULL) { ret = consumer_send_relayd_stream(stream, path); @@ -305,7 +305,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, } } else { close_relayd_stream(stream); - stream->net_seq_idx = (uint64_t) -1ULL; + stream->relayd_id = (uint64_t) -1ULL; } pthread_mutex_unlock(&stream->lock); } @@ -396,7 +396,7 @@ static int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, if (use_relayd) { close_relayd_stream(metadata_stream); - metadata_stream->net_seq_idx = (uint64_t) -1ULL; + metadata_stream->relayd_id = (uint64_t) -1ULL; } else { if (metadata_stream->out_fd >= 0) { ret = close(metadata_stream->out_fd); @@ -701,13 +701,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (!channel->monitor) { DBG("Kernel consumer add stream %s in no monitor mode with " "relayd id %" PRIu64, new_stream->name, - new_stream->net_seq_idx); + new_stream->relayd_id); cds_list_add(&new_stream->send_node, &channel->streams.head); break; } /* Send stream to relayd if the stream has an ID. */ - if (new_stream->net_seq_idx != (uint64_t) -1ULL) { + if (new_stream->relayd_id != (uint64_t) -1ULL) { ret = consumer_send_relayd_stream(new_stream, new_stream->chan->pathname); if (ret < 0) { @@ -722,7 +722,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ if (channel->streams_sent_to_relayd) { ret = consumer_send_relayd_streams_sent( - new_stream->net_seq_idx); + new_stream->relayd_id); if (ret < 0) { goto end_nosignal; } @@ -1419,8 +1419,8 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, * network streaming or the full padding (len) size when we are _not_ * streaming. 
*/ - if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) || - (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) { + if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) || + (ret != len && stream->relayd_id == (uint64_t) -1ULL)) { /* * Display the error but continue processing to try to release the * subbuffer. This is a DBG statement since this is possible to @@ -1498,7 +1498,7 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) * Don't create anything if this is set for streaming or should not be * monitored. */ - if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) { + if (stream->relayd_id == (uint64_t) -1ULL && stream->chan->monitor) { ret = utils_create_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, stream->uid, stream->gid, NULL); diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 4277372af..4d0b530a7 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -553,7 +553,7 @@ static int send_sessiond_channel(int sock, { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttng_consumer_stream *stream; - uint64_t net_seq_idx = -1ULL; + uint64_t relayd_id = -1ULL; assert(channel); assert(ctx); @@ -578,8 +578,8 @@ static int send_sessiond_channel(int sock, } ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } - if (net_seq_idx == -1ULL) { - net_seq_idx = stream->net_seq_idx; + if (relayd_id == -1ULL) { + relayd_id = stream->relayd_id; } } } @@ -939,7 +939,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) } /* Send metadata stream to relayd if needed. 
*/ - if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) { + if (metadata->metadata_stream->relayd_id != (uint64_t) -1ULL) { ret = consumer_send_relayd_stream(metadata->metadata_stream, metadata->pathname); if (ret < 0) { @@ -947,7 +947,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) goto error; } ret = consumer_send_relayd_streams_sent( - metadata->metadata_stream->net_seq_idx); + metadata->metadata_stream->relayd_id); if (ret < 0) { ret = LTTCOMM_CONSUMERD_RELAYD_FAIL; goto error; @@ -1039,7 +1039,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, assert(metadata_stream); if (relayd_id != (uint64_t) -1ULL) { - metadata_stream->net_seq_idx = relayd_id; + metadata_stream->relayd_id = relayd_id; ret = consumer_send_relayd_stream(metadata_stream, path); if (ret < 0) { goto error_stream; @@ -1116,7 +1116,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, /* Lock stream because we are about to change its state. */ pthread_mutex_lock(&stream->lock); - stream->net_seq_idx = relayd_id; + stream->relayd_id = relayd_id; if (use_relayd) { ret = consumer_send_relayd_stream(stream, path); @@ -2545,8 +2545,8 @@ retry: * The mmap operation should write subbuf_size amount of data when network * streaming or the full padding (len) size when we are _not_ streaming. */ - if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) || - (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) { + if ((ret != subbuf_size && stream->relayd_id != (uint64_t) -1ULL) || + (ret != len && stream->relayd_id == (uint64_t) -1ULL)) { /* * Display the error but continue processing to try to release the * subbuffer. This is a DBG statement since any unexpected kill or @@ -2627,7 +2627,7 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) assert(stream); /* Don't create anything if this is set for streaming. 
*/ - if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) { + if (stream->relayd_id == (uint64_t) -1ULL && stream->chan->monitor) { ret = utils_create_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, stream->uid, stream->gid, NULL); -- 2.34.1 From 1121820b6fd5a7e560ee3a027b93aedc4636ac53 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Wed, 25 Jul 2018 15:32:10 -0400 Subject: [PATCH 09/16] Cleanup: missing line in consumer-stream.c MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérémie Galarneau Signed-off-by: Jonathan Rajotte --- src/common/consumer/consumer-stream.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index b2a5d41fd..97ba627dc 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -362,6 +362,7 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, rcu_read_lock(); if (stream->relayd_id != (uint64_t) -1ULL) { struct consumer_relayd_sock_pair *relayd; + relayd = consumer_find_relayd(stream->relayd_id); if (relayd) { pthread_mutex_lock(&relayd->ctrl_sock_mutex); -- 2.34.1 From 697a8d862195953e455812a57b34785980eaaeb2 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Mon, 27 Aug 2018 14:29:10 -0400 Subject: [PATCH 10/16] Explicit data pending reason consumer side Signed-off-by: Jonathan Rajotte --- src/common/consumer/consumer.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index b03bd39a9..22c2b9be5 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3618,6 +3618,7 @@ int consumer_data_pending(uint64_t id) /* Check the stream if there is data in the buffers. 
*/ ret = data_pending(stream); if (ret == 1) { + DBG("Data is pending locally on stream %" PRIu64, stream->key); pthread_mutex_unlock(&stream->lock); goto data_pending; } @@ -3678,6 +3679,7 @@ int consumer_data_pending(uint64_t id) goto data_not_pending; } if (is_data_inflight) { + DBG("Data is in flight on relayd %" PRIu64, relayd->id); goto data_pending; } } -- 2.34.1 From 2ec8403bb9647a2ad22dcfc8bbc79db15b179783 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Tue, 8 Jan 2019 16:25:15 -0500 Subject: [PATCH 11/16] Remove unnecessary mutex unlock Signed-off-by: Jonathan Rajotte --- src/common/consumer/consumer.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 22c2b9be5..0ed22ac31 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3657,14 +3657,12 @@ int consumer_data_pending(uint64_t id) } if (ret == 1) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - pthread_mutex_unlock(&stream->lock); goto data_pending; } if (ret < 0) { ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->id); lttng_consumer_cleanup_relayd(relayd); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - pthread_mutex_unlock(&stream->lock); goto data_not_pending; } } -- 2.34.1 From d9b9088d65b39e019f0d663df9fc5383408d60a3 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Mon, 11 Feb 2019 14:43:54 -0500 Subject: [PATCH 12/16] relayd: do not prioritize control events over data. Simplify the algorithm used by relayd for control and data connections handling. Use the notion of activity phase. An activity phase represents a phase for which all connections with activity (poll/epoll) are not yet processed. When an active connection is processed, its activity phase is set to the current activity phase to prevent further progress during the same activity phase. 
Once all active connections (poll events) have been processed during the current activity phase, the current activity phase is incremented. This gives fairness across all connections during a given activity phase. This can also serve as a base for future work toward resource-based prioritization. Signed-off-by: Jonathan Rajotte --- src/bin/lttng-relayd/connection.c | 1 + src/bin/lttng-relayd/connection.h | 5 + src/bin/lttng-relayd/main.c | 196 +++++++++--------------------- 3 files changed, 65 insertions(+), 137 deletions(-) diff --git a/src/bin/lttng-relayd/connection.c b/src/bin/lttng-relayd/connection.c index 6a2c27ff6..6d6107eb6 100644 --- a/src/bin/lttng-relayd/connection.c +++ b/src/bin/lttng-relayd/connection.c @@ -122,6 +122,7 @@ struct relay_connection *connection_create(struct lttcomm_sock *sock, if (conn->type == RELAY_CONTROL) { lttng_dynamic_buffer_init(&conn->protocol.ctrl.reception_buffer); } + conn->activity_phase = 0; connection_reset_protocol_state(conn); end: return conn; diff --git a/src/bin/lttng-relayd/connection.h b/src/bin/lttng-relayd/connection.h index d0edf8214..d189e1549 100644 --- a/src/bin/lttng-relayd/connection.h +++ b/src/bin/lttng-relayd/connection.h @@ -138,6 +138,11 @@ struct relay_connection { struct lttng_dynamic_buffer reception_buffer; } ctrl; } protocol; + /* + * The activity phase for which the connection was last active. + * This is used to ensure fairness across connections. 
+ */ + uint64_t activity_phase; }; struct relay_connection *connection_create(struct lttcomm_sock *sock, diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 7f091c2b7..19b1342ac 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -3058,12 +3058,14 @@ static void relay_thread_close_connection(struct lttng_poll_event *events, */ static void *relay_thread_worker(void *data) { - int ret, err = -1, last_seen_data_fd = -1; + int ret, err = -1; uint32_t nb_fd; struct lttng_poll_event events; struct lttng_ht *relay_connections_ht; struct lttng_ht_iter iter; - struct relay_connection *destroy_conn = NULL; + struct relay_connection *tmp_conn = NULL; + uint64_t relay_conn_pipe_activity_phase = 0; + uint64_t current_activity_phase = 1; DBG("[thread] Relay worker started"); @@ -3095,7 +3097,8 @@ static void *relay_thread_worker(void *data) restart: while (1) { - int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1; + int i; + bool at_least_one_event_processed = false; health_code_update(); @@ -3117,9 +3120,11 @@ restart: nb_fd = ret; /* - * Process control. The control connection is - * prioritized so we don't starve it with high - * throughput tracing data on the data connection. + * Processes FDs that were not yet active during the + * current activity phase. Increment the activity phase when all + * events are already processed for the current activity + * phase. This result in fairness across all connections and + * connection pipe. */ for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ @@ -3148,14 +3153,32 @@ restart: if (revents & LPOLLIN) { struct relay_connection *conn; + if (relay_conn_pipe_activity_phase == current_activity_phase) { + /* + * Only consider once per + * activity phase for fairness. 
+ */ + DBG("Skipping adding reception, already happened in activity phase %" PRIu64, current_activity_phase); + continue; + } + ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn)); if (ret < 0) { goto error; } + /* + * For now we prefer fairness over + * "immediate" action of new + * connection. Set activity phase to the + * current phase. + */ + conn->activity_phase = current_activity_phase; lttng_poll_add(&events, conn->sock->fd, LPOLLIN | LPOLLRDHUP); connection_ht_add(relay_connections_ht, conn); DBG("Connection socket %d added", conn->sock->fd); + relay_conn_pipe_activity_phase = current_activity_phase; + at_least_one_event_processed = true; } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { ERR("Relay connection pipe error"); goto error; @@ -3164,29 +3187,27 @@ restart: goto error; } } else { - struct relay_connection *ctrl_conn; + struct relay_connection *connection; - ctrl_conn = connection_get_by_sock(relay_connections_ht, pollfd); + connection = connection_get_by_sock(relay_connections_ht, pollfd); /* If not found, there is a synchronization issue. */ - assert(ctrl_conn); - - if (ctrl_conn->type == RELAY_DATA) { - if (revents & LPOLLIN) { - /* - * Flag the last seen data fd not deleted. It will be - * used as the last seen fd if any fd gets deleted in - * this first loop. 
- */ - last_notdel_data_fd = pollfd; - } - goto put_ctrl_connection; + assert(connection); + if (connection->activity_phase == current_activity_phase) { + DBG3("Skipping connection %d, already processed in activity phase %" PRIu64, connection->sock->fd, current_activity_phase); + goto connection_put; } - assert(ctrl_conn->type == RELAY_CONTROL); if (revents & LPOLLIN) { enum relay_connection_status status; - status = relay_process_control(ctrl_conn); + at_least_one_event_processed = true; + connection->activity_phase = current_activity_phase; + + if (connection->type == RELAY_DATA) { + status = relay_process_data(connection); + } else { + status = relay_process_control(connection); + } if (status != RELAY_CONNECTION_STATUS_OK) { /* * On socket error flag the session as aborted to force @@ -3199,137 +3220,38 @@ restart: * connection is closed (uncleanly) before the packet's * data provided. * - * Since the control connection encountered an error, + * Since the connection encountered an error, * it is okay to be conservative and close the * session right now as we can't rely on the protocol * being respected anymore. */ if (status == RELAY_CONNECTION_STATUS_ERROR) { - session_abort(ctrl_conn->session); + session_abort(connection->session); } /* Clear the connection on error or close. 
*/ relay_thread_close_connection(&events, pollfd, - ctrl_conn); + connection); } - seen_control = 1; } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { relay_thread_close_connection(&events, - pollfd, ctrl_conn); - if (last_seen_data_fd == pollfd) { - last_seen_data_fd = last_notdel_data_fd; - } + pollfd, connection); } else { - ERR("Unexpected poll events %u for control sock %d", + ERR("Unexpected poll events %u for sock %d", revents, pollfd); - connection_put(ctrl_conn); + connection_put(connection); goto error; } - put_ctrl_connection: - connection_put(ctrl_conn); + connection_put: + connection_put(connection); } } - /* - * The last loop handled a control request, go back to poll to make - * sure we prioritise the control socket. - */ - if (seen_control) { - continue; - } - - if (last_seen_data_fd >= 0) { - for (i = 0; i < nb_fd; i++) { - int pollfd = LTTNG_POLL_GETFD(&events, i); - - health_code_update(); - - if (last_seen_data_fd == pollfd) { - idx = i; - break; - } - } - } - - /* Process data connection. */ - for (i = idx + 1; i < nb_fd; i++) { - /* Fetch the poll data. */ - uint32_t revents = LTTNG_POLL_GETEV(&events, i); - int pollfd = LTTNG_POLL_GETFD(&events, i); - struct relay_connection *data_conn; - - health_code_update(); - - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - - /* Skip the command pipe. It's handled in the first loop. */ - if (pollfd == relay_conn_pipe[0]) { - continue; - } - - data_conn = connection_get_by_sock(relay_connections_ht, pollfd); - if (!data_conn) { - /* Skip it. Might be removed before. */ - continue; - } - if (data_conn->type == RELAY_CONTROL) { - goto put_data_connection; - } - assert(data_conn->type == RELAY_DATA); - - if (revents & LPOLLIN) { - enum relay_connection_status status; - - status = relay_process_data(data_conn); - /* Connection closed or error. 
*/ - if (status != RELAY_CONNECTION_STATUS_OK) { - /* - * On socket error flag the session as aborted to force - * the cleanup of its stream otherwise it can leak - * during the lifetime of the relayd. - * - * This prevents situations in which streams can be - * left opened because an index was received, the - * control connection is closed, and the data - * connection is closed (uncleanly) before the packet's - * data provided. - * - * Since the data connection encountered an error, - * it is okay to be conservative and close the - * session right now as we can't rely on the protocol - * being respected anymore. - */ - if (status == RELAY_CONNECTION_STATUS_ERROR) { - session_abort(data_conn->session); - } - relay_thread_close_connection(&events, pollfd, - data_conn); - /* - * Every goto restart call sets the last seen fd where - * here we don't really care since we gracefully - * continue the loop after the connection is deleted. - */ - } else { - /* Keep last seen port. */ - last_seen_data_fd = pollfd; - connection_put(data_conn); - goto restart; - } - } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - relay_thread_close_connection(&events, pollfd, - data_conn); - } else { - ERR("Unknown poll events %u for data sock %d", - revents, pollfd); - } - put_data_connection: - connection_put(data_conn); + if (!at_least_one_event_processed) { + current_activity_phase++; + DBG3("Incrementing activity phase: %" PRIu64, current_activity_phase); } - last_seen_data_fd = -1; } /* Normal exit, no error */ @@ -3340,18 +3262,18 @@ error: /* Cleanup reamaining connection object. */ rcu_read_lock(); cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, - destroy_conn, + tmp_conn, sock_n.node) { health_code_update(); - session_abort(destroy_conn->session); + session_abort(tmp_conn->session); /* * No need to grab another ref, because we own - * destroy_conn. + * tmp conn. 
*/ - relay_thread_close_connection(&events, destroy_conn->sock->fd, - destroy_conn); + relay_thread_close_connection(&events, tmp_conn->sock->fd, + tmp_conn); } rcu_read_unlock(); -- 2.34.1 From aa95faca6cb7a9af8ef2edf696a296b0eac6704c Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Thu, 31 Jan 2019 16:54:32 -0500 Subject: [PATCH 13/16] Bound maximum data read to RECV_DATA_BUFFER_SIZE per iteration Do not consume everything all at once even if there is data left on the socket. This is to provide fairness to the overall data handling of all connections. It also provides a bounded processing execution for a data processing iteration. Signed-off-by: Jonathan Rajotte --- src/bin/lttng-relayd/main.c | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 19b1342ac..1da631487 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -2834,7 +2834,6 @@ static enum relay_connection_status relay_process_data_receive_payload( &conn->protocol.data.state.receive_payload; const size_t chunk_size = RECV_DATA_BUFFER_SIZE; char data_buffer[chunk_size]; - bool partial_recv = false; bool new_stream = false, close_requested = false; uint64_t left_to_receive = state->left_to_receive; struct relay_session *session; @@ -2875,7 +2874,7 @@ static enum relay_connection_status relay_process_data_receive_payload( * - the data immediately available on the socket, * - the on-stack data buffer */ - while (left_to_receive > 0 && !partial_recv) { + if (left_to_receive > 0) { ssize_t write_ret; size_t recv_size = min(left_to_receive, chunk_size); @@ -2892,13 +2891,7 @@ static enum relay_connection_status relay_process_data_receive_payload( DBG3("No more data ready for consumption on data socket of stream id %" PRIu64, state->header.stream_id); status = RELAY_CONNECTION_STATUS_CLOSED; - break; - } else if (ret < (int) recv_size) { - /* - * All the data available on the socket has been - 
* consumed. - */ - partial_recv = true; + goto data_left_to_process; } recv_size = ret; @@ -2920,6 +2913,8 @@ static enum relay_connection_status relay_process_data_receive_payload( write_ret, stream->stream_handle); } +data_left_to_process: + if (state->left_to_receive > 0) { /* * Did not receive all the data expected, wait for more data to -- 2.34.1 From 9cc0ed0d6d8ed76fd453adb1dd4276e94ee8590a Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Tue, 13 Nov 2018 12:12:20 -0500 Subject: [PATCH 14/16] Fix: Connect timeout arithmetic in inet/inet6 (v4) MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The nanoseconds part of the timespec struct time_a is not always bigger than time_b since it wraps around each second. Use 64-bit arithmetic to compute the difference. Merge/move duplicated code into utils.c. This function is really doing two things. Split it into timespec_to_ms() and timespec_abs_diff(). Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- src/common/common.h | 1 + src/common/sessiond-comm/inet.c | 27 +++++++----------------- src/common/sessiond-comm/inet6.c | 27 +++++++----------------- src/common/time.h | 15 +++++++++++++ src/common/utils.c | 36 ++++++++++++++++++++++++++++++++ 5 files changed, 66 insertions(+), 40 deletions(-) diff --git a/src/common/common.h b/src/common/common.h index 41eb03613..9168a2458 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -23,5 +23,6 @@ #include "macros.h" #include "runas.h" #include "readwrite.h" +#include "time.h" #endif /* _COMMON_H */ diff --git a/src/common/sessiond-comm/inet.c b/src/common/sessiond-comm/inet.c index f70fc93a0..0192d83fa 100644 --- a/src/common/sessiond-comm/inet.c +++ b/src/common/sessiond-comm/inet.c @@ -118,31 +118,13 @@ int connect_no_timeout(struct lttcomm_sock *sock) sizeof(sock->sockaddr.addr.sin)); } -/* - * Return time_a - time_b in milliseconds. 
- */ -static -unsigned long time_diff_ms(struct timespec *time_a, - struct timespec *time_b) -{ - time_t sec_diff; - long nsec_diff; - unsigned long result_ms; - - sec_diff = time_a->tv_sec - time_b->tv_sec; - nsec_diff = time_a->tv_nsec - time_b->tv_nsec; - - result_ms = sec_diff * MSEC_PER_SEC; - result_ms += nsec_diff / NSEC_PER_MSEC; - return result_ms; -} - static int connect_with_timeout(struct lttcomm_sock *sock) { unsigned long timeout = lttcomm_get_network_timeout(); int ret, flags, connect_ret; struct timespec orig_time, cur_time; + unsigned long diff_ms; ret = fcntl(sock->fd, F_GETFL, 0); if (ret == -1) { @@ -223,7 +205,12 @@ int connect_with_timeout(struct lttcomm_sock *sock) connect_ret = ret; goto error; } - } while (time_diff_ms(&cur_time, &orig_time) < timeout); + if (timespec_to_ms(timespec_abs_diff(cur_time, orig_time), &diff_ms) < 0) { + ERR("timespec_to_ms input overflows milliseconds output"); + connect_ret = -1; + goto error; + } + } while (diff_ms < timeout); /* Timeout */ errno = ETIMEDOUT; diff --git a/src/common/sessiond-comm/inet6.c b/src/common/sessiond-comm/inet6.c index a041038eb..0071e6767 100644 --- a/src/common/sessiond-comm/inet6.c +++ b/src/common/sessiond-comm/inet6.c @@ -116,31 +116,13 @@ int connect_no_timeout(struct lttcomm_sock *sock) sizeof(sock->sockaddr.addr.sin6)); } -/* - * Return time_a - time_b in milliseconds. 
- */ -static -unsigned long time_diff_ms(struct timespec *time_a, - struct timespec *time_b) -{ - time_t sec_diff; - long nsec_diff; - unsigned long result_ms; - - sec_diff = time_a->tv_sec - time_b->tv_sec; - nsec_diff = time_a->tv_nsec - time_b->tv_nsec; - - result_ms = sec_diff * MSEC_PER_SEC; - result_ms += nsec_diff / NSEC_PER_MSEC; - return result_ms; -} - static int connect_with_timeout(struct lttcomm_sock *sock) { unsigned long timeout = lttcomm_get_network_timeout(); int ret, flags, connect_ret; struct timespec orig_time, cur_time; + unsigned long diff_ms; ret = fcntl(sock->fd, F_GETFL, 0); if (ret == -1) { @@ -222,7 +204,12 @@ int connect_with_timeout(struct lttcomm_sock *sock) connect_ret = ret; goto error; } - } while (time_diff_ms(&cur_time, &orig_time) < timeout); + if (timespec_to_ms(timespec_abs_diff(cur_time, orig_time), &diff_ms) < 0) { + ERR("timespec_to_ms input overflows milliseconds output"); + connect_ret = -1; + goto error; + } + } while (diff_ms < timeout); /* Timeout */ errno = ETIMEDOUT; diff --git a/src/common/time.h b/src/common/time.h index 81770779b..8a7dd958f 100644 --- a/src/common/time.h +++ b/src/common/time.h @@ -19,9 +19,24 @@ #ifndef LTTNG_TIME_H #define LTTNG_TIME_H +#include + #define MSEC_PER_SEC 1000ULL #define NSEC_PER_SEC 1000000000ULL #define NSEC_PER_MSEC 1000000ULL #define NSEC_PER_USEC 1000ULL +/* + * timespec_to_ms: Convert timespec to milliseconds. + * + * Returns 0 on success, else -1 on error. errno is set to EOVERFLOW if + * input would overflow the output in milliseconds. + */ +int timespec_to_ms(struct timespec ts, unsigned long *ms); + +/* + * timespec_abs_diff: Absolute difference between timespec. 
+ */ +struct timespec timespec_abs_diff(struct timespec ts_a, struct timespec ts_b); + #endif /* LTTNG_TIME_H */ diff --git a/src/common/utils.c b/src/common/utils.c index 5ce597f5e..e68218393 100644 --- a/src/common/utils.c +++ b/src/common/utils.c @@ -41,6 +41,7 @@ #include "utils.h" #include "defaults.h" +#include "time.h" /* * Return a partial realpath(3) of the path even if the full path does not @@ -1497,3 +1498,38 @@ int utils_change_working_dir(const char *path) end: return ret; } + +LTTNG_HIDDEN +int timespec_to_ms(struct timespec ts, unsigned long *ms) +{ + unsigned long res, remain_ms; + + if (ts.tv_sec > ULONG_MAX / MSEC_PER_SEC) { + errno = EOVERFLOW; + return -1; /* multiplication overflow */ + } + res = ts.tv_sec * MSEC_PER_SEC; + remain_ms = ULONG_MAX - res; + if (ts.tv_nsec / NSEC_PER_MSEC > remain_ms) { + errno = EOVERFLOW; + return -1; /* addition overflow */ + } + res += ts.tv_nsec / NSEC_PER_MSEC; + *ms = res; + return 0; +} + +LTTNG_HIDDEN +struct timespec timespec_abs_diff(struct timespec t1, struct timespec t2) +{ + uint64_t ts1 = (uint64_t) t1.tv_sec * (uint64_t) NSEC_PER_SEC + + (uint64_t) t1.tv_nsec; + uint64_t ts2 = (uint64_t) t2.tv_sec * (uint64_t) NSEC_PER_SEC + + (uint64_t) t2.tv_nsec; + uint64_t diff = max(ts1, ts2) - min(ts1, ts2); + struct timespec res; + + res.tv_sec = diff / (uint64_t) NSEC_PER_SEC; + res.tv_nsec = diff % (uint64_t) NSEC_PER_SEC; + return res; +} -- 2.34.1 From 33a7d8c5ca99a1bbd1c2b915f9b8a37203ae072a Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Tue, 13 Nov 2018 12:12:21 -0500 Subject: [PATCH 15/16] Fix: max_t/min_t macros are missing cast on input MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The semantic expected from max_t and min_t is to perform the max/min comparison in the type provided as first parameter. Cast the input parameters to the proper type before comparing them, rather than after. 
There is no more need to cast the result of the expression now that both inputs are cast to the right type. Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- src/common/macros.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/macros.h b/src/common/macros.h index 90849ed30..9637a3830 100644 --- a/src/common/macros.h +++ b/src/common/macros.h @@ -63,7 +63,7 @@ void *zmalloc(size_t len) #endif #ifndef max_t -#define max_t(type, a, b) ((type) max(a, b)) +#define max_t(type, a, b) max((type) a, (type) b) #endif #ifndef min -- 2.34.1 From 824d194d2c6982a5624f06bc36ed8f99a1afd478 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Tue, 25 Jun 2019 11:18:10 -0400 Subject: [PATCH 16/16] EfficiOS backport 2.9 revision 7 Signed-off-by: Jonathan Rajotte --- version/extra_version_description | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version/extra_version_description b/version/extra_version_description index 2cfc12a2e..ca25f9aba 100644 --- a/version/extra_version_description +++ b/version/extra_version_description @@ -1 +1 @@ -EfficiOS Revision 6 +EfficiOS Revision 7 -- 2.34.1