struct lttng_poll_event events;
struct lttcomm_sock *control_sock, *data_sock;
- /*
- * Get allocated in this thread, enqueued to a global queue, dequeued and
- * freed in the worker thread.
- */
- struct relay_command *relay_cmd = NULL;
-
DBG("[thread] Relay listener started");
control_sock = relay_init_sock(control_uri);
ERR("socket poll error");
goto error;
} else if (revents & LPOLLIN) {
- struct lttcomm_sock *newsock = NULL;
+ /*
+ * Get allocated in this thread,
+ * enqueued to a global queue, dequeued
+ * and freed in the worker thread.
+ */
+ struct relay_command *relay_cmd;
+ struct lttcomm_sock *newsock;
relay_cmd = zmalloc(sizeof(struct relay_command));
if (relay_cmd == NULL) {
if (pollfd == data_sock->fd) {
newsock = data_sock->ops->accept(data_sock);
- if (newsock < 0) {
+ if (!newsock) {
PERROR("accepting data sock");
+ free(relay_cmd);
goto error;
}
relay_cmd->type = RELAY_DATA;
DBG("Relay data connection accepted, socket %d", newsock->fd);
- } else if (pollfd == control_sock->fd) {
+ } else {
+ assert(pollfd == control_sock->fd);
newsock = control_sock->ops->accept(control_sock);
- if (newsock < 0) {
+ if (!newsock) {
PERROR("accepting control sock");
+ free(relay_cmd);
goto error;
}
relay_cmd->type = RELAY_CONTROL;
&val, sizeof(int));
if (ret < 0) {
PERROR("setsockopt inet");
+ lttcomm_destroy_sock(newsock);
+ free(relay_cmd);
goto error;
}
relay_cmd->sock = newsock;
free(root_path);
/* send the session id to the client or a negative return code on error */
if (ret < 0) {
- reply.ret_code = htobe32(LTTCOMM_ERR);
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
} else {
- reply.ret_code = htobe32(LTTCOMM_OK);
+ reply.ret_code = htobe32(LTTNG_OK);
}
reply.handle = htobe64(stream->stream_handle);
send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
rcu_read_unlock();
if (ret < 0) {
- reply.ret_code = htobe32(LTTCOMM_ERR);
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
} else {
- reply.ret_code = htobe32(LTTCOMM_OK);
+ reply.ret_code = htobe32(LTTNG_OK);
}
send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
sizeof(struct lttcomm_relayd_generic_reply), 0);
struct lttcomm_relayd_generic_reply reply;
int ret;
- reply.ret_code = htobe32(LTTCOMM_ERR);
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
sizeof(struct lttcomm_relayd_generic_reply), 0);
if (ret < 0) {
int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_command *cmd)
{
- int ret = htobe32(LTTCOMM_OK);
+ int ret = htobe32(LTTNG_OK);
struct lttcomm_relayd_generic_reply reply;
struct relay_session *session = cmd->session;
if (!session) {
DBG("Trying to start the streaming without a session established");
- ret = htobe32(LTTCOMM_ERR);
+ ret = htobe32(LTTNG_ERR_UNK);
}
reply.ret_code = ret;
return ret;
}
+/*
+ * Append padding to the file pointed by the file descriptor fd.
+ */
+static int write_padding_to_file(int fd, uint32_t size)
+{
+ int ret = 0;
+ char *zeros;
+
+ if (size == 0) {
+ goto end;
+ }
+
+ zeros = zmalloc(size);
+ if (zeros == NULL) {
+ PERROR("zmalloc zeros for padding");
+ ret = -1;
+ goto end;
+ }
+
+ do {
+ ret = write(fd, zeros, size);
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("write padding to file");
+ }
+
+end:
+ return ret;
+}
+
/*
* relay_recv_metadata: receive the metada for the session.
*/
int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_command *cmd, struct lttng_ht *streams_ht)
{
- int ret = htobe32(LTTCOMM_OK);
+ int ret = htobe32(LTTNG_OK);
struct relay_session *session = cmd->session;
struct lttcomm_relayd_metadata_payload *metadata_struct;
struct relay_stream *metadata_stream;
ret = -1;
goto end_unlock;
}
+
+ ret = write_padding_to_file(metadata_stream->fd,
+ be32toh(metadata_struct->padding_size));
+ if (ret < 0) {
+ goto end_unlock;
+ }
+
DBG2("Relay metadata written");
end_unlock:
int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_command *cmd)
{
- int ret = htobe32(LTTCOMM_OK);
+ int ret = htobe32(LTTNG_OK);
struct lttcomm_relayd_version reply;
- struct relay_session *session = NULL;
+ struct relay_session *session;
if (cmd->session == NULL) {
session = zmalloc(sizeof(struct relay_session));
session->id = ++last_relay_session_id;
DBG("Created session %" PRIu64, session->id);
cmd->session = session;
+ } else {
+ session = cmd->session;
}
session->version_check_done = 1;
if (ret < 0) {
ERR("Relay sending version");
}
- DBG("Version check done");
+ DBG("Version check done (%u.%u)", be32toh(reply.major),
+ be32toh(reply.minor));
end:
return ret;
ret = -1;
goto end_unlock;
}
+
+ ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size));
+ if (ret < 0) {
+ goto end_unlock;
+ }
+
DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64,
ret, stream->stream_handle);
goto error;
}
ret = read(fd, relay_connection, sizeof(struct relay_command));
- if (ret < 0 || ret < sizeof(relay_connection)) {
+ if (ret < 0 || ret < sizeof(struct relay_command)) {
PERROR("read relay cmd pipe");
goto error_read;
}