X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=52a4ed171d6b0dde5d1759c781f54ef45994cab0;hb=fe4477ee14abb348ce9e167f8b4c09312d67de36;hp=300fd2a2fc896108ac60c43ce920a5f633be111b;hpb=5d2e1e66a968d9e555f9b8b00d0589ebfaf3de32;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 300fd2a2f..52a4ed171 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -782,7 +783,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, } /* Metadata are always sent on the control socket. */ - outfd = relayd->control_sock.fd; + outfd = relayd->control_sock.sock.fd; } else { /* Set header with stream information */ data_hdr.stream_id = htobe64(stream->relayd_stream_id); @@ -807,7 +808,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, ++stream->next_net_seq_num; /* Set to go on data socket */ - outfd = relayd->data_sock.fd; + outfd = relayd->data_sock.sock.fd; } error: @@ -820,14 +821,16 @@ error: * * On error, return NULL. */ -struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key, +struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t session_id, const char *pathname, const char *name, uid_t uid, gid_t gid, int relayd_id, - enum lttng_event_output output) + enum lttng_event_output output, + uint64_t tracefile_size, + uint64_t tracefile_count) { struct lttng_consumer_channel *channel; @@ -844,6 +847,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key, channel->gid = gid; channel->relayd_id = relayd_id; channel->output = output; + channel->tracefile_size = tracefile_size; + channel->tracefile_count = tracefile_count; strncpy(channel->pathname, pathname, sizeof(channel->pathname)); channel->pathname[sizeof(channel->pathname) - 1] = '\0'; @@ -876,8 +881,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, pthread_mutex_lock(&consumer_data.lock); rcu_read_lock(); - lttng_ht_lookup(consumer_data.channel_ht, - &channel->key, &iter); + lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (node != NULL) { /* Channel already exist. Ignore the insertion */ @@ -936,7 +940,12 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { continue; } - DBG("Active FD %d", stream->wait_fd); + /* + * This clobbers way too much the debug output. Uncomment that if you + * need it for debugging purposes. + * + * DBG("Active FD %d", stream->wait_fd); + */ (*pollfd)[i].fd = stream->wait_fd; (*pollfd)[i].events = POLLIN | POLLPRI; local_stream[i] = stream; @@ -1137,6 +1146,7 @@ struct lttng_consumer_local_data *lttng_consumer_create( } ctx->consumer_error_socket = -1; + ctx->consumer_metadata_socket = -1; /* assign the callbacks */ ctx->on_buffer_ready = buffer_ready; ctx->on_recv_channel = recv_channel; @@ -1223,6 +1233,10 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) if (ret) { PERROR("close"); } + ret = close(ctx->consumer_metadata_socket); + if (ret) { + PERROR("close"); + } utils_close_pipe(ctx->consumer_thread_pipe); utils_close_pipe(ctx->consumer_channel_pipe); utils_close_pipe(ctx->consumer_data_pipe); @@ -1324,6 +1338,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( goto end; } ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset); + break; default: ERR("Unknown consumer_data type"); @@ -1379,6 +1394,24 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } else { /* No streaming, we have to set the len with the full padding */ len += padding; + + /* + * Check if we need to change the tracefile before writing the packet. + */ + if (stream->chan->tracefile_size > 0 && + (stream->tracefile_size_current + len) > + stream->chan->tracefile_size) { + ret = utils_rotate_stream_file(stream->chan->pathname, + stream->name, stream->chan->tracefile_size, + stream->chan->tracefile_count, stream->uid, stream->gid, + stream->out_fd, &(stream->tracefile_count_current)); + if (ret < 0) { + ERR("Rotating output file"); + goto end; + } + outfd = stream->out_fd = ret; + } + stream->tracefile_size_current += len; } while (len > 0) { @@ -1541,6 +1574,24 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( } else { /* No streaming, we have to set the len with the full padding */ len += padding; + + /* + * Check if we need to change the tracefile before writing the packet. + */ + if (stream->chan->tracefile_size > 0 && + (stream->tracefile_size_current + len) > + stream->chan->tracefile_size) { + ret = utils_rotate_stream_file(stream->chan->pathname, + stream->name, stream->chan->tracefile_size, + stream->chan->tracefile_count, stream->uid, stream->gid, + stream->out_fd, &(stream->tracefile_count_current)); + if (ret < 0) { + ERR("Rotating output file"); + goto end; + } + outfd = stream->out_fd = ret; + } + stream->tracefile_size_current += len; } while (len > 0) { @@ -2703,6 +2754,33 @@ end_ht: return NULL; } +static int set_metadata_socket(struct lttng_consumer_local_data *ctx, + struct pollfd *sockpoll, int client_socket) +{ + int ret; + + assert(ctx); + assert(sockpoll); + + if (lttng_consumer_poll_socket(sockpoll) < 0) { + ret = -1; + goto error; + } + DBG("Metadata connection on client_socket"); + + /* Blocking call, waiting for transmission */ + ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket); + if (ctx->consumer_metadata_socket < 0) { + WARN("On accept metadata"); + ret = -1; + goto error; + } + ret = 0; + +error: + return ret; +} + /* * This thread listens on the consumerd socket and receives the file * descriptors from the session daemon. @@ -2769,6 +2847,15 @@ void *consumer_thread_sessiond_poll(void *data) goto end; } + /* + * Setup metadata socket which is the second socket connection on the + * command unix socket. + */ + ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket); + if (ret < 0) { + goto end; + } + /* This socket is not useful anymore. */ ret = close(client_socket); if (ret < 0) { @@ -2907,13 +2994,16 @@ void lttng_consumer_init(void) */ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, - struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock, - unsigned int sessiond_id) + struct pollfd *consumer_sockpoll, + struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id) { int fd = -1, ret = -1, relayd_created = 0; enum lttng_error_code ret_code = LTTNG_OK; struct consumer_relayd_sock_pair *relayd; + assert(ctx); + assert(relayd_sock); + DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); /* First send a status message before receiving the fds. */ @@ -2963,11 +3053,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, switch (sock_type) { case LTTNG_STREAM_CONTROL: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->control_sock, relayd_sock); - ret = lttcomm_create_sock(&relayd->control_sock); + lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock); + ret = lttcomm_create_sock(&relayd->control_sock.sock); /* Immediately try to close the created socket if valid. */ - if (relayd->control_sock.fd >= 0) { - if (close(relayd->control_sock.fd)) { + if (relayd->control_sock.sock.fd >= 0) { + if (close(relayd->control_sock.sock.fd)) { PERROR("close relayd control socket"); } } @@ -2977,7 +3067,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, } /* Assign new file descriptor */ - relayd->control_sock.fd = fd; + relayd->control_sock.sock.fd = fd; + /* Assign version values. */ + relayd->control_sock.major = relayd_sock->major; + relayd->control_sock.minor = relayd_sock->minor; /* * Create a session on the relayd and store the returned id. Lock the @@ -3005,11 +3098,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, break; case LTTNG_STREAM_DATA: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->data_sock, relayd_sock); - ret = lttcomm_create_sock(&relayd->data_sock); + lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock); + ret = lttcomm_create_sock(&relayd->data_sock.sock); /* Immediately try to close the created socket if valid. */ - if (relayd->data_sock.fd >= 0) { - if (close(relayd->data_sock.fd)) { + if (relayd->data_sock.sock.fd >= 0) { + if (close(relayd->data_sock.sock.fd)) { PERROR("close relayd data socket"); } } @@ -3019,7 +3112,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, } /* Assign new file descriptor */ - relayd->data_sock.fd = fd; + relayd->data_sock.sock.fd = fd; + /* Assign version values. */ + relayd->data_sock.major = relayd_sock->major; + relayd->data_sock.minor = relayd_sock->minor; break; default: ERR("Unknown relayd socket type (%d)", sock_type);