X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fkernel-consumer.c;h=9ccad8be4e87dd19cd4e0212acbbd64c4330a4af;hp=49adabb72336b11d0255e3a1f1d2462b0a70601a;hb=e32d7f274604b77bcd83c24994e88df3761ed658;hpb=5b0e3ccb033e701a4c4005d6859757652ca8897c diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c index 49adabb72..9ccad8be4 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.c +++ b/src/bin/lttng-sessiond/kernel-consumer.c @@ -117,7 +117,7 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, struct lttcomm_consumer_msg lkm; struct consumer_output *consumer; enum lttng_error_code status; - struct ltt_session *session; + struct ltt_session *session = NULL; struct lttng_channel_extended *channel_attr_extended; /* Safety net */ @@ -173,6 +173,8 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, rcu_read_lock(); session = session_find_by_id(ksession->id); assert(session); + assert(pthread_mutex_trylock(&session->lock)); + assert(session_trylock_list()); status = notification_thread_command_add_channel( notification_thread_handle, session->name, @@ -189,6 +191,9 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, channel->published_to_notification_thread = true; error: + if (session) { + session_put(session); + } free(pathname); return ret; } @@ -199,25 +204,30 @@ error: * The consumer socket lock must be held by the caller. */ int kernel_consumer_add_metadata(struct consumer_socket *sock, - struct ltt_kernel_session *session, unsigned int monitor) + struct ltt_kernel_session *ksession, unsigned int monitor) { int ret; char *pathname; struct lttcomm_consumer_msg lkm; struct consumer_output *consumer; + struct ltt_session *session = NULL; + + rcu_read_lock(); /* Safety net */ - assert(session); - assert(session->consumer); + assert(ksession); + assert(ksession->consumer); assert(sock); - DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd); + DBG("Sending metadata %d to kernel consumer", + ksession->metadata_stream_fd); /* Get consumer output pointer */ - consumer = session->consumer; + consumer = ksession->consumer; if (monitor) { - pathname = create_channel_path(consumer, session->uid, session->gid); + pathname = create_channel_path(consumer, + ksession->uid, ksession->gid); } else { /* Empty path. */ pathname = strdup(""); @@ -227,13 +237,18 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, goto error; } + session = session_find_by_id(ksession->id); + assert(session); + assert(pthread_mutex_trylock(&session->lock)); + assert(session_trylock_list()); + /* Prep channel message structure */ consumer_init_add_channel_comm_msg(&lkm, - session->metadata->key, - session->id, + ksession->metadata->key, + ksession->id, pathname, - session->uid, - session->gid, + ksession->uid, + ksession->gid, consumer->net_seq_index, DEFAULT_METADATA_NAME, 1, @@ -252,17 +267,17 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, health_code_update(); /* Prep stream message structure */ - consumer_init_stream_comm_msg(&lkm, - LTTNG_CONSUMER_ADD_STREAM, - session->metadata->key, - session->metadata_stream_fd, - 0); /* CPU: 0 for metadata. */ + consumer_init_add_stream_comm_msg(&lkm, + ksession->metadata->key, + ksession->metadata_stream_fd, + 0 /* CPU: 0 for metadata. */, + session->current_archive_id); health_code_update(); /* Send stream and file descriptor */ ret = consumer_send_stream(sock, consumer, &lkm, - &session->metadata_stream_fd, 1); + &ksession->metadata_stream_fd, 1); if (ret < 0) { goto error; } @@ -270,7 +285,11 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, health_code_update(); error: + rcu_read_unlock(); free(pathname); + if (session) { + session_put(session); + } return ret; } @@ -279,8 +298,10 @@ error: */ static int kernel_consumer_add_stream(struct consumer_socket *sock, - struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream, - struct ltt_kernel_session *session, unsigned int monitor) + struct ltt_kernel_channel *channel, + struct ltt_kernel_stream *stream, + struct ltt_kernel_session *session, unsigned int monitor, + uint64_t trace_archive_id) { int ret; struct lttcomm_consumer_msg lkm; @@ -299,11 +320,11 @@ int kernel_consumer_add_stream(struct consumer_socket *sock, consumer = session->consumer; /* Prep stream consumer message */ - consumer_init_stream_comm_msg(&lkm, - LTTNG_CONSUMER_ADD_STREAM, + consumer_init_add_stream_comm_msg(&lkm, channel->key, stream->fd, - stream->cpu); + stream->cpu, + trace_archive_id); health_code_update(); @@ -359,20 +380,28 @@ error: * The consumer socket lock must be held by the caller. */ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, - struct ltt_kernel_channel *channel, struct ltt_kernel_session *session, + struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession, unsigned int monitor) { int ret = LTTNG_OK; struct ltt_kernel_stream *stream; + struct ltt_session *session = NULL; /* Safety net */ assert(channel); - assert(session); - assert(session->consumer); + assert(ksession); + assert(ksession->consumer); assert(sock); + rcu_read_lock(); + + session = session_find_by_id(ksession->id); + assert(session); + assert(pthread_mutex_trylock(&session->lock)); + assert(session_trylock_list()); + /* Bail out if consumer is disabled */ - if (!session->consumer->enabled) { + if (!ksession->consumer->enabled) { ret = LTTNG_OK; goto error; } @@ -381,7 +410,7 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, channel->channel->name); if (!channel->sent_to_consumer) { - ret = kernel_consumer_add_channel(sock, channel, session, monitor); + ret = kernel_consumer_add_channel(sock, channel, ksession, monitor); if (ret < 0) { goto error; } @@ -395,8 +424,8 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, } /* Add stream on the kernel consumer side. */ - ret = kernel_consumer_add_stream(sock, channel, stream, session, - monitor); + ret = kernel_consumer_add_stream(sock, channel, stream, + ksession, monitor, session->current_archive_id); if (ret < 0) { goto error; } @@ -404,6 +433,10 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, } error: + rcu_read_unlock(); + if (session) { + session_put(session); + } return ret; }