relayd_rotate_stream basics in place
authorJulien Desfossez <jdesfossez@efficios.com>
Wed, 30 Aug 2017 22:14:58 +0000 (18:14 -0400)
committerJulien Desfossez <jdesfossez@efficios.com>
Wed, 6 Sep 2017 17:59:58 +0000 (13:59 -0400)
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
src/bin/lttng-relayd/main.c
src/common/consumer/consumer.c
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/relayd.h
src/common/sessiond-comm/sessiond-comm.h

index dca753ba218ed02f5254ad2a74b2b2ad5872277f..b99ecb9824711d7a0fb563022dbd283acb5939f3 100644 (file)
@@ -2109,6 +2109,121 @@ end_no_session:
        return ret;
 }
 
+/*
+ * relay_rotate_stream: rotate a stream to a new tracefile.
+ */
+static int relay_rotate_stream(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn)
+{
+       int ret, send_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_rotate_stream stream_info;
+       struct lttcomm_relayd_generic_reply reply;
+       struct relay_stream *stream;
+       size_t len;
+
+       DBG("Rotate stream received");
+
+       if (!session || conn->version_check_done == 0) {
+               ERR("Trying to rotate a stream before version check");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       if (session->minor < 11) {
+               ERR("Unsupported feature before 2.11");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       ret = conn->sock->ops->recvmsg(conn->sock, &stream_info,
+                       sizeof(stream_info), 0);
+       if (ret < sizeof(stream_info)) {
+               if (ret == 0) {
+                       /* Orderly shutdown. Not necessary to print an error. */
+                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+               } else {
+                       ERR("Relay didn't receive valid rotate_stream struct size : %d", ret);
+               }
+               ret = -1;
+               goto end_no_session;
+       }
+
+       stream = stream_get_by_id(be64toh(stream_info.stream_id));
+       if (!stream) {
+               ret = -1;
+               goto end;
+       }
+
+       len = lttng_strnlen(stream_info.new_pathname,
+                       sizeof(stream_info.new_pathname));
+       /* Ensure that NULL-terminated and fits in local filename length. */
+       if (len == sizeof(stream_info.new_pathname) || len >= LTTNG_NAME_MAX) {
+               ret = -ENAMETOOLONG;
+               ERR("Path name too long");
+               goto end;
+       }
+
+       fprintf(stderr, "Rotating stream %lu to %s\n", stream_info.stream_id,
+                       stream_info.new_pathname);
+#if 0
+       /*
+        * Set last_net_seq_num before the close flag. Required by data
+        * pending check.
+        */
+       pthread_mutex_lock(&stream->lock);
+       stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
+       pthread_mutex_unlock(&stream->lock);
+
+       /*
+        * This is one of the conditions which may trigger a stream close
+        * with the others being:
+        *     1) A close command is received for a stream
+        *     2) The control connection owning the stream is closed
+        *     3) We have received all of the stream's data _after_ a close
+        *        request.
+        */
+       try_stream_close(stream);
+       if (stream->is_metadata) {
+               struct relay_viewer_stream *vstream;
+
+               vstream = viewer_stream_get_by_id(stream->stream_handle);
+               if (vstream) {
+                       if (vstream->metadata_sent == stream->metadata_received) {
+                               /*
+                                * Since all the metadata has been sent to the
+                                * viewer and that we have a request to close
+                                * its stream, we can safely teardown the
+                                * corresponding metadata viewer stream.
+                                */
+                               viewer_stream_put(vstream);
+                       }
+                       /* Put local reference. */
+                       viewer_stream_put(vstream);
+               }
+       }
+#endif
+       stream_put(stream);
+
+end:
+       memset(&reply, 0, sizeof(reply));
+       if (ret < 0) {
+               reply.ret_code = htobe32(LTTNG_ERR_UNK);
+       } else {
+               reply.ret_code = htobe32(LTTNG_OK);
+       }
+       send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply), 0);
+       if (send_ret < 0) {
+               ERR("Relay sending stream id");
+               ret = send_ret;
+       }
+
+end_no_session:
+       return ret;
+}
+
+
 /*
  * Process the commands received on the control socket
  */
@@ -2157,6 +2272,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_RESET_METADATA:
                ret = relay_reset_metadata(recv_hdr, conn);
                break;
+       case RELAYD_ROTATE_STREAM:
+               ret = relay_rotate_stream(recv_hdr, conn);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
index 0289566d10f0c4e9b1b6afd207573195f938f650..8dee8435779471a622090d16a8f088757e7a5746 100644 (file)
@@ -4095,12 +4095,9 @@ void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stre
 }
 
 /*
- * Performs the stream rotation for the rotate session feature if needed.
- * It must be called with the stream lock held.
- *
- * Return 0 on success, a negative number of error.
+ * Perform the rotation a local stream file.
  */
