Fix: Set thread stack size to ulimit soft value
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index e4ded2b8655d17c5f270bd67cfffca65697766f8..e8f3087b11bd95e966be7e4e5c9dcf9ad1eb5a1e 100644 (file)
@@ -81,6 +81,10 @@ static int opt_daemon, opt_background;
  */
 #define NR_LTTNG_RELAY_READY   3
 static int lttng_relay_ready = NR_LTTNG_RELAY_READY;
+
+/* Size of receive buffer. */
+#define RECV_DATA_BUFFER_SIZE          65536
+
 static int recv_child_signal;  /* Set to 1 when a SIGUSR1 signal is received. */
 static pid_t child_ppid;       /* Internal parent PID use with daemonize. */
 
@@ -162,24 +166,6 @@ static struct option long_options[] = {
 
 static const char *config_ignore_options[] = { "help", "config" };
 
-/*
- * usage function on stderr
- */
-static void usage(void)
-{
-       fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
-       fprintf(stderr, "  -h, --help                Display this usage.\n");
-       fprintf(stderr, "  -d, --daemonize           Start as a daemon.\n");
-       fprintf(stderr, "  -b, --background          Start as a daemon, keeping console open.\n");
-       fprintf(stderr, "  -C, --control-port URL    Control port listening.\n");
-       fprintf(stderr, "  -D, --data-port URL       Data port listening.\n");
-       fprintf(stderr, "  -L, --live-port URL       Live view port listening.\n");
-       fprintf(stderr, "  -o, --output PATH         Output path for traces. Must use an absolute path.\n");
-       fprintf(stderr, "  -v, --verbose             Verbose mode. Activate DBG() macro.\n");
-       fprintf(stderr, "  -g, --group NAME          Specify the tracing group name. (default: tracing)\n");
-       fprintf(stderr, "  -f  --config              Load daemon configuration file\n");
-}
-
 /*
  * Take an option from the getopt output and set it in the right variable to be
  * used later.
@@ -263,7 +249,11 @@ static int set_option(int opt, const char *arg, const char *optname)
                }
                break;
        case 'h':
-               usage();
+               ret = utils_show_man_page(8, "lttng-relayd");
+               if (ret) {
+                       ERR("Cannot view man page lttng-relayd(8)");
+                       perror("exec");
+               }
                exit(EXIT_FAILURE);
        case 'o':
                if (lttng_is_setuid_setgid()) {
@@ -1330,6 +1320,90 @@ end_no_session:
        return ret;
 }
 
+/*
+ * relay_reset_metadata: reset a metadata stream
+ */
+static
+int relay_reset_metadata(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn)
+{
+       int ret, send_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_reset_metadata stream_info;
+       struct lttcomm_relayd_generic_reply reply;
+       struct relay_stream *stream;
+
+       DBG("Reset metadata received");
+
+       if (!session || conn->version_check_done == 0) {
+               ERR("Trying to reset a metadata stream before version check");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       ret = conn->sock->ops->recvmsg(conn->sock, &stream_info,
+                       sizeof(struct lttcomm_relayd_reset_metadata), 0);
+       if (ret < sizeof(struct lttcomm_relayd_reset_metadata)) {
+               if (ret == 0) {
+                       /* Orderly shutdown. Not necessary to print an error. */
+                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+               } else {
+                       ERR("Relay didn't receive valid reset_metadata struct "
+                                       "size : %d", ret);
+               }
+               ret = -1;
+               goto end_no_session;
+       }
+       DBG("Update metadata to version %" PRIu64, be64toh(stream_info.version));
+
+       /* Unsupported for live sessions for now. */
+       if (session->live_timer != 0) {
+               ret = -1;
+               goto end;
+       }
+
+       stream = stream_get_by_id(be64toh(stream_info.stream_id));
+       if (!stream) {
+               ret = -1;
+               goto end;
+       }
+       pthread_mutex_lock(&stream->lock);
+       if (!stream->is_metadata) {
+               ret = -1;
+               goto end_unlock;
+       }
+
+       ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
+                       0, 0, -1, -1, stream->stream_fd->fd, NULL,
+                       &stream->stream_fd->fd);
+       if (ret < 0) {
+               ERR("Failed to rotate metadata file %s of channel %s",
+                               stream->path_name, stream->channel_name);
+               goto end_unlock;
+       }
+
+end_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       stream_put(stream);
+
+end:
+       memset(&reply, 0, sizeof(reply));
+       if (ret < 0) {
+               reply.ret_code = htobe32(LTTNG_ERR_UNK);
+       } else {
+               reply.ret_code = htobe32(LTTNG_OK);
+       }
+       send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply), 0);
+       if (send_ret < 0) {
+               ERR("Relay sending reset metadata reply");
+               ret = send_ret;
+       }
+
+end_no_session:
+       return ret;
+}
+
 /*
  * relay_unknown_command: send -1 if received unknown command
  */
@@ -2060,6 +2134,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_STREAMS_SENT:
                ret = relay_streams_sent(recv_hdr, conn);
                break;
+       case RELAYD_RESET_METADATA:
+               ret = relay_reset_metadata(recv_hdr, conn);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
@@ -2171,6 +2248,9 @@ static int relay_process_data(struct relay_connection *conn)
        uint32_t data_size;
        struct relay_session *session;
        bool new_stream = false, close_requested = false;
+       size_t chunk_size = RECV_DATA_BUFFER_SIZE;
+       size_t recv_off = 0;
+       char data_buffer[chunk_size];
 
        ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
                        sizeof(struct lttcomm_relayd_data_hdr), 0);
