#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
+#include <inttypes.h>
#include <urcu/futex.h>
#include <urcu/uatomic.h>
#include <unistd.h>
#include <common/futex.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/sessiond-comm/inet.h>
-#include <common/hashtable/hashtable.h>
#include <common/sessiond-comm/relayd.h>
#include <common/uri.h>
#include <common/utils.h>
+#include "cmd.h"
+#include "utils.h"
#include "lttng-relayd.h"
/* command line options */
+char *opt_output_path;
static int opt_daemon;
-static char *opt_output_path;
-static struct lttng_uri *control_uri = NULL;
-static struct lttng_uri *data_uri = NULL;
+static struct lttng_uri *control_uri;
+static struct lttng_uri *data_uri;
const char *progname;
static int is_root; /* Set to 1 if the daemon is running as root */
*/
static int relay_cmd_pipe[2] = { -1, -1 };
+/* Shared between threads */
static int dispatch_thread_exit;
static pthread_t listener_thread;
static pthread_t dispatcher_thread;
static pthread_t worker_thread;
-static uint64_t last_relay_stream_id = 0;
-static uint64_t last_relay_session_id = 0;
+static uint64_t last_relay_stream_id;
+static uint64_t last_relay_session_id;
/*
* Relay command queue.
static struct relay_cmd_queue relay_cmd_queue;
/* buffer allocated at startup, used to store the trace data */
-static char *data_buffer = NULL;
-static unsigned int data_buffer_size = 0;
+static char *data_buffer;
+static unsigned int data_buffer_size;
/*
* usage function on stderr
void usage(void)
{
fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
- fprintf(stderr, " -h, --help Display this usage.\n");
- fprintf(stderr, " -d, --daemonize Start as a daemon.\n");
- fprintf(stderr, " -C, --control-port Control port listening (URI)\n");
- fprintf(stderr, " -D, --data-port Data port listening (URI)\n");
- fprintf(stderr, " -o, --output Output path for traces (PATH)\n");
- fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n");
+ fprintf(stderr, " -h, --help Display this usage.\n");
+ fprintf(stderr, " -d, --daemonize Start as a daemon.\n");
+ fprintf(stderr, " -C, --control-port URL Control port listening.\n");
+ fprintf(stderr, " -D, --data-port URL Data port listening.\n");
+ fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n");
+ fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n");
}
static
char *default_address;
static struct option long_options[] = {
- { "control-port", 1, 0, 'C' },
- { "data-port", 1, 0, 'D' },
- { "daemonize", 0, 0, 'd' },
- { "help", 0, 0, 'h' },
- { "output", 1, 0, 'o' },
- { "verbose", 0, 0, 'v' },
- { NULL, 0, 0, 0 }
+ { "control-port", 1, 0, 'C', },
+ { "data-port", 1, 0, 'D', },
+ { "daemonize", 0, 0, 'd', },
+ { "help", 0, 0, 'h', },
+ { "output", 1, 0, 'o', },
+ { "verbose", 0, 0, 'v', },
+ { NULL, 0, 0, 0, },
};
while (1) {
{
DBG("Cleaning up");
+ /* free the dynamically allocated opt_output_path */
+ free(opt_output_path);
+
/* Close thread quit pipes */
utils_close_pipe(thread_quit_pipe);
- /* Close relay cmd pipes */
- utils_close_pipe(relay_cmd_pipe);
+ uri_free(control_uri);
+ uri_free(data_uri);
}
/*
{
int ret;
- ret = write(wpipe, "!", 1);
- if (ret < 0) {
+ do {
+ ret = write(wpipe, "!", 1);
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0 || ret != 1) {
PERROR("write poll pipe");
}
}
/* Dispatch thread */
- dispatch_thread_exit = 1;
+ CMM_STORE_SHARED(dispatch_thread_exit, 1);
futex_nto1_wake(&relay_cmd_queue.futex);
}
return NULL;
}
+/*
+ * Return nonzero if stream needs to be closed.
+ */
+static
+int close_stream_check(struct relay_stream *stream)
+{
+
+ if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) {
+ /*
+ * We are about to close the stream so set the data pending flag to 1
+ * which will make the end data pending command skip the stream which
+ * is now closed and ready. Note that after proceeding to a file close,
+ * the written file is ready for reading.
+ */
+ stream->data_pending_check_done = 1;
+ return 1;
+ }
+ return 0;
+}
+
/*
* This thread manages the listening for new connections on the network
*/
static
void *relay_thread_listener(void *data)
{
- int i, ret, pollfd;
+ int i, ret, pollfd, err = -1;
int val = 1;
uint32_t revents, nb_fd;
struct lttng_poll_event events;
struct lttcomm_sock *control_sock, *data_sock;
- /*
- * Get allocated in this thread, enqueued to a global queue, dequeued and
- * freed in the worker thread.
- */
- struct relay_command *relay_cmd = NULL;
-
DBG("[thread] Relay listener started");
control_sock = relay_init_sock(control_uri);
if (!control_sock) {
- goto error_sock;
+ goto error_sock_control;
}
data_sock = relay_init_sock(data_uri);
if (!data_sock) {
- goto error_sock;
+ goto error_sock_relay;
}
/*
while (1) {
DBG("Listener accepting connections");
- nb_fd = LTTNG_POLL_GETNB(&events);
-
restart:
ret = lttng_poll_wait(&events, -1);
if (ret < 0) {
goto error;
}
+ nb_fd = ret;
+
DBG("Relay new connection received");
for (i = 0; i < nb_fd; i++) {
/* Fetch once the poll data */
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
- goto error;
+ err = 0;
+ goto exit;
}
if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
ERR("socket poll error");
goto error;
} else if (revents & LPOLLIN) {
- struct lttcomm_sock *newsock = NULL;
+ /*
+ * Get allocated in this thread,
+ * enqueued to a global queue, dequeued
+ * and freed in the worker thread.
+ */
+ struct relay_command *relay_cmd;
+ struct lttcomm_sock *newsock;
relay_cmd = zmalloc(sizeof(struct relay_command));
if (relay_cmd == NULL) {
if (pollfd == data_sock->fd) {
newsock = data_sock->ops->accept(data_sock);
- if (newsock < 0) {
+ if (!newsock) {
PERROR("accepting data sock");
+ free(relay_cmd);
goto error;
}
relay_cmd->type = RELAY_DATA;
DBG("Relay data connection accepted, socket %d", newsock->fd);
- } else if (pollfd == control_sock->fd) {
+ } else {
+ assert(pollfd == control_sock->fd);
newsock = control_sock->ops->accept(control_sock);
- if (newsock < 0) {
+ if (!newsock) {
PERROR("accepting control sock");
+ free(relay_cmd);
goto error;
}
relay_cmd->type = RELAY_CONTROL;
&val, sizeof(int));
if (ret < 0) {
PERROR("setsockopt inet");
+ lttcomm_destroy_sock(newsock);
+ free(relay_cmd);
goto error;
}
relay_cmd->sock = newsock;
}
}
+exit:
error:
error_poll_add:
lttng_poll_clean(&events);
error_create_poll:
- if (control_sock->fd >= 0) {
- ret = control_sock->ops->close(control_sock);
+ if (data_sock->fd >= 0) {
+ ret = data_sock->ops->close(data_sock);
if (ret) {
PERROR("close");
}
- lttcomm_destroy_sock(control_sock);
}
- if (data_sock->fd >= 0) {
- ret = data_sock->ops->close(data_sock);
+ lttcomm_destroy_sock(data_sock);
+error_sock_relay:
+ if (control_sock->fd >= 0) {
+ ret = control_sock->ops->close(control_sock);
if (ret) {
PERROR("close");
}
- lttcomm_destroy_sock(data_sock);
}
-
+ lttcomm_destroy_sock(control_sock);
+error_sock_control:
+ if (err) {
+ DBG("Thread exited with error");
+ }
DBG("Relay listener thread cleanup complete");
stop_threads();
-error_sock:
return NULL;
}
DBG("[thread] Relay dispatcher started");
- while (!dispatch_thread_exit) {
+ while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
/* Atomically prepare the queue futex */
futex_nto1_prepare(&relay_cmd_queue.futex);
* call is blocking so we can be assured that the data will be read
* at some point in time or wait to the end of the world :)
*/
- ret = write(relay_cmd_pipe[1], relay_cmd,
- sizeof(struct relay_command));
+ do {
+ ret = write(relay_cmd_pipe[1], relay_cmd,
+ sizeof(struct relay_command));
+ } while (ret < 0 && errno == EINTR);
free(relay_cmd);
- if (ret < 0) {
+ if (ret < 0 || ret != sizeof(struct relay_command)) {
PERROR("write cmd pipe");
goto error;
}
}
/*
- * Return the realpath(3) of the path even if the last directory token does not
- * exist. For example, with /tmp/test1/test2, if test2/ does not exist but the
- * /tmp/test1 does, the real path is returned. In normal time, realpath(3)
- * fails if the end point directory does not exist.
+ * Get stream from stream id.
+ * Need to be called with RCU read-side lock held.
*/
static
-char *expand_full_path(const char *path)
+struct relay_stream *relay_stream_from_stream_id(uint64_t stream_id,
+ struct lttng_ht *streams_ht)
{
- const char *end_path = path;
- char *next, *cut_path, *expanded_path;
-
- /* Find last token delimited by '/' */
- while ((next = strpbrk(end_path + 1, "/"))) {
- end_path = next;
- }
-
- /* Cut last token from original path */
- cut_path = strndup(path, end_path - path);
-
- expanded_path = malloc(PATH_MAX);
- if (expanded_path == NULL) {
- goto error;
- }
+ struct lttng_ht_node_ulong *node;
+ struct lttng_ht_iter iter;
+ struct relay_stream *ret;
- expanded_path = realpath((char *)cut_path, expanded_path);
- if (expanded_path == NULL) {
- switch (errno) {
- case ENOENT:
- ERR("%s: No such file or directory", cut_path);
- break;
- default:
- PERROR("realpath");
- break;
- }
- goto error;
+ lttng_ht_lookup(streams_ht,
+ (void *)((unsigned long) stream_id),
+ &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node == NULL) {
+ DBG("Relay stream %" PRIu64 " not found", stream_id);
+ ret = NULL;
+ goto end;
}
- /* Add end part to expanded path */
- strcat(expanded_path, end_path);
-
- free(cut_path);
- return expanded_path;
+ ret = caa_container_of(node, struct relay_stream, stream_n);
-error:
- free(cut_path);
- return NULL;
+end:
+ return ret;
}
-
-/*
- * config_get_default_path
- *
- * Returns the HOME directory path. Caller MUST NOT free(3) the return pointer.
- */
static
-char *config_get_default_path(void)
+void deferred_free_stream(struct rcu_head *head)
{
- return getenv("HOME");
+ struct relay_stream *stream =
+ caa_container_of(head, struct relay_stream, rcu_node);
+ free(stream->path_name);
+ free(stream->channel_name);
+ free(stream);
}
/*
- * Create recursively directory using the FULL path.
+ * relay_delete_session: Free all memory associated with a session and
+ * close all the FDs
*/
static
-int mkdir_recursive(char *path, mode_t mode)
+void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht)
{
- char *p, tmp[PATH_MAX];
- struct stat statbuf;
- size_t len;
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
+ struct relay_stream *stream;
int ret;
- ret = snprintf(tmp, sizeof(tmp), "%s", path);
- if (ret < 0) {
- PERROR("snprintf mkdir");
- goto error;
+ if (!cmd->session) {
+ return;
}
- len = ret;
- if (tmp[len - 1] == '/') {
- tmp[len - 1] = 0;
- }
+ DBG("Relay deleting session %" PRIu64, cmd->session->id);
- for (p = tmp + 1; *p; p++) {
- if (*p == '/') {
- *p = 0;
- if (tmp[strlen(tmp) - 1] == '.' &&
- tmp[strlen(tmp) - 2] == '.' &&
- tmp[strlen(tmp) - 3] == '/') {
- ERR("Using '/../' is not permitted in the trace path (%s)",
- tmp);
- ret = -1;
- goto error;
- }
- ret = stat(tmp, &statbuf);
- if (ret < 0) {
- ret = mkdir(tmp, mode);
+ rcu_read_lock();
+ cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, node, node) {
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node) {
+ stream = caa_container_of(node,
+ struct relay_stream, stream_n);
+ if (stream->session == cmd->session) {
+ ret = close(stream->fd);
if (ret < 0) {
- if (!(errno == EEXIST)) {
- PERROR("mkdir recursive");
- ret = -errno;
- goto error;
- }
+ PERROR("close stream fd on delete session");
}
+ ret = lttng_ht_del(streams_ht, &iter);
+ assert(!ret);
+ call_rcu(&stream->rcu_node,
+ deferred_free_stream);
}
- *p = '/';
- }
- }
-
- ret = mkdir(tmp, mode);
- if (ret < 0) {
- if (!(errno == EEXIST)) {
- PERROR("mkdir recursive last piece");
- ret = -errno;
- } else {
- ret = 0;
}
}
+ rcu_read_unlock();
-error:
- return ret;
+ free(cmd->session);
}
/*
- * create_output_path: create the output trace directory
+ * Handle the RELAYD_CREATE_SESSION command.
+ *
+ * On success, send back the session id or else return a negative value.
*/
static
-char *create_output_path(char *path_name)
+int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd)
{
- int ret = 0;
- char *alloc_path = NULL;
- char *traces_path = NULL;
- char *full_path = NULL;
-
- /* Auto output path */
- if (opt_output_path == NULL) {
- alloc_path = strdup(config_get_default_path());
- if (alloc_path == NULL) {
- ERR("Home path not found.\n \
- Please specify an output path using -o, --output PATH");
- ret = -1;
- goto exit;
- }
+ int ret = 0, send_ret;
+ struct relay_session *session;
+ struct lttcomm_relayd_status_session reply;
- ret = asprintf(&traces_path, "%s/" DEFAULT_TRACE_DIR_NAME
- "/%s", alloc_path, path_name);
- if (ret < 0) {
- PERROR("asprintf trace dir name");
- goto exit;
- }
- } else {
- full_path = expand_full_path(opt_output_path);
- ret = asprintf(&traces_path, "%s/%s", full_path, path_name);
- if (ret < 0) {
- PERROR("asprintf trace dir name");
- goto exit;
- }
+ assert(recv_hdr);
+ assert(cmd);
+
+ memset(&reply, 0, sizeof(reply));
+
+ session = zmalloc(sizeof(struct relay_session));
+ if (session == NULL) {
+ PERROR("relay session zmalloc");
+ ret = -1;
+ goto error;
}
- free(alloc_path);
- free(full_path);
-exit:
- return traces_path;
-}
+ session->id = ++last_relay_session_id;
+ session->sock = cmd->sock;
+ cmd->session = session;
-/*
- * relay_delete_session: Free all memory associated with a session and
- * close all the FDs
- */
-static
-void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht)
-{
- struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
- struct relay_stream *stream;
- int ret;
+ reply.session_id = htobe64(session->id);
- if (!cmd->session)
- return;
+ DBG("Created session %" PRIu64, session->id);
- DBG("Relay deleting session %lu", cmd->session->id);
- free(cmd->session->sock);
+error:
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_FATAL);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
- cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, node, node) {
- node = lttng_ht_iter_get_node_ulong(&iter);
- if (node) {
- stream = caa_container_of(node,
- struct relay_stream, stream_n);
- if (stream->session == cmd->session) {
- close(stream->fd);
- ret = lttng_ht_del(streams_ht, &iter);
- assert(!ret);
- free(stream);
- }
- }
+ send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (send_ret < 0) {
+ ERR("Relayd sending session id");
+ ret = send_ret;
}
+
+ return ret;
}
/*
struct relay_command *cmd, struct lttng_ht *streams_ht)
{
struct relay_session *session = cmd->session;
- struct lttcomm_relayd_add_stream stream_info;
struct relay_stream *stream = NULL;
struct lttcomm_relayd_status_stream reply;
- char *path = NULL, *root_path = NULL;
int ret, send_ret;
- if (!session || session->version_check_done == 0) {
+ if (!session || cmd->version_check_done == 0) {
ERR("Trying to add a stream before version check");
ret = -1;
goto end_no_session;
}
- /* FIXME : use data_size for something ? */
- ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info,
- sizeof(struct lttcomm_relayd_add_stream), MSG_WAITALL);
- if (ret < sizeof(struct lttcomm_relayd_add_stream)) {
- ERR("Relay didn't receive valid add_stream struct size : %d", ret);
- ret = -1;
- goto end_no_session;
- }
stream = zmalloc(sizeof(struct relay_stream));
if (stream == NULL) {
PERROR("relay stream zmalloc");
goto end_no_session;
}
- stream->stream_handle = ++last_relay_stream_id;
- stream->seq = 0;
- stream->session = session;
-
- root_path = create_output_path(stream_info.pathname);
- if (!root_path) {
- ret = -1;
- goto end;
+ switch (cmd->minor) {
+ case 1: /* LTTng sessiond 2.1 */
+ ret = cmd_recv_stream_2_1(cmd, stream);
+ break;
+ case 2: /* LTTng sessiond 2.2 */
+ default:
+ ret = cmd_recv_stream_2_2(cmd, stream);
+ break;
}
- ret = mkdir_recursive(root_path, S_IRWXU | S_IRWXG);
if (ret < 0) {
- free(root_path);
- ERR("relay creating output directory");
- goto end;
+ goto err_free_stream;
}
- ret = asprintf(&path, "%s/%s", root_path, stream_info.channel_name);
+ rcu_read_lock();
+ stream->stream_handle = ++last_relay_stream_id;
+ stream->prev_seq = -1ULL;
+ stream->session = session;
+
+ ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
if (ret < 0) {
- PERROR("asprintf stream path");
+ ERR("relay creating output directory");
goto end;
}
- ret = open(path, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
+ /*
+ * No need to use run_as API here because whatever we receives, the relayd
+ * uses its own credentials for the stream files.
+ */
+ ret = utils_create_stream_file(stream->path_name, stream->channel_name,
+ stream->tracefile_size, 0, -1, -1);
if (ret < 0) {
- PERROR("Relay creating trace file");
+ ERR("Create output file");
goto end;
}
-
stream->fd = ret;
- DBG("Tracefile %s created", path);
+ if (stream->tracefile_size) {
+ DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
+ } else {
+ DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
+ }
lttng_ht_node_init_ulong(&stream->stream_n,
(unsigned long) stream->stream_handle);
lttng_ht_add_unique_ulong(streams_ht,
&stream->stream_n);
- DBG("Relay new stream added %s", stream_info.channel_name);
+ DBG("Relay new stream added %s", stream->channel_name);
end:
- free(path);
- free(root_path);
+ reply.handle = htobe64(stream->stream_handle);
/* send the session id to the client or a negative return code on error */
if (ret < 0) {
- reply.ret_code = htobe32(LTTCOMM_ERR);
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ /* stream was not properly added to the ht, so free it */
+ free(stream);
} else {
- reply.ret_code = htobe32(LTTCOMM_OK);
+ reply.ret_code = htobe32(LTTNG_OK);
}
- reply.handle = htobe64(stream->stream_handle);
+
send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
sizeof(struct lttcomm_relayd_status_stream), 0);
if (send_ret < 0) {
ERR("Relay sending stream id");
+ ret = send_ret;
+ }
+ rcu_read_unlock();
+
+end_no_session:
+ return ret;
+
+err_free_stream:
+ free(stream->path_name);
+ free(stream->channel_name);
+ free(stream);
+ return ret;
+}
+
+/*
+ * relay_close_stream: close a specific stream
+ */
+static
+int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+ struct relay_session *session = cmd->session;
+ struct lttcomm_relayd_close_stream stream_info;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ int ret, send_ret;
+ struct lttng_ht_iter iter;
+
+ DBG("Close stream received");
+
+ if (!session || cmd->version_check_done == 0) {
+ ERR("Trying to close a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info,
+ sizeof(struct lttcomm_relayd_close_stream), 0);
+ if (ret < sizeof(struct lttcomm_relayd_close_stream)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid add_stream struct size : %d", ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ rcu_read_lock();
+ stream = relay_stream_from_stream_id(be64toh(stream_info.stream_id),
+ streams_ht);
+ if (!stream) {
+ ret = -1;
+ goto end_unlock;
+ }
+
+ stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
+ stream->close_flag = 1;
+
+ if (close_stream_check(stream)) {
+ int delret;
+
+ delret = close(stream->fd);
+ if (delret < 0) {
+ PERROR("close stream");
+ }
+ iter.iter.node = &stream->stream_n.node;
+ delret = lttng_ht_del(streams_ht, &iter);
+ assert(!delret);
+ call_rcu(&stream->rcu_node,
+ deferred_free_stream);
+ DBG("Closed tracefile %d from close stream", stream->fd);
+ }
+
+end_unlock:
+ rcu_read_unlock();
+
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+ send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
+ sizeof(struct lttcomm_relayd_generic_reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending stream id");
+ ret = send_ret;
}
end_no_session:
struct lttcomm_relayd_generic_reply reply;
int ret;
- reply.ret_code = htobe32(LTTCOMM_ERR);
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
sizeof(struct lttcomm_relayd_generic_reply), 0);
if (ret < 0) {
int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_command *cmd)
{
- int ret = htobe32(LTTCOMM_OK);
+ int ret = htobe32(LTTNG_OK);
struct lttcomm_relayd_generic_reply reply;
struct relay_session *session = cmd->session;
if (!session) {
DBG("Trying to start the streaming without a session established");
- ret = htobe32(LTTCOMM_ERR);
+ ret = htobe32(LTTNG_ERR_UNK);
}
reply.ret_code = ret;
}
/*
- * Get stream from stream id.
+ * Append padding to the file pointed by the file descriptor fd.
*/
-static
-struct relay_stream *relay_stream_from_stream_id(uint64_t stream_id,
- struct lttng_ht *streams_ht)
+static int write_padding_to_file(int fd, uint32_t size)
{
- struct lttng_ht_node_ulong *node;
- struct lttng_ht_iter iter;
- struct relay_stream *ret;
+ int ret = 0;
+ char *zeros;
- lttng_ht_lookup(streams_ht,
- (void *)((unsigned long) stream_id),
- &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
- if (node == NULL) {
- DBG("Relay stream %lu not found", stream_id);
- ret = NULL;
+ if (size == 0) {
goto end;
}
- ret = caa_container_of(node, struct relay_stream, stream_n);
+ zeros = zmalloc(size);
+ if (zeros == NULL) {
+ PERROR("zmalloc zeros for padding");
+ ret = -1;
+ goto end;
+ }
+
+ do {
+ ret = write(fd, zeros, size);
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0 || ret != size) {
+ PERROR("write padding to file");
+ }
+
+ free(zeros);
end:
return ret;
int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_command *cmd, struct lttng_ht *streams_ht)
{
- int ret = htobe32(LTTCOMM_OK);
+ int ret = htobe32(LTTNG_OK);
struct relay_session *session = cmd->session;
struct lttcomm_relayd_metadata_payload *metadata_struct;
struct relay_stream *metadata_stream;
goto end;
}
- data_size = be64toh(recv_hdr->data_size);
- payload_size = data_size - sizeof(uint64_t);
+ data_size = payload_size = be64toh(recv_hdr->data_size);
+ if (data_size < sizeof(struct lttcomm_relayd_metadata_payload)) {
+ ERR("Incorrect data size");
+ ret = -1;
+ goto end;
+ }
+ payload_size -= sizeof(struct lttcomm_relayd_metadata_payload);
+
if (data_buffer_size < data_size) {
- data_buffer = realloc(data_buffer, data_size);
- if (!data_buffer) {
+ /* In case the realloc fails, we can free the memory */
+ char *tmp_data_ptr;
+
+ tmp_data_ptr = realloc(data_buffer, data_size);
+ if (!tmp_data_ptr) {
ERR("Allocating data buffer");
+ free(data_buffer);
ret = -1;
goto end;
}
+ data_buffer = tmp_data_ptr;
data_buffer_size = data_size;
}
memset(data_buffer, 0, data_size);
- DBG2("Relay receiving metadata, waiting for %lu bytes", data_size);
- ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, MSG_WAITALL);
+ DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size);
+ ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, 0);
if (ret < 0 || ret != data_size) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay didn't receive the whole metadata");
+ }
ret = -1;
- ERR("Relay didn't receive the whole metadata");
goto end;
}
metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer;
+
+ rcu_read_lock();
metadata_stream = relay_stream_from_stream_id(
be64toh(metadata_struct->stream_id), streams_ht);
if (!metadata_stream) {
ret = -1;
- goto end;
+ goto end_unlock;
}
- ret = write(metadata_stream->fd, metadata_struct->payload,
- payload_size);
- if (ret < (payload_size)) {
+ do {
+ ret = write(metadata_stream->fd, metadata_struct->payload,
+ payload_size);
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0 || ret != payload_size) {
ERR("Relay error writing metadata on file");
ret = -1;
- goto end;
+ goto end_unlock;
}
- DBG2("Relay metadata written");
-end:
+ ret = write_padding_to_file(metadata_stream->fd,
+ be32toh(metadata_struct->padding_size));
+ if (ret < 0) {
+ goto end_unlock;
+ }
+
+ DBG2("Relay metadata written");
+
+end_unlock:
+ rcu_read_unlock();
+end:
return ret;
}
*/
static
int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd)
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
{
- int ret = htobe32(LTTCOMM_OK);
- struct lttcomm_relayd_version reply;
- struct relay_session *session = NULL;
-
- if (cmd->session == NULL) {
- session = zmalloc(sizeof(struct relay_session));
- if (session == NULL) {
- PERROR("relay session zmalloc");
- ret = -1;
- goto end;
+ int ret;
+ struct lttcomm_relayd_version reply, msg;
+
+ assert(cmd);
+
+ cmd->version_check_done = 1;
+
+ /* Get version from the other side. */
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+ if (ret < 0 || ret != sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay failed to receive the version values.");
}
- session->id = ++last_relay_session_id;
- DBG("Created session %lu", session->id);
- cmd->session = session;
+ ret = -1;
+ goto end;
+ }
+
+ ret = sscanf(VERSION, "%10u.%10u", &reply.major, &reply.minor);
+ if (ret < 2) {
+ ERR("Error in scanning version");
+ ret = -1;
+ goto end;
+ }
+
+ /* Major versions must be the same */
+ if (reply.major != be32toh(msg.major)) {
+ DBG("Incompatible major versions (%u vs %u), deleting session",
+ reply.major, be32toh(msg.major));
+ relay_delete_session(cmd, streams_ht);
+ ret = 0;
+ goto end;
+ }
+
+ cmd->major = reply.major;
+ /* We adapt to the lowest compatible version */
+ if (reply.minor <= be32toh(msg.minor)) {
+ cmd->minor = reply.minor;
+ } else {
+ cmd->minor = be32toh(msg.minor);
}
- session->version_check_done = 1;
- sscanf(VERSION, "%u.%u", &reply.major, &reply.minor);
reply.major = htobe32(reply.major);
reply.minor = htobe32(reply.minor);
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
if (ret < 0) {
ERR("Relay sending version");
}
- DBG("Version check done");
+
+ DBG("Version check done using protocol %u.%u", cmd->major,
+ cmd->minor);
end:
return ret;
}
+/*
+ * Check for data pending for a given stream id from the session daemon.
+ */
+static
+int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+ struct relay_session *session = cmd->session;
+ struct lttcomm_relayd_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ int ret;
+ uint64_t last_net_seq_num, stream_id;
+
+ DBG("Data pending command received");
+
+ if (!session || cmd->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid data_pending struct size : %d",
+ ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ stream_id = be64toh(msg.stream_id);
+ last_net_seq_num = be64toh(msg.last_net_seq_num);
+
+ rcu_read_lock();
+ stream = relay_stream_from_stream_id(stream_id, streams_ht);
+ if (stream == NULL) {
+ ret = -1;
+ goto end_unlock;
+ }
+
+ DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
+ " and last_seq %" PRIu64, stream_id, stream->prev_seq,
+ last_net_seq_num);
+
+ /* Avoid wrapping issue */
+ if (((int64_t) (stream->prev_seq - last_net_seq_num)) >= 0) {
+ /* Data has in fact been written and is NOT pending */
+ ret = 0;
+ } else {
+ /* Data still being streamed thus pending */
+ ret = 1;
+ }
+
+ /* Pending check is now done. */
+ stream->data_pending_check_done = 1;
+
+end_unlock:
+ rcu_read_unlock();
+
+ reply.ret_code = htobe32(ret);
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay data pending ret code failed");
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Wait for the control socket to reach a quiescent state.
+ *
+ * Note that for now, when receiving this command from the session daemon, this
+ * means that every subsequent commands or data received on the control socket
+ * has been handled. So, this is why we simply return OK here.
+ */
+static
+int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+ int ret;
+ uint64_t stream_id;
+ struct relay_stream *stream;
+ struct lttng_ht_iter iter;
+ struct lttcomm_relayd_quiescent_control msg;
+ struct lttcomm_relayd_generic_reply reply;
+
+ DBG("Checking quiescent state on control socket");
+
+ if (!cmd->session || cmd->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid begin data_pending struct size: %d",
+ ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ stream_id = be64toh(msg.stream_id);
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
+ if (stream->stream_handle == stream_id) {
+ stream->data_pending_check_done = 1;
+ DBG("Relay quiescent control pending flag set to %" PRIu64,
+ stream_id);
+ break;
+ }
+ }
+ rcu_read_unlock();
+
+ reply.ret_code = htobe32(LTTNG_OK);
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay data quiescent control ret code failed");
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * Initialize a data pending command. This means that a client is about to ask
+ * for data pending for each stream he/she holds. Simply iterate over all
+ * streams of a session and set the data_pending_check_done flag.
+ *
+ * This command returns to the client a LTTNG_OK code.
+ */
+static
+int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttcomm_relayd_begin_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ uint64_t session_id;
+
+ assert(recv_hdr);
+ assert(cmd);
+ assert(streams_ht);
+
+ DBG("Init streams for data pending");
+
+ if (!cmd->session || cmd->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid begin data_pending struct size: %d",
+ ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ session_id = be64toh(msg.session_id);
+
+ /*
+ * Iterate over all streams to set the begin data pending flag. For now, the
+ * streams are indexed by stream handle so we have to iterate over all
+ * streams to find the one associated with the right session_id.
+ */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
+ if (stream->session->id == session_id) {
+ stream->data_pending_check_done = 0;
+ DBG("Set begin data pending flag to stream %" PRIu64,
+ stream->stream_handle);
+ }
+ }
+ rcu_read_unlock();
+
+ /* All good, send back reply. */
+ reply.ret_code = htobe32(LTTNG_OK);
+
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay begin data pending send reply failed");
+ }
+
+end_no_session:
+ return ret;
+}
+
+/*
+ * End data pending command. This will check, for a given session id, if each
+ * stream associated with it has its data_pending_check_done flag set. If not,
+ * this means that the client lost track of the stream but the data is still
+ * being streamed on our side. In this case, we inform the client that data is
+ * inflight.
+ *
+ * Return to the client if there is data in flight or not with a ret_code.
+ */
+static
+int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttcomm_relayd_end_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ uint64_t session_id;
+ uint32_t is_data_inflight = 0;
+
+ assert(recv_hdr);
+ assert(cmd);
+ assert(streams_ht);
+
+ DBG("End data pending command");
+
+ if (!cmd->session || cmd->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid end data_pending struct size: %d",
+ ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ session_id = be64toh(msg.session_id);
+
+ /* Iterate over all streams to see if the begin data pending flag is set. */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
+ if (stream->session->id == session_id &&
+ !stream->data_pending_check_done) {
+ is_data_inflight = 1;
+ DBG("Data is still in flight for stream %" PRIu64,
+ stream->stream_handle);
+ break;
+ }
+ }
+ rcu_read_unlock();
+
+ /* All good, send back reply. */
+ reply.ret_code = htobe32(is_data_inflight);
+
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay end data pending send reply failed");
+ }
+
+end_no_session:
+ return ret;
+}
+
/*
* relay_process_control: Process the commands received on the control socket
*/
int ret = 0;
switch (be32toh(recv_hdr->cmd)) {
- /*
case RELAYD_CREATE_SESSION:
ret = relay_create_session(recv_hdr, cmd);
break;
- */
case RELAYD_ADD_STREAM:
ret = relay_add_stream(recv_hdr, cmd, streams_ht);
break;
ret = relay_recv_metadata(recv_hdr, cmd, streams_ht);
break;
case RELAYD_VERSION:
- ret = relay_send_version(recv_hdr, cmd);
+ ret = relay_send_version(recv_hdr, cmd, streams_ht);
+ break;
+ case RELAYD_CLOSE_STREAM:
+ ret = relay_close_stream(recv_hdr, cmd, streams_ht);
+ break;
+ case RELAYD_DATA_PENDING:
+ ret = relay_data_pending(recv_hdr, cmd, streams_ht);
+ break;
+ case RELAYD_QUIESCENT_CONTROL:
+ ret = relay_quiescent_control(recv_hdr, cmd, streams_ht);
+ break;
+ case RELAYD_BEGIN_DATA_PENDING:
+ ret = relay_begin_data_pending(recv_hdr, cmd, streams_ht);
+ break;
+ case RELAYD_END_DATA_PENDING:
+ ret = relay_end_data_pending(recv_hdr, cmd, streams_ht);
break;
case RELAYD_UPDATE_SYNC_INFO:
default:
struct relay_stream *stream;
struct lttcomm_relayd_data_hdr data_hdr;
uint64_t stream_id;
+ uint64_t net_seq_num;
uint32_t data_size;
ret = cmd->sock->ops->recvmsg(cmd->sock, &data_hdr,
- sizeof(struct lttcomm_relayd_data_hdr), MSG_WAITALL);
+ sizeof(struct lttcomm_relayd_data_hdr), 0);
if (ret <= 0) {
- ERR("Connections seems to be closed");
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Unable to receive data header on sock %d", cmd->sock->fd);
+ }
ret = -1;
goto end;
}
stream_id = be64toh(data_hdr.stream_id);
+
+ rcu_read_lock();
stream = relay_stream_from_stream_id(stream_id, streams_ht);
if (!stream) {
ret = -1;
- goto end;
+ goto end_unlock;
}
data_size = be32toh(data_hdr.data_size);
if (data_buffer_size < data_size) {
- data_buffer = realloc(data_buffer, data_size);
- if (!data_buffer) {
+ char *tmp_data_ptr;
+
+ tmp_data_ptr = realloc(data_buffer, data_size);
+ if (!tmp_data_ptr) {
ERR("Allocating data buffer");
+ free(data_buffer);
ret = -1;
- goto end;
+ goto end_unlock;
}
+ data_buffer = tmp_data_ptr;
data_buffer_size = data_size;
}
memset(data_buffer, 0, data_size);
- ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, MSG_WAITALL);
+ net_seq_num = be64toh(data_hdr.net_seq_num);
+
+ DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
+ data_size, stream_id, net_seq_num);
+ ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, 0);
if (ret <= 0) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ }
ret = -1;
- goto end;
+ goto end_unlock;
}
- ret = write(stream->fd, data_buffer, data_size);
- if (ret < data_size) {
+ if (stream->tracefile_size > 0 &&
+ (stream->tracefile_size_current + data_size) >
+ stream->tracefile_size) {
+ ret = utils_rotate_stream_file(stream->path_name,
+ stream->channel_name, stream->tracefile_size,
+ stream->tracefile_count, -1, -1,
+ stream->fd, &(stream->tracefile_count_current));
+ if (ret < 0) {
+ ERR("Rotating output file");
+ goto end;
+ }
+ stream->fd = ret;
+ /* Reset current size because we just perform a stream rotation. */
+ stream->tracefile_size_current = 0;
+ }
+ stream->tracefile_size_current += data_size;
+ do {
+ ret = write(stream->fd, data_buffer, data_size);
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0 || ret != data_size) {
ERR("Relay error writing data to file");
ret = -1;
- goto end;
+ goto end_unlock;
+ }
+
+ DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64,
+ ret, stream->stream_handle);
+
+ ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size));
+ if (ret < 0) {
+ goto end_unlock;
+ }
+
+ stream->prev_seq = net_seq_num;
+
+ /* Check if we need to close the FD */
+ if (close_stream_check(stream)) {
+ int cret;
+ struct lttng_ht_iter iter;
+
+ cret = close(stream->fd);
+ if (cret < 0) {
+ PERROR("close stream process data");
+ }
+ iter.iter.node = &stream->stream_n.node;
+ ret = lttng_ht_del(streams_ht, &iter);
+ assert(!ret);
+ call_rcu(&stream->rcu_node,
+ deferred_free_stream);
+ DBG("Closed tracefile %d after recv data", stream->fd);
}
- DBG2("Relay wrote %d bytes to tracefile for stream id %lu", ret, stream->stream_handle);
+end_unlock:
+ rcu_read_unlock();
end:
return ret;
}
static
-void relay_cleanup_connection(struct lttng_ht *relay_connections_ht, struct lttng_poll_event *events,
- struct lttng_ht *streams_ht, int pollfd, struct lttng_ht_iter *iter)
+void relay_cleanup_poll_connection(struct lttng_poll_event *events, int pollfd)
{
int ret;
- ret = lttng_ht_del(relay_connections_ht, iter);
- assert(!ret);
lttng_poll_del(events, pollfd);
ret = close(pollfd);
int relay_add_connection(int fd, struct lttng_poll_event *events,
struct lttng_ht *relay_connections_ht)
{
- int ret;
struct relay_command *relay_connection;
+ int ret;
relay_connection = zmalloc(sizeof(struct relay_command));
if (relay_connection == NULL) {
PERROR("Relay command zmalloc");
- ret = -1;
- goto end;
+ goto error;
}
- ret = read(fd, relay_connection, sizeof(struct relay_command));
- if (ret < 0 || ret < sizeof(relay_connection)) {
+ do {
+ ret = read(fd, relay_connection, sizeof(struct relay_command));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0 || ret < sizeof(struct relay_command)) {
PERROR("read relay cmd pipe");
- ret = -1;
- goto end;
+ goto error_read;
}
lttng_ht_node_init_ulong(&relay_connection->sock_n,
(unsigned long) relay_connection->sock->fd);
+ rcu_read_lock();
lttng_ht_add_unique_ulong(relay_connections_ht,
&relay_connection->sock_n);
- ret = lttng_poll_add(events,
+ rcu_read_unlock();
+ return lttng_poll_add(events,
relay_connection->sock->fd,
LPOLLIN | LPOLLRDHUP);
-end:
- return ret;
+error_read:
+ free(relay_connection);
+error:
+ return -1;
+}
+
+static
+void deferred_free_connection(struct rcu_head *head)
+{
+ struct relay_command *relay_connection =
+ caa_container_of(head, struct relay_command, rcu_node);
+
+ lttcomm_destroy_sock(relay_connection->sock);
+ free(relay_connection);
+}
+
+static
+void relay_del_connection(struct lttng_ht *relay_connections_ht,
+ struct lttng_ht *streams_ht, struct lttng_ht_iter *iter,
+ struct relay_command *relay_connection)
+{
+ int ret;
+
+ ret = lttng_ht_del(relay_connections_ht, iter);
+ assert(!ret);
+ if (relay_connection->type == RELAY_CONTROL) {
+ relay_delete_session(relay_connection, streams_ht);
+ }
+
+ call_rcu(&relay_connection->rcu_node,
+ deferred_free_connection);
}
/*
static
void *relay_thread_worker(void *data)
{
- int i, ret, pollfd;
- uint32_t revents, nb_fd;
+ int ret, err = -1, last_seen_data_fd = -1;
+ uint32_t nb_fd;
struct relay_command *relay_connection;
struct lttng_poll_event events;
struct lttng_ht *relay_connections_ht;
DBG("[thread] Relay worker started");
+ rcu_register_thread();
+
/* table of connections indexed on socket */
relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ if (!relay_connections_ht) {
+ goto relay_connections_ht_error;
+ }
/* tables of streams indexed by stream ID */
streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ if (!streams_ht) {
+ goto streams_ht_error;
+ }
ret = create_thread_poll_set(&events, 2);
if (ret < 0) {
goto error;
}
+restart:
while (1) {
- /* Zeroed the events structure */
- lttng_poll_reset(&events);
-
- nb_fd = LTTNG_POLL_GETNB(&events);
+ int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
/* Infinite blocking call, waiting for transmission */
- restart:
+ DBG3("Relayd worker thread polling...");
ret = lttng_poll_wait(&events, -1);
if (ret < 0) {
/*
goto error;
}
+ nb_fd = ret;
+
+ /*
+ * Process control. The control connection is prioritised so we don't
+ * starve it with high throughout put tracing data on the data
+ * connection.
+ */
for (i = 0; i < nb_fd; i++) {
/* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
+ uint32_t revents = LTTNG_POLL_GETEV(&events, i);
+ int pollfd = LTTNG_POLL_GETFD(&events, i);
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
- goto error;
+ err = 0;
+ goto exit;
}
/* Inspect the relay cmd pipe for new connection */
goto error;
}
}
- } else if (revents > 0) {
+ } else if (revents) {
+ rcu_read_lock();
lttng_ht_lookup(relay_connections_ht,
(void *)((unsigned long) pollfd),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node == NULL) {
DBG2("Relay sock %d not found", pollfd);
+ rcu_read_unlock();
goto error;
}
relay_connection = caa_container_of(node,
if (revents & (LPOLLERR)) {
ERR("POLL ERROR");
- relay_cleanup_connection(relay_connections_ht,
- &events, streams_ht, pollfd, &iter);
- free(relay_connection);
+ relay_cleanup_poll_connection(&events, pollfd);
+ relay_del_connection(relay_connections_ht,
+ streams_ht, &iter,
+ relay_connection);
+ if (last_seen_data_fd == pollfd) {
+ last_seen_data_fd = last_notdel_data_fd;
+ }
} else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
DBG("Socket %d hung up", pollfd);
- relay_cleanup_connection(relay_connections_ht,
- &events, streams_ht, pollfd, &iter);
- if (relay_connection->type == RELAY_CONTROL) {
- relay_delete_session(relay_connection, streams_ht);
+ relay_cleanup_poll_connection(&events, pollfd);
+ relay_del_connection(relay_connections_ht,
+ streams_ht, &iter,
+ relay_connection);
+ if (last_seen_data_fd == pollfd) {
+ last_seen_data_fd = last_notdel_data_fd;
}
- free(relay_connection);
} else if (revents & LPOLLIN) {
/* control socket */
if (relay_connection->type == RELAY_CONTROL) {
ret = relay_connection->sock->ops->recvmsg(
relay_connection->sock, &recv_hdr,
- sizeof(struct lttcomm_relayd_hdr), MSG_WAITALL);
+ sizeof(struct lttcomm_relayd_hdr), 0);
/* connection closed */
if (ret <= 0) {
- relay_cleanup_connection(relay_connections_ht,
- &events, streams_ht, pollfd, &iter);
- relay_delete_session(relay_connection, streams_ht);
- free(relay_connection);
+ relay_cleanup_poll_connection(&events, pollfd);
+ relay_del_connection(relay_connections_ht,
+ streams_ht, &iter,
+ relay_connection);
DBG("Control connection closed with %d", pollfd);
} else {
if (relay_connection->session) {
- DBG2("Relay worker receiving data for session : %lu",
+ DBG2("Relay worker receiving data for session : %" PRIu64,
relay_connection->session->id);
}
ret = relay_process_control(&recv_hdr,
relay_connection,
streams_ht);
- /*
- * there was an error in processing a control
- * command: clear the session
- * */
if (ret < 0) {
- relay_cleanup_connection(relay_connections_ht,
- &events, streams_ht, pollfd, &iter);
- free(relay_connection);
+ /* Clear the session on error. */
+ relay_cleanup_poll_connection(&events, pollfd);
+ relay_del_connection(relay_connections_ht,
+ streams_ht, &iter,
+ relay_connection);
DBG("Connection closed with %d", pollfd);
}
+ seen_control = 1;
}
- /* data socket */
- } else if (relay_connection->type == RELAY_DATA) {
- ret = relay_process_data(relay_connection, streams_ht);
- /* connection closed */
- if (ret < 0) {
- relay_cleanup_connection(relay_connections_ht,
- &events, streams_ht, pollfd, &iter);
- relay_delete_session(relay_connection, streams_ht);
- DBG("Data connection closed with %d", pollfd);
- }
+ } else {
+ /*
+ * Flag the last seen data fd not deleted. It will be
+ * used as the last seen fd if any fd gets deleted in
+ * this first loop.
+ */
+ last_notdel_data_fd = pollfd;
}
}
+ rcu_read_unlock();
+ }
+ }
+
+ /*
+ * The last loop handled a control request, go back to poll to make
+ * sure we prioritise the control socket.
+ */
+ if (seen_control) {
+ continue;
+ }
+
+ if (last_seen_data_fd >= 0) {
+ for (i = 0; i < nb_fd; i++) {
+ int pollfd = LTTNG_POLL_GETFD(&events, i);
+ if (last_seen_data_fd == pollfd) {
+ idx = i;
+ break;
+ }
}
}
+
+ /* Process data connection. */
+ for (i = idx + 1; i < nb_fd; i++) {
+ /* Fetch the poll data. */
+ uint32_t revents = LTTNG_POLL_GETEV(&events, i);
+ int pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ /* Skip the command pipe. It's handled in the first loop. */
+ if (pollfd == relay_cmd_pipe[0]) {
+ continue;
+ }
+
+ if (revents) {
+ rcu_read_lock();
+ lttng_ht_lookup(relay_connections_ht,
+ (void *)((unsigned long) pollfd),
+ &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node == NULL) {
+ /* Skip it. Might be removed before. */
+ rcu_read_unlock();
+ continue;
+ }
+ relay_connection = caa_container_of(node,
+ struct relay_command, sock_n);
+
+ if (revents & LPOLLIN) {
+ if (relay_connection->type != RELAY_DATA) {
+ continue;
+ }
+
+ ret = relay_process_data(relay_connection, streams_ht);
+ /* connection closed */
+ if (ret < 0) {
+ relay_cleanup_poll_connection(&events, pollfd);
+ relay_del_connection(relay_connections_ht,
+ streams_ht, &iter,
+ relay_connection);
+ DBG("Data connection closed with %d", pollfd);
+ /*
+ * Every goto restart call sets the last seen fd where
+ * here we don't really care since we gracefully
+ * continue the loop after the connection is deleted.
+ */
+ } else {
+ /* Keep last seen port. */
+ last_seen_data_fd = pollfd;
+ rcu_read_unlock();
+ goto restart;
+ }
+ }
+ rcu_read_unlock();
+ }
+ }
+ last_seen_data_fd = -1;
}
+exit:
error:
lttng_poll_clean(&events);
/* empty the hash table and free the memory */
+ rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
node = lttng_ht_iter_get_node_ulong(&iter);
if (node) {
relay_connection = caa_container_of(node,
struct relay_command, sock_n);
- free(relay_connection);
+ relay_del_connection(relay_connections_ht,
+ streams_ht, &iter,
+ relay_connection);
}
- ret = lttng_ht_del(relay_connections_ht, &iter);
- assert(!ret);
}
+ rcu_read_unlock();
error_poll_create:
- free(data_buffer);
+ lttng_ht_destroy(streams_ht);
+streams_ht_error:
lttng_ht_destroy(relay_connections_ht);
+relay_connections_ht_error:
+ /* Close relay cmd pipes */
+ utils_close_pipe(relay_cmd_pipe);
+ if (err) {
+ DBG("Thread exited with error");
+ }
DBG("Worker thread cleanup complete");
+ free(data_buffer);
stop_threads();
+ rcu_unregister_thread();
return NULL;
}
/* Parse arguments */
progname = argv[0];
- if ((ret = parse_args(argc, argv) < 0)) {
+ if ((ret = parse_args(argc, argv)) < 0) {
goto exit;
}
goto exit;
}
+ /* Try to create directory if -o, --output is specified. */
+ if (opt_output_path) {
+ if (*opt_output_path != '/') {
+ ERR("Please specify an absolute path for -o, --output PATH");
+ goto exit;
+ }
+
+ ret = utils_mkdir_recursive(opt_output_path, S_IRWXU | S_IRWXG);
+ if (ret < 0) {
+ ERR("Unable to create %s", opt_output_path);
+ goto exit;
+ }
+ }
+
/* Daemonize */
if (opt_daemon) {
ret = daemon(0, 0);