projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Fix: Missing umask when using run as no clone
[lttng-tools.git]
/
src
/
common
/
consumer.c
diff --git
a/src/common/consumer.c
b/src/common/consumer.c
index 51f861e23695c642a56a943aa272fb24c89c7a5d..5766860a89823bf40c4ebd98448fc233e54c06f2 100644
(file)
--- a/
src/common/consumer.c
+++ b/
src/common/consumer.c
@@
-917,7
+917,7
@@
static int consumer_update_poll_array(
* changed where this function will be called back again.
*/
if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
* changed where this function will be called back again.
*/
if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
- stream->endpoint_status) {
+ stream->endpoint_status
== CONSUMER_ENDPOINT_INACTIVE
) {
continue;
}
DBG("Active FD %d", stream->wait_fd);
continue;
}
DBG("Active FD %d", stream->wait_fd);
@@
-1267,6
+1267,8
@@
end:
* core function for writing trace buffers to either the local filesystem or
* the network.
*
* core function for writing trace buffers to either the local filesystem or
* the network.
*
+ * It must be called with the stream lock held.
+ *
* Careful review MUST be put if any changes occur!
*
* Returns the number of bytes written
* Careful review MUST be put if any changes occur!
*
* Returns the number of bytes written
@@
-1419,6
+1421,8
@@
end:
/*
* Splice the data from the ring buffer to the tracefile.
*
/*
* Splice the data from the ring buffer to the tracefile.
*
+ * It must be called with the stream lock held.
+ *
* Returns the number of bytes spliced.
*/
ssize_t lttng_consumer_on_read_subbuffer_splice(
* Returns the number of bytes spliced.
*/
ssize_t lttng_consumer_on_read_subbuffer_splice(
@@
-1950,7
+1954,7
@@
static void validate_endpoint_status_data_stream(void)
rcu_read_lock();
cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
rcu_read_lock();
cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
- if (stream->endpoint_status
!= CONSUMER_ENDPOINT_IN
ACTIVE) {
+ if (stream->endpoint_status
== CONSUMER_ENDPOINT_
ACTIVE) {
continue;
}
/* Delete it right now */
continue;
}
/* Delete it right now */
@@
-1975,7
+1979,7
@@
static void validate_endpoint_status_metadata_stream(
rcu_read_lock();
cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
rcu_read_lock();
cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
- if (
!stream->endpoint_status
) {
+ if (
stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE
) {
continue;
}
/*
continue;
}
/*
@@
-2450,7
+2454,7
@@
end:
*/
void *consumer_thread_sessiond_poll(void *data)
{
*/
void *consumer_thread_sessiond_poll(void *data)
{
- int sock, client_socket, ret;
+ int sock
= -1
, client_socket, ret;
/*
* structure to poll for incoming data on communication socket avoids
* making blocking sockets.
/*
* structure to poll for incoming data on communication socket avoids
* making blocking sockets.
@@
-2510,6
+2514,13
@@
void *consumer_thread_sessiond_poll(void *data)
goto end;
}
goto end;
}
+ /* This socket is not useful anymore. */
+ ret = close(client_socket);
+ if (ret < 0) {
+ PERROR("close client_socket");
+ }
+ client_socket = -1;
+
/* update the polling structure to poll on the established socket */
consumer_sockpoll[1].fd = sock;
consumer_sockpoll[1].events = POLLIN | POLLPRI;
/* update the polling structure to poll on the established socket */
consumer_sockpoll[1].fd = sock;
consumer_sockpoll[1].events = POLLIN | POLLPRI;
@@
-2553,6
+2564,20
@@
end:
*/
notify_thread_pipe(ctx->consumer_data_pipe[1]);
*/
notify_thread_pipe(ctx->consumer_data_pipe[1]);
+ /* Cleaning up possibly open sockets. */
+ if (sock >= 0) {
+ ret = close(sock);
+ if (ret < 0) {
+ PERROR("close sock sessiond poll");
+ }
+ }
+ if (client_socket >= 0) {
+ ret = close(sock);
+ if (ret < 0) {
+ PERROR("close client_socket sessiond poll");
+ }
+ }
+
rcu_unregister_thread();
return NULL;
}
rcu_unregister_thread();
return NULL;
}
@@
-2623,7
+2648,7
@@
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
{
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
{
- int fd, ret = -1;
+ int fd
= -1
, ret = -1;
struct consumer_relayd_sock_pair *relayd;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
struct consumer_relayd_sock_pair *relayd;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
@@
-2650,6
+2675,7
@@
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
if (ret != sizeof(fd)) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
ret = -1;
if (ret != sizeof(fd)) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
ret = -1;
+ fd = -1; /* Just in case it gets set with an invalid value. */
goto error;
}
goto error;
}
@@
-2659,14
+2685,15
@@
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
ret = lttcomm_create_sock(&relayd->control_sock);
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
ret = lttcomm_create_sock(&relayd->control_sock);
- if (ret < 0) {
- goto error;
+ /* Immediately try to close the created socket if valid. */
+ if (relayd->control_sock.fd >= 0) {
+ if (close(relayd->control_sock.fd)) {
+ PERROR("close relayd control socket");
+ }
}
}
-
- /* Close the created socket fd which is useless */
- ret = close(relayd->control_sock.fd);
+ /* Handle create_sock error. */
if (ret < 0) {
if (ret < 0) {
-
PERROR("close relayd control socket")
;
+
goto error
;
}
/* Assign new file descriptor */
}
/* Assign new file descriptor */
@@
-2676,14
+2703,15
@@
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
ret = lttcomm_create_sock(&relayd->data_sock);
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
ret = lttcomm_create_sock(&relayd->data_sock);
- if (ret < 0) {
- goto error;
+ /* Immediately try to close the created socket if valid. */
+ if (relayd->data_sock.fd >= 0) {
+ if (close(relayd->data_sock.fd)) {
+ PERROR("close relayd data socket");
+ }
}
}
-
- /* Close the created socket fd which is useless */
- ret = close(relayd->data_sock.fd);
+ /* Handle create_sock error. */
if (ret < 0) {
if (ret < 0) {
-
PERROR("close relayd control socket")
;
+
goto error
;
}
/* Assign new file descriptor */
}
/* Assign new file descriptor */
@@
-2705,9
+2733,15
@@
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
add_relayd(relayd);
/* All good! */
add_relayd(relayd);
/* All good! */
- ret
=
0;
+ ret
urn
0;
error:
error:
+ /* Close received socket if valid. */
+ if (fd >= 0) {
+ if (close(fd)) {
+ PERROR("close received socket");
+ }
+ }
return ret;
}
return ret;
}
This page took
0.026615 seconds
and
5
git commands to generate.