-int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+int rotate_local_stream(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream)
 {
        int ret;
@@ -4164,8 +4161,6 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
                }
        }
 
-       lttng_consumer_reset_stream_rotate_state(stream);
-
        ret = 0;
        goto end;
 
@@ -4173,6 +4168,66 @@ error:
        ret = -1;
 end:
        return ret;
+
+}
+
+/*
+ * Perform the rotation a stream file on the relay.
+ */
+int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream)
+{
+       int ret;
+       char *new_pathname = NULL;
+       struct consumer_relayd_sock_pair *relayd;
+
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (!relayd) {
+               ERR("Failed to find relayd");
+               ret = -1;
+               goto end;
+       }
+
+       new_pathname = zmalloc(LTTNG_PATH_MAX * sizeof(char));
+       if (!new_pathname) {
+               ret = -ENOMEM;
+               goto end;
+       }
+
+       ret = snprintf(new_pathname, LTTNG_PATH_MAX, "%s/%s",
+                       stream->channel_ro_pathname, stream->name);
+       if (ret < 0) {
+               PERROR("snprintf stream name");
+               goto end;
+       }
+
+       ret = relayd_rotate_stream(&relayd->control_sock,
+                       stream->relayd_stream_id, new_pathname);
+
+end:
+       free(new_pathname);
+       return ret;
+}
+
+/*
+ * Performs the stream rotation for the rotate session feature if needed.
+ * It must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative number of error.
+ */
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       if (stream->net_seq_idx != (uint64_t) -1ULL) {
+               ret = rotate_relay_stream(ctx, stream);
+       } else {
+               ret = rotate_local_stream(ctx, stream);
+       }
+       lttng_consumer_reset_stream_rotate_state(stream);
+
+       return ret;
 }
 
 /*
index 2adcbe415c63f16a176dba9167c247f34eb0eba3..1d80dc4442dbc91e9d52b374d3bd994c19cbe7be 100644 (file)
@@ -940,3 +940,53 @@ int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
 error:
        return ret;
 }
+
+int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
+               const char *new_pathname)
+{
+       int ret;
+       struct lttcomm_relayd_rotate_stream msg;
+       struct lttcomm_relayd_generic_reply reply;
+
+       /* Code flow error. Safety net. */
+       assert(rsock);
+
+       DBG("Relayd rotating stream id %" PRIu64, stream_id);
+
+       memset(&msg, 0, sizeof(msg));
+       msg.stream_id = htobe64(stream_id);
+       if (lttng_strncpy(msg.new_pathname, new_pathname,
+                               sizeof(msg.new_pathname))) {
+               ret = -1;
+               goto error;
+       }
+
+       /* Send command */
+       ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) &msg, sizeof(msg), 0);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Receive response */
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -1;
+               ERR("Relayd rotate stream replied error %d", reply.ret_code);
+       } else {
+               /* Success */
+               ret = 0;
+       }
+
+       DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id);
+
+error:
+       return ret;
+
+}
index f090a0db63681b3b83ec451113039b3d40dd0b75..d99bc1d6b1ad3b647b4eeb25602555e82d8d0ccd 100644 (file)
@@ -51,5 +51,7 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock,
                uint64_t net_seq_num);
 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
                uint64_t stream_id, uint64_t version);
+int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
+               const char *new_pathname);
 
 #endif /* _RELAYD_H */
index 52ca1caa15565dbbfdaa217ec7ac1b9d779646a4..0dd1057e1efad2b019a010d4eea7c97b96e6eb52 100644 (file)
@@ -195,4 +195,9 @@ struct lttcomm_relayd_reset_metadata {
        uint64_t version;
 } LTTNG_PACKED;
 
+struct lttcomm_relayd_rotate_stream {
+       uint64_t stream_id;
+       char new_pathname[LTTNG_PATH_MAX];
+} LTTNG_PACKED;
+
 #endif /* _RELAYD_COMM */
index e91e069843e821a21bb4ea6854820c1edb21ed80..85955c50ccd1222a98fc4c060567162f2ea55d29 100644 (file)
@@ -126,7 +126,8 @@ enum lttcomm_relayd_command {
        RELAYD_STREAMS_SENT                 = 16,
        /* Ask the relay to reset the metadata trace file (2.8+) */
        RELAYD_RESET_METADATA               = 17,
-       RELAYD_ROTATE                       = 18,
+       /* Ask the relay to rotate a stream file (2.11+) */
+       RELAYD_ROTATE_STREAM                = 18,
 };
 
 /*
This page took 0.032294 seconds and 5 git commands to generate.