X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fkernel-consumer.c;h=05e447754bb2156a77b6ce3d8dc1555f84973864;hp=49adabb72336b11d0255e3a1f1d2462b0a70601a;hb=e098433c90550d74288498f8c4474ef4c2daea68;hpb=3e25d926e1c03011c68430e3a358d1e9a10ca9ab diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c index 49adabb72..05e447754 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.c +++ b/src/bin/lttng-sessiond/kernel-consumer.c @@ -173,6 +173,8 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, rcu_read_lock(); session = session_find_by_id(ksession->id); assert(session); + assert(pthread_mutex_trylock(&session->lock)); + assert(session_trylock_list()); status = notification_thread_command_add_channel( notification_thread_handle, session->name, @@ -199,25 +201,30 @@ error: * The consumer socket lock must be held by the caller. */ int kernel_consumer_add_metadata(struct consumer_socket *sock, - struct ltt_kernel_session *session, unsigned int monitor) + struct ltt_kernel_session *ksession, unsigned int monitor) { int ret; char *pathname; struct lttcomm_consumer_msg lkm; struct consumer_output *consumer; + struct ltt_session *session; + + rcu_read_lock(); /* Safety net */ - assert(session); - assert(session->consumer); + assert(ksession); + assert(ksession->consumer); assert(sock); - DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd); + DBG("Sending metadata %d to kernel consumer", + ksession->metadata_stream_fd); /* Get consumer output pointer */ - consumer = session->consumer; + consumer = ksession->consumer; if (monitor) { - pathname = create_channel_path(consumer, session->uid, session->gid); + pathname = create_channel_path(consumer, + ksession->uid, ksession->gid); } else { /* Empty path. */ pathname = strdup(""); @@ -227,13 +234,18 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, goto error; } + session = session_find_by_id(ksession->id); + assert(session); + assert(pthread_mutex_trylock(&session->lock)); + assert(session_trylock_list()); + /* Prep channel message structure */ consumer_init_add_channel_comm_msg(&lkm, - session->metadata->key, - session->id, + ksession->metadata->key, + ksession->id, pathname, - session->uid, - session->gid, + ksession->uid, + ksession->gid, consumer->net_seq_index, DEFAULT_METADATA_NAME, 1, @@ -252,17 +264,17 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, health_code_update(); /* Prep stream message structure */ - consumer_init_stream_comm_msg(&lkm, - LTTNG_CONSUMER_ADD_STREAM, - session->metadata->key, - session->metadata_stream_fd, - 0); /* CPU: 0 for metadata. */ + consumer_init_add_stream_comm_msg(&lkm, + ksession->metadata->key, + ksession->metadata_stream_fd, + 0 /* CPU: 0 for metadata. */, + session->current_archive_id); health_code_update(); /* Send stream and file descriptor */ ret = consumer_send_stream(sock, consumer, &lkm, - &session->metadata_stream_fd, 1); + &ksession->metadata_stream_fd, 1); if (ret < 0) { goto error; } @@ -270,6 +282,7 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, health_code_update(); error: + rcu_read_unlock(); free(pathname); return ret; } @@ -279,8 +292,10 @@ error: */ static int kernel_consumer_add_stream(struct consumer_socket *sock, - struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream, - struct ltt_kernel_session *session, unsigned int monitor) + struct ltt_kernel_channel *channel, + struct ltt_kernel_stream *stream, + struct ltt_kernel_session *session, unsigned int monitor, + uint64_t trace_archive_id) { int ret; struct lttcomm_consumer_msg lkm; @@ -299,11 +314,11 @@ int kernel_consumer_add_stream(struct consumer_socket *sock, consumer = session->consumer; /* Prep stream consumer message */ - consumer_init_stream_comm_msg(&lkm, - LTTNG_CONSUMER_ADD_STREAM, + consumer_init_add_stream_comm_msg(&lkm, channel->key, stream->fd, - stream->cpu); + stream->cpu, + trace_archive_id); health_code_update(); @@ -359,20 +374,28 @@ error: * The consumer socket lock must be held by the caller. */ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, - struct ltt_kernel_channel *channel, struct ltt_kernel_session *session, + struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession, unsigned int monitor) { int ret = LTTNG_OK; struct ltt_kernel_stream *stream; + struct ltt_session *session; /* Safety net */ assert(channel); - assert(session); - assert(session->consumer); + assert(ksession); + assert(ksession->consumer); assert(sock); + rcu_read_lock(); + + session = session_find_by_id(ksession->id); + assert(session); + assert(pthread_mutex_trylock(&session->lock)); + assert(session_trylock_list()); + /* Bail out if consumer is disabled */ - if (!session->consumer->enabled) { + if (!ksession->consumer->enabled) { ret = LTTNG_OK; goto error; } @@ -381,7 +404,7 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, channel->channel->name); if (!channel->sent_to_consumer) { - ret = kernel_consumer_add_channel(sock, channel, session, monitor); + ret = kernel_consumer_add_channel(sock, channel, ksession, monitor); if (ret < 0) { goto error; } @@ -395,8 +418,8 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, } /* Add stream on the kernel consumer side. */ - ret = kernel_consumer_add_stream(sock, channel, stream, session, - monitor); + ret = kernel_consumer_add_stream(sock, channel, stream, + ksession, monitor, session->current_archive_id); if (ret < 0) { goto error; } @@ -404,6 +427,7 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, } error: + rcu_read_unlock(); return ret; }