CUSTOM: consumer: ust: relayd: perform 2 stage relay_add_stream
[deliverable/lttng-tools.git] / src / common / relayd / relayd.c
index 9e9525503a971dc993237aae1cfdf9188bdc29f4..afcd2d838b3d0f7b2e256ba368a1fac635651135 100644 (file)
@@ -72,14 +72,14 @@ static int send_command(struct lttcomm_relayd_sock *rsock,
                memcpy(buf + sizeof(header), data, size);
        }
 
+       DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size);
        ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
        if (ret < 0) {
+               PERROR("Failed to send command %d of size %" PRIu64,
+                               (int) cmd, buf_size);
                ret = -errno;
                goto error;
        }
-
-       DBG3("Relayd sending command %d of size %" PRIu64, cmd, buf_size);
-
 error:
        free(buf);
 alloc_error:
@@ -254,16 +254,16 @@ int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_nam
        /* Compat with relayd 2.1 */
        if (rsock->minor == 1) {
                memset(&msg, 0, sizeof(msg));
-               if (strlen(channel_name) >= sizeof(msg.channel_name)) {
+               if (lttng_strncpy(msg.channel_name, channel_name,
+                               sizeof(msg.channel_name))) {
                        ret = -1;
                        goto error;
                }
-               strncpy(msg.channel_name, channel_name, sizeof(msg.channel_name));
-               if (strlen(pathname) >= sizeof(msg.pathname)) {
+               if (lttng_strncpy(msg.pathname, pathname,
+                               sizeof(msg.pathname))) {
                        ret = -1;
                        goto error;
                }
-               strncpy(msg.pathname, pathname, sizeof(msg.pathname));
 
                /* Send command */
                ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
@@ -273,16 +273,16 @@ int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_nam
        } else {
                memset(&msg_2_2, 0, sizeof(msg_2_2));
                /* Compat with relayd 2.2+ */
-               if (strlen(channel_name) >= sizeof(msg_2_2.channel_name)) {
+               if (lttng_strncpy(msg_2_2.channel_name, channel_name,
+                               sizeof(msg_2_2.channel_name))) {
                        ret = -1;
                        goto error;
                }
-               strncpy(msg_2_2.channel_name, channel_name, sizeof(msg_2_2.channel_name));
-               if (strlen(pathname) >= sizeof(msg_2_2.pathname)) {
+               if (lttng_strncpy(msg_2_2.pathname, pathname,
+                               sizeof(msg_2_2.pathname))) {
                        ret = -1;
                        goto error;
                }
-               strncpy(msg_2_2.pathname, pathname, sizeof(msg_2_2.pathname));
                msg_2_2.tracefile_size = htobe64(tracefile_size);
                msg_2_2.tracefile_count = htobe64(tracefile_count);
 
@@ -320,6 +320,109 @@ error:
        return ret;
 }
 
+/*
+ * Add stream on the relayd. Send part.
+ *
+ * On success return 0 else return ret_code negative value.
+ */
+int relayd_add_stream_send(struct lttcomm_relayd_sock *rsock, const char *channel_name,
+               const char *pathname, uint64_t tracefile_size, uint64_t tracefile_count)
+{
+       int ret;
+       struct lttcomm_relayd_add_stream msg;
+       struct lttcomm_relayd_add_stream_2_2 msg_2_2;
+
+       /* Code flow error. Safety net. */
+       assert(rsock);
+       assert(channel_name);
+       assert(pathname);
+
+       DBG("Relayd adding stream for channel name %s. Part send", channel_name);
+
+       /* Compat with relayd 2.1 */
+       if (rsock->minor == 1) {
+               memset(&msg, 0, sizeof(msg));
+               if (lttng_strncpy(msg.channel_name, channel_name,
+                               sizeof(msg.channel_name))) {
+                       ret = -1;
+                       goto error;
+               }
+               if (lttng_strncpy(msg.pathname, pathname,
+                               sizeof(msg.pathname))) {
+                       ret = -1;
+                       goto error;
+               }
+
+               /* Send command */
+               ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
+               if (ret < 0) {
+                       goto error;
+               }
+       } else {
+               memset(&msg_2_2, 0, sizeof(msg_2_2));
+               /* Compat with relayd 2.2+ */
+               if (lttng_strncpy(msg_2_2.channel_name, channel_name,
+                               sizeof(msg_2_2.channel_name))) {
+                       ret = -1;
+                       goto error;
+               }
+               if (lttng_strncpy(msg_2_2.pathname, pathname,
+                               sizeof(msg_2_2.pathname))) {
+                       ret = -1;
+                       goto error;
+               }
+               msg_2_2.tracefile_size = htobe64(tracefile_size);
+               msg_2_2.tracefile_count = htobe64(tracefile_count);
+
+               /* Send command */
+               ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg_2_2, sizeof(msg_2_2), 0);
+               if (ret < 0) {
+                       goto error;
+               }
+       }
+
+       DBG("Relayd add stream sent for channel name %s.", channel_name);
+       ret = 0;
+
+error:
+       return ret;
+}
+
+int relayd_add_stream_rcv(struct lttcomm_relayd_sock *rsock, uint64_t *_stream_id)
+{
+       int ret;
+       struct lttcomm_relayd_status_stream reply;
+
+       /* Code flow error. Safety net. */
+       assert(rsock);
+
+       /* Waiting for reply */
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Back to host bytes order. */
+       reply.handle = be64toh(reply.handle);
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -1;
+               ERR("Relayd add stream replied error %d", reply.ret_code);
+       } else {
+               /* Success */
+               ret = 0;
+               *_stream_id = reply.handle;
+       }
+
+       DBG("Relayd stream added successfully with handle %" PRIu64,
+               reply.handle);
+
+error:
+       return ret;
+}
+
 /*
  * Inform the relay that all the streams for the current channel has been sent.
  *
@@ -378,7 +481,8 @@ end:
  * If major versions are compatible, we assign minor_to_use to the
  * minor version of the procotol we are going to use for this session.
  *
- * Return 0 if compatible else negative value.
+ * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
+ * otherwise, or a negative value on network errors.
  */
 int relayd_version_check(struct lttcomm_relayd_sock *rsock)
 {
@@ -420,7 +524,7 @@ int relayd_version_check(struct lttcomm_relayd_sock *rsock)
         */
        if (msg.major != rsock->major) {
                /* Not compatible */
-               ret = -1;
+               ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
                DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
                                msg.major, rsock->major);
                goto error;
@@ -856,7 +960,11 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock,
        }
 
        /* Send command */
-       ret = send_command(rsock, RELAYD_SEND_INDEX, &msg, sizeof(msg), 0);
+       ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
+               lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
+                                                               rsock->minor),
+                               lttng_to_index_minor(rsock->major, rsock->minor)),
+                               0);
        if (ret < 0) {
                goto error;
        }
This page took 0.030028 seconds and 5 git commands to generate.