Move libconsumer under common/consumer/
[lttng-tools.git] / src / bin / lttng-sessiond / ust-consumer.c
index f120144aaaa994383fd96eea1af4d671cb78c960..3bb54f03983178f035fdcb345f8e1d7670557847 100644 (file)
@@ -15,7 +15,6 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#define _GNU_SOURCE
 #define _LGPL_SOURCE
 #include <errno.h>
 #include <stdio.h>
@@ -25,7 +24,7 @@
 #include <inttypes.h>
 
 #include <common/common.h>
-#include <common/consumer.h>
+#include <common/consumer/consumer.h>
 #include <common/defaults.h>
 
 #include "consumer.h"
@@ -73,7 +72,7 @@ static char *setup_trace_path(struct consumer_output *consumer,
                ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
                                ua_sess->euid, ua_sess->egid);
                if (ret < 0) {
-                       if (ret != -EEXIST) {
+                       if (errno != EEXIST) {
                                ERR("Trace directory creation error");
                                goto error;
                        }
@@ -109,6 +108,8 @@ static int ask_channel_creation(struct ust_app_session *ua_sess,
        char *pathname = NULL;
        struct lttcomm_consumer_msg msg;
        struct ust_registry_channel *chan_reg;
+       char shm_path[PATH_MAX] = "";
+       char root_shm_path[PATH_MAX] = "";
 
        assert(ua_sess);
        assert(ua_chan);
@@ -136,10 +137,27 @@ static int ask_channel_creation(struct ust_app_session *ua_sess,
 
        if (ua_chan->attr.type == LTTNG_UST_CHAN_METADATA) {
                chan_id = -1U;
+               /*
+                * Metadata channels shm_path (buffers) are handled within
+                * session daemon. Consumer daemon should not try to create
+                * those buffer files.
+                */
        } else {
                chan_reg = ust_registry_channel_find(registry, chan_reg_key);
                assert(chan_reg);
                chan_id = chan_reg->chan_id;
+               if (ua_sess->shm_path[0]) {
+                       strncpy(shm_path, ua_sess->shm_path, sizeof(shm_path));
+                       shm_path[sizeof(shm_path) - 1] = '\0';
+                       strncat(shm_path, "/",
+                               sizeof(shm_path) - strlen(shm_path) - 1);
+                       strncat(shm_path, ua_chan->name,
+                                       sizeof(shm_path) - strlen(shm_path) - 1);
+                               strncat(shm_path, "_",
+                                       sizeof(shm_path) - strlen(shm_path) - 1);
+               }
+               strncpy(root_shm_path, ua_sess->root_shm_path, sizeof(root_shm_path));
+               root_shm_path[sizeof(root_shm_path) - 1] = '\0';
        }
 
        switch (ua_chan->attr.output) {
@@ -171,7 +189,8 @@ static int ask_channel_creation(struct ust_app_session *ua_sess,
                        ua_chan->tracefile_count,
                        ua_sess->id,
                        ua_sess->output_traces,
-                       ua_sess->uid);
+                       ua_sess->uid,
+                       root_shm_path, shm_path);
 
        health_code_update();
 
@@ -225,14 +244,13 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess,
        }
 
        pthread_mutex_lock(socket->lock);
-
        ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry);
+       pthread_mutex_unlock(socket->lock);
        if (ret < 0) {
                goto error;
        }
 
 error:
-       pthread_mutex_unlock(socket->lock);
        return ret;
 }
 
@@ -381,7 +399,9 @@ int ust_consumer_send_stream_to_ust(struct ust_app *app,
        DBG2("UST consumer send stream to app %d", app->sock);
 
        /* Relay stream to application. */
+       pthread_mutex_lock(&app->sock_lock);
        ret = ustctl_send_stream_to_ust(app->sock, channel->obj, stream->obj);
+       pthread_mutex_unlock(&app->sock_lock);
        if (ret < 0) {
                if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
                        ERR("ustctl send stream handle %d to app pid: %d with ret %d",
@@ -416,7 +436,9 @@ int ust_consumer_send_channel_to_ust(struct ust_app *app,
                        app->sock, app->pid, channel->name, channel->tracing_channel_id);
 
        /* Send stream to application. */
+       pthread_mutex_lock(&app->sock_lock);
        ret = ustctl_send_channel_to_ust(app->sock, ua_sess->handle, channel->obj);
+       pthread_mutex_unlock(&app->sock_lock);
        if (ret < 0) {
                if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
                        ERR("Error ustctl send channel %s to app pid: %d with ret %d",
@@ -448,12 +470,12 @@ int ust_consumer_metadata_request(struct consumer_socket *socket)
        assert(socket);
 
        rcu_read_lock();
-       pthread_mutex_lock(socket->lock);
-
        health_code_update();
 
        /* Wait for a metadata request */
+       pthread_mutex_lock(socket->lock);
        ret = consumer_socket_recv(socket, &request, sizeof(request));
+       pthread_mutex_unlock(socket->lock);
        if (ret < 0) {
                goto end;
        }
@@ -488,17 +510,21 @@ int ust_consumer_metadata_request(struct consumer_socket *socket)
        }
        assert(ust_reg);
 
+       pthread_mutex_lock(&ust_reg->lock);
        ret_push = ust_app_push_metadata(ust_reg, socket, 1);
-       if (ret_push < 0) {
+       pthread_mutex_unlock(&ust_reg->lock);
+       if (ret_push == -EPIPE) {
+               DBG("Application or relay closed while pushing metadata");
+       } else if (ret_push < 0) {
                ERR("Pushing metadata");
                ret = -1;
                goto end;
+       } else {
+               DBG("UST Consumer metadata pushed successfully");
        }
-       DBG("UST Consumer metadata pushed successfully");
        ret = 0;
 
 end:
-       pthread_mutex_unlock(socket->lock);
        rcu_read_unlock();
        return ret;
 }
This page took 0.025861 seconds and 5 git commands to generate.