Add the relayd create session command
authorDavid Goulet <dgoulet@efficios.com>
Wed, 12 Dec 2012 22:05:45 +0000 (17:05 -0500)
committerDavid Goulet <dgoulet@efficios.com>
Thu, 13 Dec 2012 16:50:15 +0000 (11:50 -0500)
This is needed in order to fix a specific condition of the data pending
where we need to have streams associated with a session and this command
will be used for new feature in the future.

Acked-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-relayd/lttng-relayd.h
src/bin/lttng-relayd/main.c
src/common/consumer.c
src/common/consumer.h
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/relayd.h

index f231f0f6c0b85b172a56ba0c9d2d0cde15481fa7..2a442b0952cbb4b14870434a78f0f0aa3253c105 100644 (file)
@@ -42,7 +42,6 @@ enum connection_type {
 struct relay_session {
        uint64_t id;
        struct lttcomm_sock *sock;
-       unsigned int version_check_done:1;
 };
 
 /*
@@ -72,6 +71,7 @@ struct relay_command {
        struct lttng_ht_node_ulong sock_n;
        struct rcu_head rcu_node;
        enum connection_type type;
+       unsigned int version_check_done:1;
 };
 
 #endif /* LTTNG_RELAYD_H */
index 2a7152b892192f7e683bf881b2b106af0a2da36c..74e785e2f52d62e0afd16fb46db0fbe37e9c7151 100644 (file)
@@ -885,8 +885,6 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht
 
        DBG("Relay deleting session %" PRIu64, cmd->session->id);
 
-       lttcomm_destroy_sock(cmd->session->sock);
-
        rcu_read_lock();
        cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, node, node) {
                node = lttng_ht_iter_get_node_ulong(&iter);
@@ -910,6 +908,54 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht
        free(cmd->session);
 }
 
+/*
+ * Handle the RELAYD_CREATE_SESSION command.
+ *
+ * On success, send back the session id or else return a negative value.
+ */
+static
+int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_command *cmd)
+{
+       int ret = 0, send_ret;
+       struct relay_session *session;
+       struct lttcomm_relayd_status_session reply;
+
+       assert(recv_hdr);
+       assert(cmd);
+
+       memset(&reply, 0, sizeof(reply));
+
+       session = zmalloc(sizeof(struct relay_session));
+       if (session == NULL) {
+               PERROR("relay session zmalloc");
+               ret = -1;
+               goto error;
+       }
+
+       session->id = ++last_relay_session_id;
+       session->sock = cmd->sock;
+       cmd->session = session;
+
+       reply.session_id = htobe64(session->id);
+
+       DBG("Created session %" PRIu64, session->id);
+
+error:
+       if (ret < 0) {
+               reply.ret_code = htobe32(LTTNG_ERR_FATAL);
+       } else {
+               reply.ret_code = htobe32(LTTNG_OK);
+       }
+
+       send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+       if (send_ret < 0) {
+               ERR("Relayd sending session id");
+       }
+
+       return ret;
+}
+
 /*
  * relay_add_stream: allocate a new stream for a session
  */
@@ -924,7 +970,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
        char *path = NULL, *root_path = NULL;
        int ret, send_ret;
 