@@ -2194,36 +2274,11 @@ static int relay_process_data(struct relay_connection *conn)
        }
        session = stream->trace->session;
        data_size = be32toh(data_hdr.data_size);
-       if (data_buffer_size < data_size) {
-               char *tmp_data_ptr;
-
-               tmp_data_ptr = realloc(data_buffer, data_size);
-               if (!tmp_data_ptr) {
-                       ERR("Allocating data buffer");
-                       free(data_buffer);
-                       ret = -1;
-                       goto end_stream_put;
-               }
-               data_buffer = tmp_data_ptr;
-               data_buffer_size = data_size;
-       }
-       memset(data_buffer, 0, data_size);
 
        net_seq_num = be64toh(data_hdr.net_seq_num);
 
        DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
                data_size, stream_id, net_seq_num);
-       ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
-       if (ret <= 0) {
-               if (ret == 0) {
-                       /* Orderly shutdown. Not necessary to print an error. */
-                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
-               } else {
-                       ERR("Socket %d error %d", conn->sock->fd, ret);
-               }
-               ret = -1;
-               goto end_stream_put;
-       }
 
        pthread_mutex_lock(&stream->lock);
 
@@ -2269,16 +2324,33 @@ static int relay_process_data(struct relay_connection *conn)
                }
        }
 
-       /* Write data to stream output fd. */
-       size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size);
-       if (size_ret < data_size) {
-               ERR("Relay error writing data to file");
-               ret = -1;
-               goto end_stream_unlock;
-       }
+       for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) {
+               size_t recv_size = min(data_size - recv_off, chunk_size);
 
-       DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
-                       size_ret, stream->stream_handle);
+               ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0);
+               if (ret <= 0) {
+                       if (ret == 0) {
+                               /* Orderly shutdown. Not necessary to print an error. */
+                               DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+                       } else {
+                               ERR("Socket %d error %d", conn->sock->fd, ret);
+                       }
+                       ret = -1;
+                       goto end_stream_unlock;
+               }
+
+               /* Write data to stream output fd. */
+               size_ret = lttng_write(stream->stream_fd->fd, data_buffer,
+                               recv_size);
+               if (size_ret < recv_size) {
+                       ERR("Relay error writing data to file");
+                       ret = -1;
+                       goto end_stream_unlock;
+               }
+
+               DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
+                               size_ret, stream->stream_handle);
+       }
 
        ret = write_padding_to_file(stream->stream_fd->fd,
                        be32toh(data_hdr.padding_size));
@@ -2307,7 +2379,6 @@ end_stream_unlock:
                uatomic_set(&session->new_streams, 1);
                pthread_mutex_unlock(&session->lock);
        }
-end_stream_put:
        stream_put(stream);
 end:
        return ret;
@@ -2625,7 +2696,6 @@ relay_connections_ht_error:
                DBG("Thread exited with error");
        }
        DBG("Worker thread cleanup complete");
-       free(data_buffer);
 error_testpoint:
        if (err) {
                health_error();
@@ -2708,7 +2778,6 @@ int main(int argc, char **argv)
                }
        }
 
-
        /* Initialize thread health monitoring */
        health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
        if (!health_relayd) {
@@ -2723,15 +2792,6 @@ int main(int argc, char **argv)
                goto exit_init_data;
        }
 
-       /* Check if daemon is UID = 0 */
-       if (!getuid()) {
-               if (control_uri->port < 1024 || data_uri->port < 1024 || live_uri->port < 1024) {
-                       ERR("Need to be root to use ports < 1024");
-                       retval = -1;
-                       goto exit_init_data;
-               }
-       }
-
        /* Setup the thread apps communication pipe. */
        if (create_relay_conn_pipe()) {
                retval = -1;
@@ -2741,12 +2801,6 @@ int main(int argc, char **argv)
        /* Init relay command queue. */
        cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail);
 
-       /* Set up max poll set size */
-       if (lttng_poll_set_max_size()) {
-               retval = -1;
-               goto exit_init_data;
-       }
-
        /* Initialize communication library */
        lttcomm_init();
        lttcomm_inet_init();
@@ -2779,7 +2833,7 @@ int main(int argc, char **argv)
        }
 
        /* Create thread to manage the client socket */
-       ret = pthread_create(&health_thread, NULL,
+       ret = pthread_create(&health_thread, default_pthread_attr(),
                        thread_manage_health, (void *) NULL);
        if (ret) {
                errno = ret;
@@ -2789,7 +2843,7 @@ int main(int argc, char **argv)
        }
 
        /* Setup the dispatcher thread */
-       ret = pthread_create(&dispatcher_thread, NULL,
+       ret = pthread_create(&dispatcher_thread, default_pthread_attr(),
                        relay_thread_dispatcher, (void *) NULL);
        if (ret) {
                errno = ret;
@@ -2799,7 +2853,7 @@ int main(int argc, char **argv)
        }
 
        /* Setup the worker thread */
-       ret = pthread_create(&worker_thread, NULL,
+       ret = pthread_create(&worker_thread, default_pthread_attr(),
                        relay_thread_worker, NULL);
        if (ret) {
                errno = ret;
@@ -2809,7 +2863,7 @@ int main(int argc, char **argv)
        }
 
        /* Setup the listener thread */
-       ret = pthread_create(&listener_thread, NULL,
+       ret = pthread_create(&listener_thread, default_pthread_attr(),
                        relay_thread_listener, (void *) NULL);
        if (ret) {
                errno = ret;
This page took 0.0297 seconds and 5 git commands to generate.