Clean-up: sessiond manage-consumer: change space to tabs
[lttng-tools.git] / src / bin / lttng-sessiond / manage-consumer.c
CommitLineData
/*
 * Copyright (C) 2011 David Goulet <david.goulet@polymtl.ca>
 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */
9
#include <errno.h>
#include <signal.h>

#include <common/pipe.h>
#include <common/utils.h>

#include "manage-consumer.h"
#include "testpoint.h"
#include "health-sessiond.h"
#include "utils.h"
#include "thread.h"
#include "ust-consumer.h"
21
/*
 * State shared between the consumer management thread and the code that
 * launches, shuts down and cleans up that thread.
 */
struct thread_notifiers {
	/*
	 * Pipe used to ask the thread to quit: the shutdown callback writes to
	 * it and the thread polls its read end.
	 */
	struct lttng_pipe *quit_pipe;
	/* Consumer daemon state managed by the thread. */
	struct consumer_data *consumer_data;
	/* Posted once the thread's initialization has succeeded or failed. */
	sem_t ready;
	/* 0 when the initialization succeeded, -1 when it failed. */
	int initialization_result;
};
28
29static void mark_thread_as_ready(struct thread_notifiers *notifiers)
30{
31 DBG("Marking consumer management thread as ready");
32 notifiers->initialization_result = 0;
33 sem_post(&notifiers->ready);
34}
35
36static void mark_thread_intialization_as_failed(
37 struct thread_notifiers *notifiers)
38{
52c50f8f 39 ERR("Consumer management thread entering error state");
4ec029ed
JG
40 notifiers->initialization_result = -1;
41 sem_post(&notifiers->ready);
42}
43
44static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
45{
46 DBG("Waiting for consumer management thread to be ready");
47 sem_wait(&notifiers->ready);
48 DBG("Consumer management thread is ready");
49}
50
51/*
52 * This thread manage the consumer error sent back to the session daemon.
53 */
0e0b3d3a 54static void *thread_consumer_management(void *data)
4ec029ed
JG
55{
56 int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
57 uint32_t revents, nb_fd;
58 enum lttcomm_return_code code;
59 struct lttng_poll_event events;
60 struct thread_notifiers *notifiers = data;
61 struct consumer_data *consumer_data = notifiers->consumer_data;
62 const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
63 struct consumer_socket *cmd_socket_wrapper = NULL;
64
65 DBG("[thread] Manage consumer started");
66
67 rcu_register_thread();
68 rcu_thread_online();
69
70 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
71
72 health_code_update();
73
74 /*
75 * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
76 * metadata_sock. Nothing more will be added to this poll set.
77 */
78 ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
79 if (ret < 0) {
80 mark_thread_intialization_as_failed(notifiers);
81 goto error_poll;
82 }
83
84 ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
85 if (ret < 0) {
86 mark_thread_intialization_as_failed(notifiers);
87 goto error;
88 }
89
90 /*
91 * The error socket here is already in a listening state which was done
92 * just before spawning this thread to avoid a race between the consumer
93 * daemon exec trying to connect and the listen() call.
94 */
95 ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
96 if (ret < 0) {
97 mark_thread_intialization_as_failed(notifiers);
98 goto error;
99 }
100
101 health_code_update();
102
103 /* Infinite blocking call, waiting for transmission */
104 health_poll_entry();
105
106 if (testpoint(sessiond_thread_manage_consumer)) {
107 mark_thread_intialization_as_failed(notifiers);
108 goto error;
109 }
110
111 ret = lttng_poll_wait(&events, -1);
112 health_poll_exit();
113 if (ret < 0) {
114 mark_thread_intialization_as_failed(notifiers);
115 goto error;
116 }
117
118 nb_fd = ret;
119
120 for (i = 0; i < nb_fd; i++) {
121 /* Fetch once the poll data */
122 revents = LTTNG_POLL_GETEV(&events, i);
123 pollfd = LTTNG_POLL_GETFD(&events, i);
124
125 health_code_update();
126
4ec029ed
JG
127 /* Thread quit pipe has been closed. Killing thread. */
128 if (pollfd == quit_pipe_read_fd) {
129 err = 0;
130 mark_thread_intialization_as_failed(notifiers);
131 goto exit;
132 } else if (pollfd == consumer_data->err_sock) {
133 /* Event on the registration socket */
134 if (revents & LPOLLIN) {
135 continue;
136 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
137 ERR("consumer err socket poll error");
138 mark_thread_intialization_as_failed(notifiers);
139 goto error;
140 } else {
141 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
142 mark_thread_intialization_as_failed(notifiers);
143 goto error;
144 }
145 }
146 }
147
148 sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
149 if (sock < 0) {
150 mark_thread_intialization_as_failed(notifiers);
151 goto error;
152 }
153
154 /*
155 * Set the CLOEXEC flag. Return code is useless because either way, the
156 * show must go on.
157 */
158 (void) utils_set_fd_cloexec(sock);
159
160 health_code_update();
161
162 DBG2("Receiving code from consumer err_sock");
163
164 /* Getting status code from kconsumerd */
165 ret = lttcomm_recv_unix_sock(sock, &code,
166 sizeof(enum lttcomm_return_code));
167 if (ret <= 0) {
168 mark_thread_intialization_as_failed(notifiers);
169 goto error;
170 }
171
172 health_code_update();
173 if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
174 ERR("consumer error when waiting for SOCK_READY : %s",
175 lttcomm_get_readable_code(-code));
176 mark_thread_intialization_as_failed(notifiers);
177 goto error;
178 }
179
180 /* Connect both command and metadata sockets. */
181 consumer_data->cmd_sock =
182 lttcomm_connect_unix_sock(
183 consumer_data->cmd_unix_sock_path);
184 consumer_data->metadata_fd =
185 lttcomm_connect_unix_sock(
186 consumer_data->cmd_unix_sock_path);
187 if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
188 PERROR("consumer connect cmd socket");
189 mark_thread_intialization_as_failed(notifiers);
190 goto error;
191 }
192
193 consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
194
195 /* Create metadata socket lock. */
196 consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
197 if (consumer_data->metadata_sock.lock == NULL) {
198 PERROR("zmalloc pthread mutex");
199 mark_thread_intialization_as_failed(notifiers);
200 goto error;
201 }
202 pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
203
204 DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
205 DBG("Consumer metadata socket ready (fd: %d)",
206 consumer_data->metadata_fd);
207
208 /*
209 * Remove the consumerd error sock since we've established a connection.
210 */
211 ret = lttng_poll_del(&events, consumer_data->err_sock);
212 if (ret < 0) {
213 mark_thread_intialization_as_failed(notifiers);
214 goto error;
215 }
216
217 /* Add new accepted error socket. */
218 ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
219 if (ret < 0) {
220 mark_thread_intialization_as_failed(notifiers);
221 goto error;
222 }
223
224 /* Add metadata socket that is successfully connected. */
225 ret = lttng_poll_add(&events, consumer_data->metadata_fd,
226 LPOLLIN | LPOLLRDHUP);
227 if (ret < 0) {
228 mark_thread_intialization_as_failed(notifiers);
229 goto error;
230 }
231
232 health_code_update();
233
234 /*
09ede842
JG
235 * Transfer the write-end of the channel monitoring pipe to the consumer
236 * by issuing a SET_CHANNEL_MONITOR_PIPE command.
4ec029ed
JG
237 */
238 cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
239 if (!cmd_socket_wrapper) {
240 mark_thread_intialization_as_failed(notifiers);
241 goto error;
242 }
243 cmd_socket_wrapper->lock = &consumer_data->lock;
244
09ede842
JG
245 pthread_mutex_lock(cmd_socket_wrapper->lock);
246 ret = consumer_init(cmd_socket_wrapper, sessiond_uuid);
247 if (ret) {
248 ERR("Failed to send sessiond uuid to consumer daemon");
249 mark_thread_intialization_as_failed(notifiers);
250 pthread_mutex_unlock(cmd_socket_wrapper->lock);
251 goto error;
252 }
253 pthread_mutex_unlock(cmd_socket_wrapper->lock);
254
4ec029ed
JG
255 ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
256 consumer_data->channel_monitor_pipe);
257 if (ret) {
258 mark_thread_intialization_as_failed(notifiers);
259 goto error;
260 }
261
262 /* Discard the socket wrapper as it is no longer needed. */
263 consumer_destroy_socket(cmd_socket_wrapper);
264 cmd_socket_wrapper = NULL;
265
266 /* The thread is completely initialized, signal that it is ready. */
267 mark_thread_as_ready(notifiers);
268
269 /* Infinite blocking call, waiting for transmission */
270 while (1) {
271 health_code_update();
272
273 /* Exit the thread because the thread quit pipe has been triggered. */
274 if (should_quit) {
275 /* Not a health error. */
276 err = 0;
277 goto exit;
278 }
279
280 health_poll_entry();
281 ret = lttng_poll_wait(&events, -1);
282 health_poll_exit();
283 if (ret < 0) {
284 goto error;
285 }
286
287 nb_fd = ret;
288
289 for (i = 0; i < nb_fd; i++) {
290 /* Fetch once the poll data */
291 revents = LTTNG_POLL_GETEV(&events, i);
292 pollfd = LTTNG_POLL_GETFD(&events, i);
293
294 health_code_update();
295
4ec029ed
JG
296 /*
297 * Thread quit pipe has been triggered, flag that we should stop
298 * but continue the current loop to handle potential data from
299 * consumer.
300 */
301 if (pollfd == quit_pipe_read_fd) {
302 should_quit = 1;
303 } else if (pollfd == sock) {
304 /* Event on the consumerd socket */
305 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
306 && !(revents & LPOLLIN)) {
307 ERR("consumer err socket second poll error");
308 goto error;
309 }
310 health_code_update();
311 /* Wait for any kconsumerd error */
312 ret = lttcomm_recv_unix_sock(sock, &code,
313 sizeof(enum lttcomm_return_code));
314 if (ret <= 0) {
315 ERR("consumer closed the command socket");
316 goto error;
317 }
318
319 ERR("consumer return code : %s",
320 lttcomm_get_readable_code(-code));
321
322 goto exit;
323 } else if (pollfd == consumer_data->metadata_fd) {
324 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
325 && !(revents & LPOLLIN)) {
326 ERR("consumer err metadata socket second poll error");
327 goto error;
328 }
329 /* UST metadata requests */
330 ret = ust_consumer_metadata_request(
331 &consumer_data->metadata_sock);
332 if (ret < 0) {
333 ERR("Handling metadata request");
334 goto error;
335 }
336 }
337 /* No need for an else branch all FDs are tested prior. */
338 }
339 health_code_update();
340 }
341
342exit:
343error:
344 /*
345 * We lock here because we are about to close the sockets and some other
346 * thread might be using them so get exclusive access which will abort all
347 * other consumer command by other threads.
348 */
349 pthread_mutex_lock(&consumer_data->lock);
350
351 /* Immediately set the consumerd state to stopped */
352 if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
353 uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR);
354 } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
355 consumer_data->type == LTTNG_CONSUMER32_UST) {
356 uatomic_set(&ust_consumerd_state, CONSUMER_ERROR);
357 } else {
358 /* Code flow error... */
359 assert(0);
360 }
361
362 if (consumer_data->err_sock >= 0) {
363 ret = close(consumer_data->err_sock);
364 if (ret) {
365 PERROR("close");
366 }
367 consumer_data->err_sock = -1;
368 }
369 if (consumer_data->cmd_sock >= 0) {
370 ret = close(consumer_data->cmd_sock);
371 if (ret) {
372 PERROR("close");
373 }
374 consumer_data->cmd_sock = -1;
375 }
376 if (consumer_data->metadata_sock.fd_ptr &&
377 *consumer_data->metadata_sock.fd_ptr >= 0) {
378 ret = close(*consumer_data->metadata_sock.fd_ptr);
379 if (ret) {
380 PERROR("close");
381 }
382 }
383 if (sock >= 0) {
384 ret = close(sock);
385 if (ret) {
386 PERROR("close");
387 }
388 }
389
390 unlink(consumer_data->err_unix_sock_path);
391 unlink(consumer_data->cmd_unix_sock_path);
392 pthread_mutex_unlock(&consumer_data->lock);
393
394 /* Cleanup metadata socket mutex. */
395 if (consumer_data->metadata_sock.lock) {
396 pthread_mutex_destroy(consumer_data->metadata_sock.lock);
397 free(consumer_data->metadata_sock.lock);
398 }
399 lttng_poll_clean(&events);
400
401 if (cmd_socket_wrapper) {
402 consumer_destroy_socket(cmd_socket_wrapper);
403 }
404error_poll:
405 if (err) {
406 health_error();
407 ERR("Health error occurred in %s", __func__);
408 }
409 health_unregister(health_sessiond);
410 DBG("consumer thread cleanup completed");
411
412 rcu_thread_offline();
413 rcu_unregister_thread();
414
415 return NULL;
416}
417
418static bool shutdown_consumer_management_thread(void *data)
419{
420 struct thread_notifiers *notifiers = data;
421 const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);
422
423 return notify_thread_pipe(write_fd) == 1;
424}
425
426static void cleanup_consumer_management_thread(void *data)
427{
428 struct thread_notifiers *notifiers = data;
429
430 lttng_pipe_destroy(notifiers->quit_pipe);
431 free(notifiers);
432}
433
434bool launch_consumer_management_thread(struct consumer_data *consumer_data)
435{
436 struct lttng_pipe *quit_pipe;
437 struct thread_notifiers *notifiers = NULL;
438 struct lttng_thread *thread;
439
4ec029ed
JG
440 notifiers = zmalloc(sizeof(*notifiers));
441 if (!notifiers) {
21fa020e
JG
442 goto error_alloc;
443 }
444
445 quit_pipe = lttng_pipe_open(FD_CLOEXEC);
446 if (!quit_pipe) {
4ec029ed
JG
447 goto error;
448 }
449 notifiers->quit_pipe = quit_pipe;
450 notifiers->consumer_data = consumer_data;
451 sem_init(&notifiers->ready, 0, 0);
452
453 thread = lttng_thread_create("Consumer management",
454 thread_consumer_management,
455 shutdown_consumer_management_thread,
456 cleanup_consumer_management_thread,
457 notifiers);
458 if (!thread) {
459 goto error;
460 }
461 wait_until_thread_is_ready(notifiers);
462 lttng_thread_put(thread);
463 if (notifiers->initialization_result) {
bd3739b0 464 return false;
4ec029ed
JG
465 }
466 return true;
467error:
468 cleanup_consumer_management_thread(notifiers);
21fa020e 469error_alloc:
4ec029ed
JG
470 return false;
471}
This page took 0.087531 seconds and 5 git commands to generate.