-       if (!session || session->version_check_done == 0) {
+       if (!session || cmd->version_check_done == 0) {
                ERR("Trying to add a stream before version check");
                ret = -1;
                goto end_no_session;
@@ -1021,7 +1067,7 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
 
        DBG("Close stream received");
 
-       if (!session || session->version_check_done == 0) {
+       if (!session || cmd->version_check_done == 0) {
                ERR("Trying to close a stream before version check");
                ret = -1;
                goto end_no_session;
@@ -1281,22 +1327,10 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
 {
        int ret;
        struct lttcomm_relayd_version reply, msg;
-       struct relay_session *session;
 
-       if (cmd->session == NULL) {
-               session = zmalloc(sizeof(struct relay_session));
-               if (session == NULL) {
-                       PERROR("relay session zmalloc");
-                       ret = -1;
-                       goto end;
-               }
-               session->id = ++last_relay_session_id;
-               DBG("Created session %" PRIu64, session->id);
-               cmd->session = session;
-       } else {
-               session = cmd->session;
-       }
-       session->version_check_done = 1;
+       assert(cmd);
+
+       cmd->version_check_done = 1;
 
        /* Get version from the other side. */
        ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
@@ -1351,7 +1385,7 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
 
        DBG("Data pending command received");
 
-       if (!session || session->version_check_done == 0) {
+       if (!session || cmd->version_check_done == 0) {
                ERR("Trying to check for data before version check");
                ret = -1;
                goto end_no_session;
@@ -1440,11 +1474,9 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        int ret = 0;
 
        switch (be32toh(recv_hdr->cmd)) {
-               /*
        case RELAYD_CREATE_SESSION:
                ret = relay_create_session(recv_hdr, cmd);
                break;
-               */
        case RELAYD_ADD_STREAM:
                ret = relay_add_stream(recv_hdr, cmd, streams_ht);
                break;
index f57cfffff57262cbb04ef048fb761e75019eb805..f4cfa82ce8dcc1179843a806a4d969f432b5391e 100644 (file)
@@ -2743,6 +2743,17 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
 
                /* Assign new file descriptor */
                relayd->control_sock.fd = fd;
+
+               /*
+                * Create a session on the relayd and store the returned id. No need to
+                * grab the socket lock since the relayd object is not yet visible.
+                */
+               ret = relayd_create_session(&relayd->control_sock,
+                               &relayd->session_id);
+               if (ret < 0) {
+                       goto error;
+               }
+
                break;
        case LTTNG_STREAM_DATA:
                /* Copy received lttcomm socket */
index a476dd5bf84fa5617df92ba603557e35b374f0ee..80bb46a830ab1f9b9649d4d4ecee9cf7a469ddb5 100644 (file)
@@ -186,6 +186,9 @@ struct consumer_relayd_sock_pair {
         */
        struct lttcomm_sock data_sock;
        struct lttng_ht_node_ulong node;
+
+       /* Session id on the relayd side for the sockets. */
+       uint64_t session_id;
 };
 
 /*
index 9cc682713ca7368b6d0b21255bbf84c4740a39b2..56ca98223a383bd6d9a36b288decea7c677505ac 100644 (file)
@@ -99,6 +99,54 @@ error:
        return ret;
 }
 
+/*
+ * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
+ * set session_id of the relayd if we have a successful reply from the relayd.
+ *
+ * On success, return 0 else a negative value being a lttng_error_code returned
+ * from the relayd.
+ */
+int relayd_create_session(struct lttcomm_sock *sock, uint64_t *session_id)
+{
+       int ret;
+       struct lttcomm_relayd_status_session reply;
+
+       assert(sock);
+       assert(session_id);
+
+       DBG("Relayd create session");
+
+       /* Send command */
+       ret = send_command(sock, RELAYD_CREATE_SESSION, NULL, 0, 0);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Recevie response */
+       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       reply.session_id = be64toh(reply.session_id);
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -reply.ret_code;
+               ERR("Relayd create session replied error %d", ret);
+               goto error;
+       } else {
+               ret = 0;
+               *session_id = reply.session_id;
+       }
+
+       DBG("Relayd session created with id %" PRIu64, reply.session_id);
+
+error:
+       return ret;
+}
+
 /*
  * Add stream on the relayd and assign stream handle to the stream_id argument.
  *
index 8857bc9fbb058e73b5383f47c2711c35b33b0a88..9d65c3b8a2e71182c3400997530d78a829197d9d 100644 (file)
 
 int relayd_connect(struct lttcomm_sock *sock);
 int relayd_close(struct lttcomm_sock *sock);
-#if 0
-int relayd_create_session(struct lttcomm_sock *sock, const char *hostname,
-               const char *session_name);
-#endif
+int relayd_create_session(struct lttcomm_sock *sock, uint64_t *session_id);
 int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name,
                const char *pathname, uint64_t *stream_id);
 int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id,
index 7bc7a12173236b4609322291ec3d5af89038853b..6cd9a21f4ca922b81cd8df4dd1ea41fb276a3321 100644 (file)
@@ -53,6 +53,14 @@ struct lttcomm_relayd_data_hdr {
        uint32_t padding_size;  /* Size of 0 padding the data */
 } __attribute__ ((__packed__));
 
+/*
+ * Reply from a create session command.
+ */
+struct lttcomm_relayd_status_session {
+       uint64_t session_id;
+       uint32_t ret_code;
+} __attribute__ ((__packed__));
+
 /*
  * Used to add a stream on the relay daemon.
  */
This page took 0.031506 seconds and 5 git commands to generate.