SoW-2019-0002: Dynamic Snapshot
[lttng-tools.git] / src / bin / lttng-sessiond / dispatch.c
1 /*
2 * Copyright (C) 2011 David Goulet <david.goulet@polymtl.ca>
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #include <stddef.h>
11 #include <stdlib.h>
12 #include <urcu.h>
13 #include <common/futex.h>
14 #include <common/macros.h>
15
16 #include "dispatch.h"
17 #include "ust-app.h"
18 #include "testpoint.h"
19 #include "fd-limit.h"
20 #include "health-sessiond.h"
21 #include "lttng-sessiond.h"
22 #include "thread.h"
23
24 struct thread_notifiers {
25 struct ust_cmd_queue *ust_cmd_queue;
26 int apps_cmd_pipe_write_fd;
27 int apps_cmd_notify_pipe_write_fd;
28 int dispatch_thread_exit;
29 };
30
31 /*
32 * For each tracing session, update newly registered apps. The session list
33 * lock MUST be acquired before calling this.
34 */
35 static void update_ust_app(int app_sock)
36 {
37 struct ltt_session *sess, *stmp;
38 const struct ltt_session_list *session_list = session_get_list();
39 struct ust_app *app;
40
41 /* Consumer is in an ERROR state. Stop any application update. */
42 if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) {
43 /* Stop the update process since the consumer is dead. */
44 return;
45 }
46
47 rcu_read_lock();
48 assert(app_sock >= 0);
49 app = ust_app_find_by_sock(app_sock);
50 if (app == NULL) {
51 /*
52 * Application can be unregistered before so
53 * this is possible hence simply stopping the
54 * update.
55 */
56 DBG3("UST app update failed to find app sock %d",
57 app_sock);
58 goto unlock_rcu;
59 }
60
61 /* Update all tokens for the app */
62 ust_app_global_update_tokens(app);
63
64 /* For all tracing session(s) */
65 cds_list_for_each_entry_safe(sess, stmp, &session_list->head, list) {
66 if (!session_get(sess)) {
67 continue;
68 }
69 session_lock(sess);
70 if (!sess->active || !sess->ust_session) {
71 goto unlock_session;
72 }
73
74 ust_app_global_update(sess->ust_session, app);
75 unlock_session:
76 session_unlock(sess);
77 session_put(sess);
78 }
79
80 unlock_rcu:
81 rcu_read_unlock();
82 }
83
84 /*
85 * Sanitize the wait queue of the dispatch registration thread meaning removing
86 * invalid nodes from it. This is to avoid memory leaks for the case the UST
87 * notify socket is never received.
88 */
89 static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
90 {
91 int ret, nb_fd = 0, i;
92 unsigned int fd_added = 0;
93 struct lttng_poll_event events;
94 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
95
96 assert(wait_queue);
97
98 lttng_poll_init(&events);
99
100 /* Just skip everything for an empty queue. */
101 if (!wait_queue->count) {
102 goto end;
103 }
104
105 ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
106 if (ret < 0) {
107 goto error_create;
108 }
109
110 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
111 &wait_queue->head, head) {
112 assert(wait_node->app);
113 ret = lttng_poll_add(&events, wait_node->app->sock,
114 LPOLLHUP | LPOLLERR);
115 if (ret < 0) {
116 goto error;
117 }
118
119 fd_added = 1;
120 }
121
122 if (!fd_added) {
123 goto end;
124 }
125
126 /*
127 * Poll but don't block so we can quickly identify the faulty events and
128 * clean them afterwards from the wait queue.
129 */
130 ret = lttng_poll_wait(&events, 0);
131 if (ret < 0) {
132 goto error;
133 }
134 nb_fd = ret;
135
136 for (i = 0; i < nb_fd; i++) {
137 /* Get faulty FD. */
138 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
139 int pollfd = LTTNG_POLL_GETFD(&events, i);
140
141 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
142 &wait_queue->head, head) {
143 if (pollfd == wait_node->app->sock &&
144 (revents & (LPOLLHUP | LPOLLERR))) {
145 cds_list_del(&wait_node->head);
146 wait_queue->count--;
147 ust_app_destroy(wait_node->app);
148 free(wait_node);
149 /*
150 * Silence warning of use-after-free in
151 * cds_list_for_each_entry_safe which uses
152 * __typeof__(*wait_node).
153 */
154 wait_node = NULL;
155 break;
156 } else {
157 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
158 goto error;
159 }
160 }
161 }
162
163 if (nb_fd > 0) {
164 DBG("Wait queue sanitized, %d node were cleaned up", nb_fd);
165 }
166
167 end:
168 lttng_poll_clean(&events);
169 return;
170
171 error:
172 lttng_poll_clean(&events);
173 error_create:
174 ERR("Unable to sanitize wait queue");
175 return;
176 }
177
178 /*
179 * Send a socket to a thread This is called from the dispatch UST registration
180 * thread once all sockets are set for the application.
181 *
182 * The sock value can be invalid, we don't really care, the thread will handle
183 * it and make the necessary cleanup if so.
184 *
185 * On success, return 0 else a negative value being the errno message of the
186 * write().
187 */
188 static int send_socket_to_thread(int fd, int sock)
189 {
190 ssize_t ret;
191
192 /*
193 * It's possible that the FD is set as invalid with -1 concurrently just
194 * before calling this function being a shutdown state of the thread.
195 */
196 if (fd < 0) {
197 ret = -EBADF;
198 goto error;
199 }
200
201 ret = lttng_write(fd, &sock, sizeof(sock));
202 if (ret < sizeof(sock)) {
203 PERROR("write apps pipe %d", fd);
204 if (ret < 0) {
205 ret = -errno;
206 }
207 goto error;
208 }
209
210 /* All good. Don't send back the write positive ret value. */
211 ret = 0;
212 error:
213 return (int) ret;
214 }
215
216 static void cleanup_ust_dispatch_thread(void *data)
217 {
218 free(data);
219 }
220
221 /*
222 * Dispatch request from the registration threads to the application
223 * communication thread.
224 */
225 static void *thread_dispatch_ust_registration(void *data)
226 {
227 int ret, err = -1;
228 struct cds_wfcq_node *node;
229 struct ust_command *ust_cmd = NULL;
230 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
231 struct ust_reg_wait_queue wait_queue = {
232 .count = 0,
233 };
234 struct thread_notifiers *notifiers = data;
235
236 rcu_register_thread();
237
238 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH);
239
240 if (testpoint(sessiond_thread_app_reg_dispatch)) {
241 goto error_testpoint;
242 }
243
244 health_code_update();
245
246 CDS_INIT_LIST_HEAD(&wait_queue.head);
247
248 DBG("[thread] Dispatch UST command started");
249
250 for (;;) {
251 health_code_update();
252
253 /* Atomically prepare the queue futex */
254 futex_nto1_prepare(&notifiers->ust_cmd_queue->futex);
255
256 if (CMM_LOAD_SHARED(notifiers->dispatch_thread_exit)) {
257 break;
258 }
259
260 do {
261 struct ust_app *app = NULL;
262 ust_cmd = NULL;
263
264 /*
265 * Make sure we don't have node(s) that have hung up before receiving
266 * the notify socket. This is to clean the list in order to avoid
267 * memory leaks from notify socket that are never seen.
268 */
269 sanitize_wait_queue(&wait_queue);
270
271 health_code_update();
272 /* Dequeue command for registration */
273 node = cds_wfcq_dequeue_blocking(
274 &notifiers->ust_cmd_queue->head,
275 &notifiers->ust_cmd_queue->tail);
276 if (node == NULL) {
277 DBG("Woken up but nothing in the UST command queue");
278 /* Continue thread execution */
279 break;
280 }
281
282 ust_cmd = caa_container_of(node, struct ust_command, node);
283
284 DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
285 " gid:%d sock:%d name:%s (version %d.%d)",
286 ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
287 ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
288 ust_cmd->sock, ust_cmd->reg_msg.name,
289 ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
290
291 if (ust_cmd->reg_msg.type == USTCTL_SOCKET_CMD) {
292 wait_node = zmalloc(sizeof(*wait_node));
293 if (!wait_node) {
294 PERROR("zmalloc wait_node dispatch");
295 ret = close(ust_cmd->sock);
296 if (ret < 0) {
297 PERROR("close ust sock dispatch %d", ust_cmd->sock);
298 }
299 lttng_fd_put(LTTNG_FD_APPS, 1);
300 free(ust_cmd);
301 ust_cmd = NULL;
302 goto error;
303 }
304 CDS_INIT_LIST_HEAD(&wait_node->head);
305
306 /* Create application object if socket is CMD. */
307 wait_node->app = ust_app_create(&ust_cmd->reg_msg,
308 ust_cmd->sock);
309 if (!wait_node->app) {
310 ret = close(ust_cmd->sock);
311 if (ret < 0) {
312 PERROR("close ust sock dispatch %d", ust_cmd->sock);
313 }
314 lttng_fd_put(LTTNG_FD_APPS, 1);
315 free(wait_node);
316 wait_node = NULL;
317 free(ust_cmd);
318 ust_cmd = NULL;
319 continue;
320 }
321 /*
322 * Add application to the wait queue so we can set the notify
323 * socket before putting this object in the global ht.
324 */
325 cds_list_add(&wait_node->head, &wait_queue.head);
326 wait_queue.count++;
327
328 free(ust_cmd);
329 ust_cmd = NULL;
330 /*
331 * We have to continue here since we don't have the notify
332 * socket and the application MUST be added to the hash table
333 * only at that moment.
334 */
335 continue;
336 } else {
337 /*
338 * Look for the application in the local wait queue and set the
339 * notify socket if found.
340 */
341 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
342 &wait_queue.head, head) {
343 health_code_update();
344 if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
345 wait_node->app->notify_sock = ust_cmd->sock;
346 cds_list_del(&wait_node->head);
347 wait_queue.count--;
348 app = wait_node->app;
349 free(wait_node);
350 wait_node = NULL;
351 DBG3("UST app notify socket %d is set", ust_cmd->sock);
352 break;
353 }
354 }
355
356 /*
357 * With no application at this stage the received socket is
358 * basically useless so close it before we free the cmd data
359 * structure for good.
360 */
361 if (!app) {
362 ret = close(ust_cmd->sock);
363 if (ret < 0) {
364 PERROR("close ust sock dispatch %d", ust_cmd->sock);
365 }
366 lttng_fd_put(LTTNG_FD_APPS, 1);
367 }
368 free(ust_cmd);
369 ust_cmd = NULL;
370 }
371
372 if (app) {
373 /*
374 * @session_lock_list
375 *
376 * Lock the global session list so from the register up to the
377 * registration done message, no thread can see the application
378 * and change its state.
379 */
380 session_lock_list();
381 rcu_read_lock();
382
383 /*
384 * Add application to the global hash table. This needs to be
385 * done before the update to the UST registry can locate the
386 * application.
387 */
388 ust_app_add(app);
389
390 /* Set app version. This call will print an error if needed. */
391 (void) ust_app_version(app);
392
393 (void) ust_app_setup_trigger_group(app);
394
395 /* Send notify socket through the notify pipe. */
396 ret = send_socket_to_thread(
397 notifiers->apps_cmd_notify_pipe_write_fd,
398 app->notify_sock);
399 if (ret < 0) {
400 rcu_read_unlock();
401 session_unlock_list();
402 /*
403 * No notify thread, stop the UST tracing. However, this is
404 * not an internal error of the this thread thus setting
405 * the health error code to a normal exit.
406 */
407 err = 0;
408 goto error;
409 }
410
411 /*
412 * Update newly registered application with the tracing
413 * registry info already enabled information.
414 */
415 update_ust_app(app->sock);
416
417 /*
418 * Don't care about return value. Let the manage apps threads
419 * handle app unregistration upon socket close.
420 */
421 (void) ust_app_register_done(app);
422
423 /*
424 * Even if the application socket has been closed, send the app
425 * to the thread and unregistration will take place at that
426 * place.
427 */
428 ret = send_socket_to_thread(
429 notifiers->apps_cmd_pipe_write_fd,
430 app->sock);
431 if (ret < 0) {
432 rcu_read_unlock();
433 session_unlock_list();
434 /*
435 * No apps. thread, stop the UST tracing. However, this is
436 * not an internal error of the this thread thus setting
437 * the health error code to a normal exit.
438 */
439 err = 0;
440 goto error;
441 }
442
443 rcu_read_unlock();
444 session_unlock_list();
445 }
446 } while (node != NULL);
447
448 health_poll_entry();
449 /* Futex wait on queue. Blocking call on futex() */
450 futex_nto1_wait(&notifiers->ust_cmd_queue->futex);
451 health_poll_exit();
452 }
453 /* Normal exit, no error */
454 err = 0;
455
456 error:
457 /* Clean up wait queue. */
458 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
459 &wait_queue.head, head) {
460 cds_list_del(&wait_node->head);
461 wait_queue.count--;
462 free(wait_node);
463 }
464
465 /* Empty command queue. */
466 for (;;) {
467 /* Dequeue command for registration */
468 node = cds_wfcq_dequeue_blocking(
469 &notifiers->ust_cmd_queue->head,
470 &notifiers->ust_cmd_queue->tail);
471 if (node == NULL) {
472 break;
473 }
474 ust_cmd = caa_container_of(node, struct ust_command, node);
475 ret = close(ust_cmd->sock);
476 if (ret < 0) {
477 PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
478 }
479 lttng_fd_put(LTTNG_FD_APPS, 1);
480 free(ust_cmd);
481 }
482
483 error_testpoint:
484 DBG("Dispatch thread dying");
485 if (err) {
486 health_error();
487 ERR("Health error occurred in %s", __func__);
488 }
489 health_unregister(health_sessiond);
490 rcu_unregister_thread();
491 return NULL;
492 }
493
494 static bool shutdown_ust_dispatch_thread(void *data)
495 {
496 struct thread_notifiers *notifiers = data;
497
498 CMM_STORE_SHARED(notifiers->dispatch_thread_exit, 1);
499 futex_nto1_wake(&notifiers->ust_cmd_queue->futex);
500 return true;
501 }
502
503 bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue,
504 int apps_cmd_pipe_write_fd,
505 int apps_cmd_notify_pipe_write_fd)
506 {
507 struct lttng_thread *thread;
508 struct thread_notifiers *notifiers;
509
510 notifiers = zmalloc(sizeof(*notifiers));
511 if (!notifiers) {
512 goto error;
513 }
514 notifiers->ust_cmd_queue = cmd_queue;
515 notifiers->apps_cmd_pipe_write_fd = apps_cmd_pipe_write_fd;
516 notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd;
517
518 thread = lttng_thread_create("UST registration dispatch",
519 thread_dispatch_ust_registration,
520 shutdown_ust_dispatch_thread,
521 cleanup_ust_dispatch_thread,
522 notifiers);
523 if (!thread) {
524 goto error;
525 }
526 lttng_thread_put(thread);
527 return true;
528 error:
529 free(notifiers);
530 return false;
531 }
This page took 0.064666 seconds and 5 git commands to generate.