X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=dffcaff9855a61b265bba6b3eb376a990188a76e;hp=59888c280c341cc1c16a5c89d652a88802787ec2;hb=7d2f74525fbda4dcc744f33ea26c911545b5df13;hpb=efc8a87fb1ec94af764008a13b3576e793ae288c diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 59888c280..dffcaff98 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -904,6 +904,8 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, session->id = ++last_relay_session_id; session->sock = cmd->sock; + session->minor = cmd->minor; + session->major = cmd->major; cmd->session = session; reply.session_id = htobe64(session->id); @@ -1832,17 +1834,98 @@ end: return ret; } +/* + * Handle index for a data stream. + * + * RCU read side lock MUST be acquired. + * + * Return 0 on success else a negative value. + */ +static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, + int rotate_index) +{ + int ret = 0, index_created = 0; + uint64_t stream_id, data_offset; + struct relay_index *index, *wr_index = NULL; + + assert(stream); + + stream_id = stream->stream_handle; + /* Get data offset because we are about to update the index. */ + data_offset = htobe64(stream->tracefile_size_current); + + /* + * Lookup for an existing index for that stream id/sequence number. If on + * exists, the control thread already received the data for it thus we need + * to write it on disk. + */ + index = relay_index_find(stream_id, net_seq_num); + if (!index) { + /* A successful creation will add the object to the HT. */ + index = relay_index_create(stream_id, net_seq_num); + if (!index) { + ret = -1; + goto error; + } + index_created = 1; + } + + if (rotate_index || stream->index_fd < 0) { + index->to_close_fd = stream->index_fd; + ret = index_create_file(stream->path_name, stream->channel_name, + relayd_uid, relayd_gid, stream->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + /* This will close the stream's index fd if one. */ + relay_index_free_safe(index); + goto error; + } + stream->index_fd = ret; + } + index->fd = stream->index_fd; + index->index_data.offset = data_offset; + + if (index_created) { + /* + * Try to add the relay index object to the hash table. If an object + * already exist, destroy back the index created and set the data. + */ + relay_index_add(index, &wr_index); + if (wr_index) { + /* Copy back data from the created index. */ + wr_index->fd = index->fd; + wr_index->to_close_fd = index->to_close_fd; + wr_index->index_data.offset = data_offset; + free(index); + } + } else { + /* The index already exists so write it on disk. */ + wr_index = index; + } + + /* Do we have a writable ready index to write on disk. */ + if (wr_index) { + ret = relay_index_write(wr_index->fd, wr_index); + if (ret < 0) { + goto error; + } + stream->total_index_received++; + } + +error: + return ret; +} + /* * relay_process_data: Process the data received on the data socket */ static int relay_process_data(struct relay_command *cmd) { - int ret = 0, rotate_index = 0, index_created = 0; + int ret = 0, rotate_index = 0; struct relay_stream *stream; - struct relay_index *index, *wr_index = NULL; struct lttcomm_relayd_data_hdr data_hdr; - uint64_t stream_id, data_offset; + uint64_t stream_id; uint64_t net_seq_num; uint32_t data_size; @@ -1915,77 +1998,18 @@ int relay_process_data(struct relay_command *cmd) rotate_index = 1; } - /* Get data offset because we are about to update the index. */ - data_offset = htobe64(stream->tracefile_size_current); - /* - * Lookup for an existing index for that stream id/sequence number. If on - * exists, the control thread already received the data for it thus we need - * to write it on disk. + * Index are handled in protocol version 2.4 and above. Also, snapshot and + * index are NOT supported. */ - index = relay_index_find(stream_id, net_seq_num); - if (!index) { - /* A successful creation will add the object to the HT. */ - index = relay_index_create(stream->stream_handle, net_seq_num); - if (!index) { - goto end_rcu_unlock; - } - index_created = 1; - } - - if (rotate_index || stream->index_fd < 0) { - index->to_close_fd = stream->index_fd; - ret = index_create_file(stream->path_name, stream->channel_name, - relayd_uid, relayd_gid, stream->tracefile_size, - stream->tracefile_count_current); + if (stream->session->minor >= 4 && !stream->session->snapshot) { + ret = handle_index_data(stream, net_seq_num, rotate_index); if (ret < 0) { - /* This will close the stream's index fd if one. */ - relay_index_free_safe(index); goto end_rcu_unlock; } - stream->index_fd = ret; - } - index->fd = stream->index_fd; - index->index_data.offset = data_offset; - - if (index_created) { - /* - * Try to add the relay index object to the hash table. If an object - * already exist, destroy back the index created and set the data. - */ - relay_index_add(index, &wr_index); - if (wr_index) { - /* Copy back data from the created index. */ - wr_index->fd = index->fd; - wr_index->to_close_fd = index->to_close_fd; - wr_index->index_data.offset = data_offset; - free(index); - } - } else { - /* The index already exists so write it on disk. */ - wr_index = index; - } - - /* Do we have a writable ready index to write on disk. */ - if (wr_index) { - /* Starting at 2.4, create the index file if none available. */ - if (cmd->minor >= 4 && stream->index_fd < 0) { - ret = index_create_file(stream->path_name, stream->channel_name, - relayd_uid, relayd_gid, stream->tracefile_size, - stream->tracefile_count_current); - if (ret < 0) { - goto end_rcu_unlock; - } - stream->index_fd = ret; - } - - ret = relay_index_write(wr_index->fd, wr_index); - if (ret < 0) { - goto end_rcu_unlock; - } - stream->total_index_received++; } + /* Write data to stream output fd. */ do { ret = write(stream->fd, data_buffer, data_size); } while (ret < 0 && errno == EINTR); @@ -2350,15 +2374,9 @@ error: &iter, relay_connection, sessions_ht); } } -error_poll_create: - { - struct relay_index *index; - cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) { - relay_index_delete(index); - } - lttng_ht_destroy(indexes_ht); - } rcu_read_unlock(); +error_poll_create: + lttng_ht_destroy(indexes_ht); indexes_ht_error: lttng_ht_destroy(relay_connections_ht); relay_connections_ht_error: