Fix: sessiond: ust session is inactive during ust_app_global_update
[lttng-tools.git] / src / bin / lttng-sessiond / dispatch.c
CommitLineData
5d1b0219 1/*
ab5be9fa
MJ
2 * Copyright (C) 2011 David Goulet <david.goulet@polymtl.ca>
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5d1b0219 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
5d1b0219 7 *
5d1b0219
JG
8 */
9
10#include <stddef.h>
11#include <stdlib.h>
12#include <urcu.h>
13#include <common/futex.h>
14#include <common/macros.h>
15
16#include "dispatch.h"
17#include "ust-app.h"
18#include "testpoint.h"
19#include "fd-limit.h"
20#include "health-sessiond.h"
21#include "lttng-sessiond.h"
22#include "thread.h"
23
/*
 * State shared by launch_ust_dispatch_thread(), the shutdown callback and the
 * UST registration dispatch thread itself.
 */
struct thread_notifiers {
	/* Queue of registration commands consumed by the dispatch thread. */
	struct ust_cmd_queue *ust_cmd_queue;
	/* Descriptor to which application command sockets are written. */
	int apps_cmd_pipe_write_fd;
	/* Descriptor to which application notify sockets are written. */
	int apps_cmd_notify_pipe_write_fd;
	/* Set to 1 (CMM_STORE_SHARED) to ask the dispatch thread to exit. */
	int dispatch_thread_exit;
};
30
31/*
32 * For each tracing session, update newly registered apps. The session list
33 * lock MUST be acquired before calling this.
34 */
35static void update_ust_app(int app_sock)
36{
37 struct ltt_session *sess, *stmp;
38 const struct ltt_session_list *session_list = session_get_list();
39
40 /* Consumer is in an ERROR state. Stop any application update. */
41 if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) {
42 /* Stop the update process since the consumer is dead. */
43 return;
44 }
45
46 /* For all tracing session(s) */
47 cds_list_for_each_entry_safe(sess, stmp, &session_list->head, list) {
48 struct ust_app *app;
49
50 if (!session_get(sess)) {
51 continue;
52 }
53 session_lock(sess);
e5b7dd9f
JR
54 if (!sess->active || !sess->ust_session ||
55 !sess->ust_session->active) {
5d1b0219
JG
56 goto unlock_session;
57 }
58
59 rcu_read_lock();
60 assert(app_sock >= 0);
61 app = ust_app_find_by_sock(app_sock);
62 if (app == NULL) {
63 /*
64 * Application can be unregistered before so
65 * this is possible hence simply stopping the
66 * update.
67 */
68 DBG3("UST app update failed to find app sock %d",
69 app_sock);
70 goto unlock_rcu;
71 }
72 ust_app_global_update(sess->ust_session, app);
73 unlock_rcu:
74 rcu_read_unlock();
75 unlock_session:
76 session_unlock(sess);
77 session_put(sess);
78 }
79}
80
81/*
82 * Sanitize the wait queue of the dispatch registration thread meaning removing
83 * invalid nodes from it. This is to avoid memory leaks for the case the UST
84 * notify socket is never received.
85 */
86static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
87{
88 int ret, nb_fd = 0, i;
89 unsigned int fd_added = 0;
90 struct lttng_poll_event events;
91 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
92
93 assert(wait_queue);
94
95 lttng_poll_init(&events);
96
97 /* Just skip everything for an empty queue. */
98 if (!wait_queue->count) {
99 goto end;
100 }
101
102 ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
103 if (ret < 0) {
104 goto error_create;
105 }
106
107 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
108 &wait_queue->head, head) {
109 assert(wait_node->app);
110 ret = lttng_poll_add(&events, wait_node->app->sock,
111 LPOLLHUP | LPOLLERR);
112 if (ret < 0) {
113 goto error;
114 }
115
116 fd_added = 1;
117 }
118
119 if (!fd_added) {
120 goto end;
121 }
122
123 /*
124 * Poll but don't block so we can quickly identify the faulty events and
125 * clean them afterwards from the wait queue.
126 */
127 ret = lttng_poll_wait(&events, 0);
128 if (ret < 0) {
129 goto error;
130 }
131 nb_fd = ret;
132
133 for (i = 0; i < nb_fd; i++) {
134 /* Get faulty FD. */
135 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
136 int pollfd = LTTNG_POLL_GETFD(&events, i);
137
5d1b0219
JG
138 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
139 &wait_queue->head, head) {
140 if (pollfd == wait_node->app->sock &&
141 (revents & (LPOLLHUP | LPOLLERR))) {
142 cds_list_del(&wait_node->head);
143 wait_queue->count--;
144 ust_app_destroy(wait_node->app);
145 free(wait_node);
146 /*
147 * Silence warning of use-after-free in
148 * cds_list_for_each_entry_safe which uses
149 * __typeof__(*wait_node).
150 */
151 wait_node = NULL;
152 break;
153 } else {
154 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
155 goto error;
156 }
157 }
158 }
159
160 if (nb_fd > 0) {
161 DBG("Wait queue sanitized, %d node were cleaned up", nb_fd);
162 }
163
164end:
165 lttng_poll_clean(&events);
166 return;
167
168error:
169 lttng_poll_clean(&events);
170error_create:
171 ERR("Unable to sanitize wait queue");
172 return;
173}
174
/*
 * Send a socket to a thread. This is called from the dispatch UST registration
 * thread once all sockets are set for the application.
 *
 * The sock value can be invalid, we don't really care, the thread will handle
 * it and make the necessary cleanup if so.
 *
 * Returns 0 on success. On failure, returns a negative errno value:
 * -EBADF if `fd` is invalid, the negated errno of the failed write(), or
 * -EIO if the write was short and no errno is available.
 */
static int send_socket_to_thread(int fd, int sock)
{
	ssize_t ret;

	/*
	 * It's possible that the FD is set as invalid with -1 concurrently just
	 * before calling this function being a shutdown state of the thread.
	 */
	if (fd < 0) {
		ret = -EBADF;
		goto error;
	}

	ret = lttng_write(fd, &sock, sizeof(sock));
	/*
	 * Test for a negative value first: comparing the signed `ret` directly
	 * with the unsigned sizeof() would convert -1 to a huge unsigned value
	 * and a failed write would be mistaken for a successful one.
	 */
	if (ret < 0 || (size_t) ret < sizeof(sock)) {
		/* Save errno before PERROR() gets a chance to clobber it. */
		const int saved_errno = errno;

		PERROR("write apps pipe %d", fd);
		if (ret < 0 && saved_errno != 0) {
			ret = -saved_errno;
		} else {
			/* Short write: never report it as a (positive) success. */
			ret = -EIO;
		}
		goto error;
	}

	/* All good. Don't send back the write positive ret value. */
	ret = 0;
error:
	return (int) ret;
}
212
/*
 * Cleanup callback of the dispatch thread: release the notifiers structure
 * handed to lttng_thread_create() by launch_ust_dispatch_thread().
 */
static void cleanup_ust_dispatch_thread(void *data)
{
	struct thread_notifiers *notifiers = data;

	free(notifiers);
}
217
218/*
219 * Dispatch request from the registration threads to the application
220 * communication thread.
221 */
222static void *thread_dispatch_ust_registration(void *data)
223{
224 int ret, err = -1;
225 struct cds_wfcq_node *node;
226 struct ust_command *ust_cmd = NULL;
227 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
228 struct ust_reg_wait_queue wait_queue = {
229 .count = 0,
230 };
231 struct thread_notifiers *notifiers = data;
232
233 rcu_register_thread();
234
235 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH);
236
237 if (testpoint(sessiond_thread_app_reg_dispatch)) {
238 goto error_testpoint;
239 }
240
241 health_code_update();
242
243 CDS_INIT_LIST_HEAD(&wait_queue.head);
244
245 DBG("[thread] Dispatch UST command started");
246
247 for (;;) {
248 health_code_update();
249
250 /* Atomically prepare the queue futex */
251 futex_nto1_prepare(&notifiers->ust_cmd_queue->futex);
252
253 if (CMM_LOAD_SHARED(notifiers->dispatch_thread_exit)) {
254 break;
255 }
256
257 do {
258 struct ust_app *app = NULL;
259 ust_cmd = NULL;
260
261 /*
262 * Make sure we don't have node(s) that have hung up before receiving
263 * the notify socket. This is to clean the list in order to avoid
264 * memory leaks from notify socket that are never seen.
265 */
266 sanitize_wait_queue(&wait_queue);
267
268 health_code_update();
269 /* Dequeue command for registration */
270 node = cds_wfcq_dequeue_blocking(
271 &notifiers->ust_cmd_queue->head,
272 &notifiers->ust_cmd_queue->tail);
273 if (node == NULL) {
274 DBG("Woken up but nothing in the UST command queue");
275 /* Continue thread execution */
276 break;
277 }
278
279 ust_cmd = caa_container_of(node, struct ust_command, node);
280
281 DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
282 " gid:%d sock:%d name:%s (version %d.%d)",
283 ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
284 ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
285 ust_cmd->sock, ust_cmd->reg_msg.name,
286 ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
287
288 if (ust_cmd->reg_msg.type == USTCTL_SOCKET_CMD) {
289 wait_node = zmalloc(sizeof(*wait_node));
290 if (!wait_node) {
291 PERROR("zmalloc wait_node dispatch");
292 ret = close(ust_cmd->sock);
293 if (ret < 0) {
294 PERROR("close ust sock dispatch %d", ust_cmd->sock);
295 }
296 lttng_fd_put(LTTNG_FD_APPS, 1);
297 free(ust_cmd);
58f835e1 298 ust_cmd = NULL;
5d1b0219
JG
299 goto error;
300 }
301 CDS_INIT_LIST_HEAD(&wait_node->head);
302
303 /* Create application object if socket is CMD. */
304 wait_node->app = ust_app_create(&ust_cmd->reg_msg,
305 ust_cmd->sock);
306 if (!wait_node->app) {
307 ret = close(ust_cmd->sock);
308 if (ret < 0) {
309 PERROR("close ust sock dispatch %d", ust_cmd->sock);
310 }
311 lttng_fd_put(LTTNG_FD_APPS, 1);
312 free(wait_node);
58f835e1 313 wait_node = NULL;
5d1b0219 314 free(ust_cmd);
58f835e1 315 ust_cmd = NULL;
5d1b0219
JG
316 continue;
317 }
318 /*
319 * Add application to the wait queue so we can set the notify
320 * socket before putting this object in the global ht.
321 */
322 cds_list_add(&wait_node->head, &wait_queue.head);
323 wait_queue.count++;
324
325 free(ust_cmd);
58f835e1 326 ust_cmd = NULL;
5d1b0219
JG
327 /*
328 * We have to continue here since we don't have the notify
329 * socket and the application MUST be added to the hash table
330 * only at that moment.
331 */
332 continue;
333 } else {
334 /*
335 * Look for the application in the local wait queue and set the
336 * notify socket if found.
337 */
338 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
339 &wait_queue.head, head) {
340 health_code_update();
341 if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
342 wait_node->app->notify_sock = ust_cmd->sock;
343 cds_list_del(&wait_node->head);
344 wait_queue.count--;
345 app = wait_node->app;
346 free(wait_node);
58f835e1 347 wait_node = NULL;
5d1b0219
JG
348 DBG3("UST app notify socket %d is set", ust_cmd->sock);
349 break;
350 }
351 }
352
353 /*
354 * With no application at this stage the received socket is
355 * basically useless so close it before we free the cmd data
356 * structure for good.
357 */
358 if (!app) {
359 ret = close(ust_cmd->sock);
360 if (ret < 0) {
361 PERROR("close ust sock dispatch %d", ust_cmd->sock);
362 }
363 lttng_fd_put(LTTNG_FD_APPS, 1);
364 }
365 free(ust_cmd);
58f835e1 366 ust_cmd = NULL;
5d1b0219
JG
367 }
368
369 if (app) {
370 /*
371 * @session_lock_list
372 *
373 * Lock the global session list so from the register up to the
374 * registration done message, no thread can see the application
375 * and change its state.
376 */
377 session_lock_list();
378 rcu_read_lock();
379
380 /*
381 * Add application to the global hash table. This needs to be
382 * done before the update to the UST registry can locate the
383 * application.
384 */
385 ust_app_add(app);
386
387 /* Set app version. This call will print an error if needed. */
388 (void) ust_app_version(app);
389
390 /* Send notify socket through the notify pipe. */
391 ret = send_socket_to_thread(
392 notifiers->apps_cmd_notify_pipe_write_fd,
393 app->notify_sock);
394 if (ret < 0) {
395 rcu_read_unlock();
396 session_unlock_list();
397 /*
398 * No notify thread, stop the UST tracing. However, this is
399 * not an internal error of the this thread thus setting
400 * the health error code to a normal exit.
401 */
402 err = 0;
403 goto error;
404 }
405
406 /*
407 * Update newly registered application with the tracing
408 * registry info already enabled information.
409 */
410 update_ust_app(app->sock);
411
412 /*
413 * Don't care about return value. Let the manage apps threads
414 * handle app unregistration upon socket close.
415 */
416 (void) ust_app_register_done(app);
417
418 /*
419 * Even if the application socket has been closed, send the app
420 * to the thread and unregistration will take place at that
421 * place.
422 */
423 ret = send_socket_to_thread(
424 notifiers->apps_cmd_pipe_write_fd,
425 app->sock);
426 if (ret < 0) {
427 rcu_read_unlock();
428 session_unlock_list();
429 /*
430 * No apps. thread, stop the UST tracing. However, this is
431 * not an internal error of the this thread thus setting
432 * the health error code to a normal exit.
433 */
434 err = 0;
435 goto error;
436 }
437
438 rcu_read_unlock();
439 session_unlock_list();
440 }
441 } while (node != NULL);
442
443 health_poll_entry();
444 /* Futex wait on queue. Blocking call on futex() */
445 futex_nto1_wait(&notifiers->ust_cmd_queue->futex);
446 health_poll_exit();
447 }
448 /* Normal exit, no error */
449 err = 0;
450
451error:
452 /* Clean up wait queue. */
453 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
454 &wait_queue.head, head) {
455 cds_list_del(&wait_node->head);
456 wait_queue.count--;
457 free(wait_node);
458 }
459
460 /* Empty command queue. */
461 for (;;) {
462 /* Dequeue command for registration */
463 node = cds_wfcq_dequeue_blocking(
464 &notifiers->ust_cmd_queue->head,
465 &notifiers->ust_cmd_queue->tail);
466 if (node == NULL) {
467 break;
468 }
469 ust_cmd = caa_container_of(node, struct ust_command, node);
470 ret = close(ust_cmd->sock);
471 if (ret < 0) {
472 PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
473 }
474 lttng_fd_put(LTTNG_FD_APPS, 1);
475 free(ust_cmd);
476 }
477
478error_testpoint:
479 DBG("Dispatch thread dying");
480 if (err) {
481 health_error();
482 ERR("Health error occurred in %s", __func__);
483 }
484 health_unregister(health_sessiond);
485 rcu_unregister_thread();
486 return NULL;
487}
488
489static bool shutdown_ust_dispatch_thread(void *data)
490{
491 struct thread_notifiers *notifiers = data;
492
493 CMM_STORE_SHARED(notifiers->dispatch_thread_exit, 1);
494 futex_nto1_wake(&notifiers->ust_cmd_queue->futex);
495 return true;
496}
497
498bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue,
499 int apps_cmd_pipe_write_fd,
500 int apps_cmd_notify_pipe_write_fd)
501{
502 struct lttng_thread *thread;
503 struct thread_notifiers *notifiers;
504
505 notifiers = zmalloc(sizeof(*notifiers));
506 if (!notifiers) {
507 goto error;
508 }
509 notifiers->ust_cmd_queue = cmd_queue;
510 notifiers->apps_cmd_pipe_write_fd = apps_cmd_pipe_write_fd;
511 notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd;
512
513 thread = lttng_thread_create("UST registration dispatch",
514 thread_dispatch_ust_registration,
515 shutdown_ust_dispatch_thread,
516 cleanup_ust_dispatch_thread,
517 notifiers);
518 if (!thread) {
519 goto error;
520 }
521 lttng_thread_put(thread);
522 return true;
523error:
524 free(notifiers);
525 return false;
526}
This page took 0.05483 seconds and 5 git commands to generate.