Custom upgrade: allow N lttng-ust instances per process
[lttng-tools.git] / src / bin / lttng-sessiond / dispatch.c
CommitLineData
5d1b0219 1/*
90c106c6 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5d1b0219 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
5d1b0219 7 *
5d1b0219
JG
8 */
9
10#include <stddef.h>
11#include <stdlib.h>
12#include <urcu.h>
13#include <common/futex.h>
14#include <common/macros.h>
15
16#include "dispatch.h"
17#include "ust-app.h"
18#include "testpoint.h"
19#include "fd-limit.h"
20#include "health-sessiond.h"
21#include "lttng-sessiond.h"
22#include "thread.h"
23
/*
 * State shared between launch_ust_dispatch_thread(), the UST registration
 * dispatch thread and its shutdown/cleanup callbacks.
 */
struct thread_notifiers {
	/* Queue of registration commands consumed by the dispatch thread. */
	struct ust_cmd_queue *ust_cmd_queue;
	/* Pipe (write end) receiving application command sockets. */
	int apps_cmd_pipe_write_fd;
	/* Pipe (write end) receiving application notify sockets. */
	int apps_cmd_notify_pipe_write_fd;
	/* Set to 1 by shutdown_ust_dispatch_thread() to request an exit. */
	int dispatch_thread_exit;
};
30
31/*
32 * For each tracing session, update newly registered apps. The session list
33 * lock MUST be acquired before calling this.
34 */
35static void update_ust_app(int app_sock)
36{
37 struct ltt_session *sess, *stmp;
38 const struct ltt_session_list *session_list = session_get_list();
70670472 39 struct ust_app *app;
5d1b0219
JG
40
41 /* Consumer is in an ERROR state. Stop any application update. */
412d7227 42 if (uatomic_read(&the_ust_consumerd_state) == CONSUMER_ERROR) {
5d1b0219
JG
43 /* Stop the update process since the consumer is dead. */
44 return;
45 }
46
70670472
JR
47 rcu_read_lock();
48 assert(app_sock >= 0);
49 app = ust_app_find_by_sock(app_sock);
50 if (app == NULL) {
51 /*
52 * Application can be unregistered before so
53 * this is possible hence simply stopping the
54 * update.
55 */
56 DBG3("UST app update failed to find app sock %d",
57 app_sock);
58 goto unlock_rcu;
59 }
60
61 /* Update all event notifiers for the app. */
62 ust_app_global_update_event_notifier_rules(app);
63
5d1b0219
JG
64 /* For all tracing session(s) */
65 cds_list_for_each_entry_safe(sess, stmp, &session_list->head, list) {
5d1b0219
JG
66 if (!session_get(sess)) {
67 continue;
68 }
69 session_lock(sess);
c05d4d1f
JR
70 if (!sess->active || !sess->ust_session ||
71 !sess->ust_session->active) {
5d1b0219
JG
72 goto unlock_session;
73 }
74
5d1b0219 75 ust_app_global_update(sess->ust_session, app);
5d1b0219
JG
76 unlock_session:
77 session_unlock(sess);
78 session_put(sess);
79 }
70670472
JR
80
81unlock_rcu:
82 rcu_read_unlock();
5d1b0219
JG
83}
84
85/*
86 * Sanitize the wait queue of the dispatch registration thread meaning removing
87 * invalid nodes from it. This is to avoid memory leaks for the case the UST
88 * notify socket is never received.
89 */
90static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
91{
92 int ret, nb_fd = 0, i;
93 unsigned int fd_added = 0;
94 struct lttng_poll_event events;
95 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
96
97 assert(wait_queue);
98
99 lttng_poll_init(&events);
100
101 /* Just skip everything for an empty queue. */
102 if (!wait_queue->count) {
103 goto end;
104 }
105
106 ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
107 if (ret < 0) {
108 goto error_create;
109 }
110
111 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
112 &wait_queue->head, head) {
113 assert(wait_node->app);
114 ret = lttng_poll_add(&events, wait_node->app->sock,
115 LPOLLHUP | LPOLLERR);
116 if (ret < 0) {
117 goto error;
118 }
119
120 fd_added = 1;
121 }
122
123 if (!fd_added) {
124 goto end;
125 }
126
127 /*
128 * Poll but don't block so we can quickly identify the faulty events and
129 * clean them afterwards from the wait queue.
130 */
131 ret = lttng_poll_wait(&events, 0);
132 if (ret < 0) {
133 goto error;
134 }
135 nb_fd = ret;
136
137 for (i = 0; i < nb_fd; i++) {
138 /* Get faulty FD. */
139 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
140 int pollfd = LTTNG_POLL_GETFD(&events, i);
141
5d1b0219
JG
142 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
143 &wait_queue->head, head) {
144 if (pollfd == wait_node->app->sock &&
145 (revents & (LPOLLHUP | LPOLLERR))) {
146 cds_list_del(&wait_node->head);
147 wait_queue->count--;
148 ust_app_destroy(wait_node->app);
149 free(wait_node);
150 /*
151 * Silence warning of use-after-free in
152 * cds_list_for_each_entry_safe which uses
153 * __typeof__(*wait_node).
154 */
155 wait_node = NULL;
156 break;
157 } else {
158 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
159 goto error;
160 }
161 }
162 }
163
164 if (nb_fd > 0) {
165 DBG("Wait queue sanitized, %d node were cleaned up", nb_fd);
166 }
167
168end:
169 lttng_poll_clean(&events);
170 return;
171
172error:
173 lttng_poll_clean(&events);
174error_create:
175 ERR("Unable to sanitize wait queue");
176 return;
177}
178
/*
 * Send a socket to a thread. This is called from the dispatch UST
 * registration thread once all sockets are set for the application.
 *
 * The sock value can be invalid, we don't really care, the thread will handle
 * it and make the necessary cleanup if so.
 *
 * @fd:   write end of the receiving thread's pipe (may be -1 during shutdown).
 * @sock: socket number written to the pipe.
 *
 * Return 0 on success. On failure, return a negative errno value:
 * -EBADF when @fd is invalid, the negated errno of the failing write(), or
 * -EIO if the whole socket number could not be written.
 */
