X-Git-Url: https://git.efficios.com/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=c1e018523d4eed3e50e6dd814ec165c794e638bd;hb=4852cdb954cd0d685bd4be7e69abfeb3e5388cc6;hp=ced965dee34aa435828eb7484aa8d058d56bb4d3;hpb=564d32178e31dc4da50d0ca511df5674d37a5782;p=deliverable%2Flttng-tools.git diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index ced965dee..c1e018523 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -147,7 +147,6 @@ int consumer_recv_status_reply(struct consumer_socket *sock) ret = 0; } else { ret = -reply.ret_code; - DBG("Consumer ret code %d", ret); } end: @@ -731,6 +730,8 @@ error: /* * Send file descriptor to consumer via sock. + * + * The consumer socket lock must be held by the caller. */ int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd) { @@ -755,6 +756,8 @@ error: /* * Consumer send communication message structure to consumer. + * + * The consumer socket lock must be held by the caller. */ int consumer_send_msg(struct consumer_socket *sock, struct lttcomm_consumer_msg *msg) @@ -778,6 +781,8 @@ error: /* * Consumer send channel communication message structure to consumer. + * + * The consumer socket lock must be held by the caller. */ int consumer_send_channel(struct consumer_socket *sock, struct lttcomm_consumer_msg *msg) @@ -992,6 +997,8 @@ error: /* * Send relayd socket to consumer associated with a session name. * + * The consumer socket lock must be held by the caller. + * * On success return positive value. On error, negative value. */ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, @@ -1066,6 +1073,7 @@ int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, memset(&msg, 0, sizeof(msg)); msg.cmd_type = LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE; + pthread_mutex_lock(consumer_sock->lock); DBG3("Sending set_channel_monitor_pipe command to consumer"); ret = consumer_send_msg(consumer_sock, &msg); if (ret < 0) { @@ -1081,6 +1089,7 @@ int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, DBG2("Channel monitoring pipe successfully sent"); error: + pthread_mutex_unlock(consumer_sock->lock); return ret; } @@ -1438,7 +1447,9 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, } health_code_update(); + pthread_mutex_lock(socket->lock); ret = consumer_send_msg(socket, &msg); + pthread_mutex_unlock(socket->lock); if (ret < 0) { goto error; } @@ -1561,3 +1572,102 @@ end: rcu_read_unlock(); return ret; } + +int consumer_clear_channel(struct consumer_socket *socket, uint64_t key, + struct consumer_output *output) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + + DBG("Consumer clear channel %" PRIu64, key); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_CLEAR_CHANNEL; + msg.u.clear_channel.key = key; + + health_code_update(); + + pthread_mutex_lock(socket->lock); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto error_socket; + } + +error_socket: + pthread_mutex_unlock(socket->lock); + + health_code_update(); + return ret; +} + +static +int consumer_msg_clear_session(struct consumer_socket *socket, uint64_t session_id, + struct consumer_output *output) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + + DBG("Consumer clear session %" PRIu64, session_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_CLEAR_SESSION; + msg.u.clear_session.session_id = session_id; + + health_code_update(); + + pthread_mutex_lock(socket->lock); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto error_socket; + } + +error_socket: + pthread_mutex_unlock(socket->lock); + + health_code_update(); + return ret; +} + +int consumer_clear_session(struct ltt_session *session) +{ + struct ltt_ust_session *usess = session->ust_session; + struct ltt_kernel_session *ksess = session->kernel_session; + int ret; + + rcu_read_lock(); + if (ksess) { + struct consumer_socket *socket; + struct lttng_ht_iter iter; + + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, + socket, node.node) { + ret = consumer_msg_clear_session(socket, session->id, + ksess->consumer); + if (ret < 0) { + goto error; + } + } + } + if (usess) { + struct consumer_socket *socket; + struct lttng_ht_iter iter; + + cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, + socket, node.node) { + ret = consumer_msg_clear_session(socket, session->id, + usess->consumer); + if (ret < 0) { + goto error; + } + } + } + rcu_read_unlock(); + return 0; +error: + rcu_read_unlock(); + return ret; +}