From 7d2f74525fbda4dcc744f33ea26c911545b5df13 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Fri, 27 Sep 2013 15:46:13 -0400 Subject: [PATCH] Fix: don't create index on snapshot Signed-off-by: David Goulet --- src/bin/lttng-relayd/cmd-2-4.c | 1 + src/bin/lttng-relayd/lttng-relayd.h | 10 ++ src/bin/lttng-relayd/main.c | 168 +++++++++++++++------------- src/bin/lttng-sessiond/consumer.c | 3 +- src/bin/lttng-sessiond/consumer.h | 3 + src/bin/lttng-sessiond/snapshot.c | 1 + src/common/relayd/relayd.c | 11 +- src/common/relayd/relayd.h | 3 +- src/common/sessiond-comm/relayd.h | 1 + 9 files changed, 119 insertions(+), 82 deletions(-) diff --git a/src/bin/lttng-relayd/cmd-2-4.c b/src/bin/lttng-relayd/cmd-2-4.c index 45410e4e8..f199b8396 100644 --- a/src/bin/lttng-relayd/cmd-2-4.c +++ b/src/bin/lttng-relayd/cmd-2-4.c @@ -46,6 +46,7 @@ int cmd_create_session_2_4(struct relay_command *cmd, strncpy(session->hostname, session_info.hostname, sizeof(session->hostname)); session->live_timer = be32toh(session_info.live_timer); + session->snapshot = be32toh(session_info.snapshot); ret = 0; diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h index da2324034..3c8c26c29 100644 --- a/src/bin/lttng-relayd/lttng-relayd.h +++ b/src/bin/lttng-relayd/lttng-relayd.h @@ -62,6 +62,16 @@ struct relay_session { struct rcu_head rcu_node; uint32_t viewer_attached; uint32_t stream_count; + /* Tell if this session is for a snapshot or not. */ + unsigned int snapshot:1; + + /* + * Indicate version protocol for this session. This is especially useful + * for the data thread that has no idea which version it operates on since + * linking control/data sockets is non trivial. + */ + uint64_t minor; + uint64_t major; }; /* diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 59888c280..dffcaff98 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -904,6 +904,8 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, session->id = ++last_relay_session_id; session->sock = cmd->sock; + session->minor = cmd->minor; + session->major = cmd->major; cmd->session = session; reply.session_id = htobe64(session->id); @@ -1832,17 +1834,98 @@ end: return ret; } +/* + * Handle index for a data stream. + * + * RCU read side lock MUST be acquired. + * + * Return 0 on success else a negative value. + */ +static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, + int rotate_index) +{ + int ret = 0, index_created = 0; + uint64_t stream_id, data_offset; + struct relay_index *index, *wr_index = NULL; + + assert(stream); + + stream_id = stream->stream_handle; + /* Get data offset because we are about to update the index. */ + data_offset = htobe64(stream->tracefile_size_current); + + /* + * Lookup for an existing index for that stream id/sequence number. If on + * exists, the control thread already received the data for it thus we need + * to write it on disk. + */ + index = relay_index_find(stream_id, net_seq_num); + if (!index) { + /* A successful creation will add the object to the HT. */ + index = relay_index_create(stream_id, net_seq_num); + if (!index) { + ret = -1; + goto error; + } + index_created = 1; + } + + if (rotate_index || stream->index_fd < 0) { + index->to_close_fd = stream->index_fd; + ret = index_create_file(stream->path_name, stream->channel_name, + relayd_uid, relayd_gid, stream->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + /* This will close the stream's index fd if one. */ + relay_index_free_safe(index); + goto error; + } + stream->index_fd = ret; + } + index->fd = stream->index_fd; + index->index_data.offset = data_offset; + + if (index_created) { + /* + * Try to add the relay index object to the hash table. If an object + * already exist, destroy back the index created and set the data. + */ + relay_index_add(index, &wr_index); + if (wr_index) { + /* Copy back data from the created index. */ + wr_index->fd = index->fd; + wr_index->to_close_fd = index->to_close_fd; + wr_index->index_data.offset = data_offset; + free(index); + } + } else { + /* The index already exists so write it on disk. */ + wr_index = index; + } + + /* Do we have a writable ready index to write on disk. */ + if (wr_index) { + ret = relay_index_write(wr_index->fd, wr_index); + if (ret < 0) { + goto error; + } + stream->total_index_received++; + } + +error: + return ret; +} + /* * relay_process_data: Process the data received on the data socket */ static int relay_process_data(struct relay_command *cmd) { - int ret = 0, rotate_index = 0, index_created = 0; + int ret = 0, rotate_index = 0; struct relay_stream *stream; - struct relay_index *index, *wr_index = NULL; struct lttcomm_relayd_data_hdr data_hdr; - uint64_t stream_id, data_offset; + uint64_t stream_id; uint64_t net_seq_num; uint32_t data_size; @@ -1915,77 +1998,18 @@ int relay_process_data(struct relay_command *cmd) rotate_index = 1; } - /* Get data offset because we are about to update the index. */ - data_offset = htobe64(stream->tracefile_size_current); - /* - * Lookup for an existing index for that stream id/sequence number. If on - * exists, the control thread already received the data for it thus we need - * to write it on disk. + * Index are handled in protocol version 2.4 and above. Also, snapshot and + * index are NOT supported. */ - index = relay_index_find(stream_id, net_seq_num); - if (!index) { - /* A successful creation will add the object to the HT. */ - index = relay_index_create(stream->stream_handle, net_seq_num); - if (!index) { - goto end_rcu_unlock; - } - index_created = 1; - } - - if (rotate_index || stream->index_fd < 0) { - index->to_close_fd = stream->index_fd; - ret = index_create_file(stream->path_name, stream->channel_name, - relayd_uid, relayd_gid, stream->tracefile_size, - stream->tracefile_count_current); + if (stream->session->minor >= 4 && !stream->session->snapshot) { + ret = handle_index_data(stream, net_seq_num, rotate_index); if (ret < 0) { - /* This will close the stream's index fd if one. */ - relay_index_free_safe(index); goto end_rcu_unlock; } - stream->index_fd = ret; - } - index->fd = stream->index_fd; - index->index_data.offset = data_offset; - - if (index_created) { - /* - * Try to add the relay index object to the hash table. If an object - * already exist, destroy back the index created and set the data. - */ - relay_index_add(index, &wr_index); - if (wr_index) { - /* Copy back data from the created index. */ - wr_index->fd = index->fd; - wr_index->to_close_fd = index->to_close_fd; - wr_index->index_data.offset = data_offset; - free(index); - } - } else { - /* The index already exists so write it on disk. */ - wr_index = index; - } - - /* Do we have a writable ready index to write on disk. */ - if (wr_index) { - /* Starting at 2.4, create the index file if none available. */ - if (cmd->minor >= 4 && stream->index_fd < 0) { - ret = index_create_file(stream->path_name, stream->channel_name, - relayd_uid, relayd_gid, stream->tracefile_size, - stream->tracefile_count_current); - if (ret < 0) { - goto end_rcu_unlock; - } - stream->index_fd = ret; - } - - ret = relay_index_write(wr_index->fd, wr_index); - if (ret < 0) { - goto end_rcu_unlock; - } - stream->total_index_received++; } + /* Write data to stream output fd. */ do { ret = write(stream->fd, data_buffer, data_size); } while (ret < 0 && errno == EINTR); @@ -2350,15 +2374,9 @@ error: &iter, relay_connection, sessions_ht); } } -error_poll_create: - { - struct relay_index *index; - cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) { - relay_index_delete(index); - } - lttng_ht_destroy(indexes_ht); - } rcu_read_unlock(); +error_poll_create: + lttng_ht_destroy(indexes_ht); indexes_ht_error: lttng_ht_destroy(relay_connections_ht); relay_connections_ht_error: diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index f0870e6b0..3a500092f 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -960,7 +960,8 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, if (type == LTTNG_STREAM_CONTROL) { ret = relayd_create_session(rsock, &msg.u.relayd_sock.relayd_session_id, - session_name, hostname, session_live_timer); + session_name, hostname, session_live_timer, + consumer->snapshot); if (ret < 0) { /* Close the control socket. */ (void) relayd_close(rsock); diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 4d6838f6a..484d8f7ba 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -163,6 +163,9 @@ struct consumer_output { */ struct lttng_ht *socks; + /* Tell if this output is used for snapshot. */ + unsigned int snapshot:1; + union { char trace_path[PATH_MAX]; struct consumer_net net; diff --git a/src/bin/lttng-sessiond/snapshot.c b/src/bin/lttng-sessiond/snapshot.c index d22b5d52c..6b1aa8db1 100644 --- a/src/bin/lttng-sessiond/snapshot.c +++ b/src/bin/lttng-sessiond/snapshot.c @@ -80,6 +80,7 @@ static int output_init(uint64_t max_size, const char *name, ret = -ENOMEM; goto error; } + output->consumer->snapshot = 1; /* No URL given. */ if (nb_uri == 0) { diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index a4c8a9261..1f427e0e2 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -122,7 +122,7 @@ error: */ static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock, uint64_t *session_id, char *session_name, char *hostname, - int session_live_timer) + int session_live_timer, unsigned int snapshot) { int ret; struct lttcomm_relayd_create_session_2_4 msg; @@ -130,6 +130,7 @@ static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock, strncpy(msg.session_name, session_name, sizeof(msg.session_name)); strncpy(msg.hostname, hostname, sizeof(msg.hostname)); msg.live_timer = htobe32(session_live_timer); + msg.snapshot = htobe32(snapshot); /* Send command */ ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0); @@ -167,7 +168,8 @@ error: * a lttng error code from the relayd. */ int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_id, - char *session_name, char *hostname, int session_live_timer) + char *session_name, char *hostname, int session_live_timer, + unsigned int snapshot) { int ret; struct lttcomm_relayd_status_session reply; @@ -184,9 +186,8 @@ int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_i ret = relayd_create_session_2_1(rsock, session_id); case 4: default: - ret = relayd_create_session_2_4(rsock, session_id, - session_name, hostname, - session_live_timer); + ret = relayd_create_session_2_4(rsock, session_id, session_name, + hostname, session_live_timer, snapshot); } if (ret < 0) { diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index d59f4ae2f..d12d7a495 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -26,7 +26,8 @@ int relayd_connect(struct lttcomm_relayd_sock *sock); int relayd_close(struct lttcomm_relayd_sock *sock); int relayd_create_session(struct lttcomm_relayd_sock *sock, uint64_t *session_id, - char *session_name, char *hostname, int session_live_timer); + char *session_name, char *hostname, int session_live_timer, + unsigned int snapshot); int relayd_add_stream(struct lttcomm_relayd_sock *sock, const char *channel_name, const char *pathname, uint64_t *stream_id, uint64_t tracefile_size, uint64_t tracefile_count); diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index bc27b0ed4..24c4c6e84 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -171,6 +171,7 @@ struct lttcomm_relayd_create_session_2_4 { char session_name[NAME_MAX]; char hostname[HOST_NAME_MAX]; uint32_t live_timer; + uint32_t snapshot; } LTTNG_PACKED; #endif /* _RELAYD_COMM */ -- 2.34.1