static int send_socket_to_thread(int fd, int sock)
{
	ssize_t ret;

	/*
	 * It's possible that the FD is set as invalid with -1 concurrently just
	 * before calling this function being a shutdown state of the thread.
	 */
	if (fd < 0) {
		ret = -EBADF;
		goto error;
	}

	ret = lttng_write(fd, &sock, sizeof(sock));
	/*
	 * Test the sign explicitly: comparing the signed ret directly with the
	 * unsigned sizeof() would convert -1 to SIZE_MAX and hide write errors.
	 */
	if (ret < 0 || (size_t) ret < sizeof(sock)) {
		/* Capture errno before logging can clobber it. */
		const int saved_errno = errno;

		PERROR("write apps pipe %d", fd);
		if (ret < 0 && saved_errno != 0) {
			ret = -saved_errno;
		} else {
			/* Short write (or unknown failure): never report success. */
			ret = -EIO;
		}
		goto error;
	}

	/* All good. Don't send back the write positive ret value. */
	ret = 0;
error:
	return (int) ret;
}
216
/*
 * Thread cleanup callback: release the thread_notifiers allocated by
 * launch_ust_dispatch_thread(). A NULL argument is accepted.
 */
static void cleanup_ust_dispatch_thread(void *data)
{
	struct thread_notifiers *notifiers = data;

	free(notifiers);
}
221
222/*
223 * Dispatch request from the registration threads to the application
224 * communication thread.
225 */
226static void *thread_dispatch_ust_registration(void *data)
227{
228 int ret, err = -1;
229 struct cds_wfcq_node *node;
230 struct ust_command *ust_cmd = NULL;
231 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
232 struct ust_reg_wait_queue wait_queue = {
233 .count = 0,
234 };
235 struct thread_notifiers *notifiers = data;
236
237 rcu_register_thread();
238
412d7227
SM
239 health_register(the_health_sessiond,
240 HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH);
5d1b0219
JG
241
242 if (testpoint(sessiond_thread_app_reg_dispatch)) {
243 goto error_testpoint;
244 }
245
246 health_code_update();
247
248 CDS_INIT_LIST_HEAD(&wait_queue.head);
249
250 DBG("[thread] Dispatch UST command started");
251
252 for (;;) {
253 health_code_update();
254
255 /* Atomically prepare the queue futex */
256 futex_nto1_prepare(&notifiers->ust_cmd_queue->futex);
257
258 if (CMM_LOAD_SHARED(notifiers->dispatch_thread_exit)) {
259 break;
260 }
261
262 do {
263 struct ust_app *app = NULL;
264 ust_cmd = NULL;
265
266 /*
267 * Make sure we don't have node(s) that have hung up before receiving
268 * the notify socket. This is to clean the list in order to avoid
269 * memory leaks from notify socket that are never seen.
270 */
271 sanitize_wait_queue(&wait_queue);
272
273 health_code_update();
274 /* Dequeue command for registration */
275 node = cds_wfcq_dequeue_blocking(
276 &notifiers->ust_cmd_queue->head,
277 &notifiers->ust_cmd_queue->tail);
278 if (node == NULL) {
279 DBG("Woken up but nothing in the UST command queue");
280 /* Continue thread execution */
281 break;
282 }
283
284 ust_cmd = caa_container_of(node, struct ust_command, node);
285
286 DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
287 " gid:%d sock:%d name:%s (version %d.%d)",
288 ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
289 ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
290 ust_cmd->sock, ust_cmd->reg_msg.name,
291 ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
292
b623cb6a 293 if (ust_cmd->reg_msg.type == LTTNG_UST_CTL_SOCKET_CMD) {
5d1b0219
JG
294 wait_node = zmalloc(sizeof(*wait_node));
295 if (!wait_node) {
296 PERROR("zmalloc wait_node dispatch");
297 ret = close(ust_cmd->sock);
298 if (ret < 0) {
299 PERROR("close ust sock dispatch %d", ust_cmd->sock);
300 }
301 lttng_fd_put(LTTNG_FD_APPS, 1);
302 free(ust_cmd);
58f835e1 303 ust_cmd = NULL;
5d1b0219
JG
304 goto error;
305 }
306 CDS_INIT_LIST_HEAD(&wait_node->head);
307
308 /* Create application object if socket is CMD. */
309 wait_node->app = ust_app_create(&ust_cmd->reg_msg,
310 ust_cmd->sock);
311 if (!wait_node->app) {
312 ret = close(ust_cmd->sock);
313 if (ret < 0) {
314 PERROR("close ust sock dispatch %d", ust_cmd->sock);
315 }
316 lttng_fd_put(LTTNG_FD_APPS, 1);
317 free(wait_node);
58f835e1 318 wait_node = NULL;
5d1b0219 319 free(ust_cmd);
58f835e1 320 ust_cmd = NULL;
5d1b0219
JG
321 continue;
322 }
323 /*
324 * Add application to the wait queue so we can set the notify
325 * socket before putting this object in the global ht.
326 */
327 cds_list_add(&wait_node->head, &wait_queue.head);
328 wait_queue.count++;
329
330 free(ust_cmd);
58f835e1 331 ust_cmd = NULL;
5d1b0219
JG
332 /*
333 * We have to continue here since we don't have the notify
334 * socket and the application MUST be added to the hash table
335 * only at that moment.
336 */
337 continue;
338 } else {
339 /*
340 * Look for the application in the local wait queue and set the
341 * notify socket if found.
342 */
343 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
344 &wait_queue.head, head) {
345 health_code_update();
0b6b8118
JR
346 /*
347 * Only using the pid number to match
348 * notify socket is not enough when
349 * dealing with multiple instances of
350 * lttng-ust for a given process. A
351 * similar problem will rise up when
352 * dealing with multiple similar pid
353 * across namespaces in the futur.
354 */
355 if (wait_node->app->pid ==
356 ust_cmd->reg_msg.pid) {
357 if ((wait_node->app->v_major !=
358 ust_cmd->reg_msg.major) &&
359 (wait_node->app->v_minor !=
360 ust_cmd->reg_msg.minor)) {
361 DBG("Skipping notify socket assignment (pid: %d) based on version mismatch. Notify socket registration version %d.%d, under iteration app version %d.%d.",
362 wait_node->app->pid,
363 ust_cmd->reg_msg.major,
364 ust_cmd->reg_msg.minor,
365 wait_node->app->v_major,
366 wait_node->app->v_minor);
367 continue;
368 }
5d1b0219
JG
369 wait_node->app->notify_sock = ust_cmd->sock;
370 cds_list_del(&wait_node->head);
371 wait_queue.count--;
372 app = wait_node->app;
373 free(wait_node);
58f835e1 374 wait_node = NULL;
0b6b8118 375 DBG3("UST app notify socket %d is set for app sock %d", ust_cmd->sock, app->sock);
5d1b0219
JG
376 break;
377 }
378 }
379
380 /*
381 * With no application at this stage the received socket is
382 * basically useless so close it before we free the cmd data
383 * structure for good.
384 */
385 if (!app) {
386 ret = close(ust_cmd->sock);
387 if (ret < 0) {
388 PERROR("close ust sock dispatch %d", ust_cmd->sock);
389 }
390 lttng_fd_put(LTTNG_FD_APPS, 1);
391 }
392 free(ust_cmd);
58f835e1 393 ust_cmd = NULL;
5d1b0219
JG
394 }
395
396 if (app) {
397 /*
398 * @session_lock_list
399 *
400 * Lock the global session list so from the register up to the
401 * registration done message, no thread can see the application
402 * and change its state.
403 */
404 session_lock_list();
405 rcu_read_lock();
406
407 /*
408 * Add application to the global hash table. This needs to be
409 * done before the update to the UST registry can locate the
410 * application.
411 */
412 ust_app_add(app);
413
414 /* Set app version. This call will print an error if needed. */
415 (void) ust_app_version(app);
416
da873412
JR
417 (void) ust_app_setup_event_notifier_group(app);
418
5d1b0219
JG
419 /* Send notify socket through the notify pipe. */
420 ret = send_socket_to_thread(
421 notifiers->apps_cmd_notify_pipe_write_fd,
422 app->notify_sock);
423 if (ret < 0) {
424 rcu_read_unlock();
425 session_unlock_list();
426 /*
427 * No notify thread, stop the UST tracing. However, this is
428 * not an internal error of the this thread thus setting
429 * the health error code to a normal exit.
430 */
431 err = 0;
432 goto error;
433 }
434
435 /*
436 * Update newly registered application with the tracing
437 * registry info already enabled information.
438 */
439 update_ust_app(app->sock);
440
441 /*
442 * Don't care about return value. Let the manage apps threads
443 * handle app unregistration upon socket close.
444 */
445 (void) ust_app_register_done(app);
446
447 /*
448 * Even if the application socket has been closed, send the app
449 * to the thread and unregistration will take place at that
450 * place.
451 */
452 ret = send_socket_to_thread(
453 notifiers->apps_cmd_pipe_write_fd,
454 app->sock);
455 if (ret < 0) {
456 rcu_read_unlock();
457 session_unlock_list();
458 /*
459 * No apps. thread, stop the UST tracing. However, this is
460 * not an internal error of the this thread thus setting
461 * the health error code to a normal exit.
462 */
463 err = 0;
464 goto error;
465 }
466
467 rcu_read_unlock();
468 session_unlock_list();
469 }
470 } while (node != NULL);
471
472 health_poll_entry();
473 /* Futex wait on queue. Blocking call on futex() */
474 futex_nto1_wait(&notifiers->ust_cmd_queue->futex);
475 health_poll_exit();
476 }
477 /* Normal exit, no error */
478 err = 0;
479
480error:
481 /* Clean up wait queue. */
482 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
483 &wait_queue.head, head) {
484 cds_list_del(&wait_node->head);
485 wait_queue.count--;
486 free(wait_node);
487 }
488
489 /* Empty command queue. */
490 for (;;) {
491 /* Dequeue command for registration */
492 node = cds_wfcq_dequeue_blocking(
493 &notifiers->ust_cmd_queue->head,
494 &notifiers->ust_cmd_queue->tail);
495 if (node == NULL) {
496 break;
497 }
498 ust_cmd = caa_container_of(node, struct ust_command, node);
499 ret = close(ust_cmd->sock);
500 if (ret < 0) {
501 PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
502 }
503 lttng_fd_put(LTTNG_FD_APPS, 1);
504 free(ust_cmd);
505 }
506
507error_testpoint:
508 DBG("Dispatch thread dying");
509 if (err) {
510 health_error();
511 ERR("Health error occurred in %s", __func__);
512 }
412d7227 513 health_unregister(the_health_sessiond);
5d1b0219
JG
514 rcu_unregister_thread();
515 return NULL;
516}
517
518static bool shutdown_ust_dispatch_thread(void *data)
519{
520 struct thread_notifiers *notifiers = data;
521
522 CMM_STORE_SHARED(notifiers->dispatch_thread_exit, 1);
523 futex_nto1_wake(&notifiers->ust_cmd_queue->futex);
524 return true;
525}
526
527bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue,
528 int apps_cmd_pipe_write_fd,
529 int apps_cmd_notify_pipe_write_fd)
530{
531 struct lttng_thread *thread;
532 struct thread_notifiers *notifiers;
533
534 notifiers = zmalloc(sizeof(*notifiers));
535 if (!notifiers) {
536 goto error;
537 }
538 notifiers->ust_cmd_queue = cmd_queue;
539 notifiers->apps_cmd_pipe_write_fd = apps_cmd_pipe_write_fd;
540 notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd;
541
542 thread = lttng_thread_create("UST registration dispatch",
543 thread_dispatch_ust_registration,
544 shutdown_ust_dispatch_thread,
545 cleanup_ust_dispatch_thread,
546 notifiers);
547 if (!thread) {
548 goto error;
549 }
550 lttng_thread_put(thread);
551 return true;
552error:
553 free(notifiers);
554 return false;
555}
This page took 0.069578 seconds and 5 git commands to generate.