projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Introduce the relayd socket object
[lttng-tools.git]
/
src
/
common
/
consumer.c
diff --git
a/src/common/consumer.c
b/src/common/consumer.c
index b6e440a486fc5995f9a7caca0eac39631bf3340c..a9070b1c9bc3f976ef26a2fbbc183c7fd4b75343 100644
(file)
--- a/
src/common/consumer.c
+++ b/
src/common/consumer.c
@@
-783,7
+783,7
@@
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
}
/* Metadata are always sent on the control socket. */
}
/* Metadata are always sent on the control socket. */
- outfd = relayd->control_sock.fd;
+ outfd = relayd->control_sock.
sock.
fd;
} else {
/* Set header with stream information */
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
} else {
/* Set header with stream information */
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
@@
-808,7
+808,7
@@
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
++stream->next_net_seq_num;
/* Set to go on data socket */
++stream->next_net_seq_num;
/* Set to go on data socket */
- outfd = relayd->data_sock.fd;
+ outfd = relayd->data_sock.
sock.
fd;
}
error:
}
error:
@@
-1300,7
+1300,12
@@
int lttng_create_output_file(struct lttng_consumer_stream *stream)
char *path;
assert(stream);
char *path;
assert(stream);
- assert(stream->net_seq_idx == (uint64_t) -1ULL);
+
+ /* Don't create anything if this is set for streaming. */
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
+ ret = 0;
+ goto end;
+ }
ret = snprintf(full_path, sizeof(full_path), "%s/%s",
stream->chan->pathname, stream->name);
ret = snprintf(full_path, sizeof(full_path), "%s/%s",
stream->chan->pathname, stream->name);
@@
-1337,6
+1342,7
@@
int lttng_create_output_file(struct lttng_consumer_stream *stream)
error_open:
free(path_name_id);
error:
error_open:
free(path_name_id);
error:
+end:
return ret;
}
return ret;
}
@@
-3075,13
+3081,16
@@
void lttng_consumer_init(void)
*/
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
*/
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll,
struct lttcomm_sock *relayd_sock,
- unsigned int sessiond_id)
+ struct pollfd *consumer_sockpoll,
+
struct lttcomm_relayd_sock *relayd_sock,
unsigned int sessiond_id)
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttng_error_code ret_code = LTTNG_OK;
struct consumer_relayd_sock_pair *relayd;
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttng_error_code ret_code = LTTNG_OK;
struct consumer_relayd_sock_pair *relayd;
+ assert(ctx);
+ assert(relayd_sock);
+
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
/* First send a status message before receiving the fds. */
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
/* First send a status message before receiving the fds. */
@@
-3131,11
+3140,11
@@
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
switch (sock_type) {
case LTTNG_STREAM_CONTROL:
/* Copy received lttcomm socket */
switch (sock_type) {
case LTTNG_STREAM_CONTROL:
/* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->control_sock
, relayd_
sock);
- ret = lttcomm_create_sock(&relayd->control_sock);
+ lttcomm_copy_sock(&relayd->control_sock
.sock, &relayd_sock->
sock);
+ ret = lttcomm_create_sock(&relayd->control_sock
.sock
);
/* Immediately try to close the created socket if valid. */
/* Immediately try to close the created socket if valid. */
- if (relayd->control_sock.fd >= 0) {
- if (close(relayd->control_sock.fd)) {
+ if (relayd->control_sock.
sock.
fd >= 0) {
+ if (close(relayd->control_sock.
sock.
fd)) {
PERROR("close relayd control socket");
}
}
PERROR("close relayd control socket");
}
}
@@
-3145,7
+3154,10
@@
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
}
/* Assign new file descriptor */
}
/* Assign new file descriptor */
- relayd->control_sock.fd = fd;
+ relayd->control_sock.sock.fd = fd;
+ /* Assign version values. */
+ relayd->control_sock.major = relayd_sock->major;
+ relayd->control_sock.minor = relayd_sock->minor;
/*
* Create a session on the relayd and store the returned id. Lock the
/*
* Create a session on the relayd and store the returned id. Lock the
@@
-3173,11
+3185,11
@@
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->data_sock
, relayd_
sock);
- ret = lttcomm_create_sock(&relayd->data_sock);
+ lttcomm_copy_sock(&relayd->data_sock
.sock, &relayd_sock->
sock);
+ ret = lttcomm_create_sock(&relayd->data_sock
.sock
);
/* Immediately try to close the created socket if valid. */
/* Immediately try to close the created socket if valid. */
- if (relayd->data_sock.fd >= 0) {
- if (close(relayd->data_sock.fd)) {
+ if (relayd->data_sock.
sock.
fd >= 0) {
+ if (close(relayd->data_sock.
sock.
fd)) {
PERROR("close relayd data socket");
}
}
PERROR("close relayd data socket");
}
}
@@
-3187,7
+3199,10
@@
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
}
/* Assign new file descriptor */
}
/* Assign new file descriptor */
- relayd->data_sock.fd = fd;
+ relayd->data_sock.sock.fd = fd;
+ /* Assign version values. */
+ relayd->data_sock.major = relayd_sock->major;
+ relayd->data_sock.minor = relayd_sock->minor;
break;
default:
ERR("Unknown relayd socket type (%d)", sock_type);
break;
default:
ERR("Unknown relayd socket type (%d)", sock_type);
This page took
0.029539 seconds
and
5
git commands to generate.