SoW-2019-0002: Dynamic Snapshot
[lttng-tools.git] / src / bin / lttng-sessiond / action-executor.c
1 /*
2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include "action-executor.h"
9 #include "cmd.h"
10 #include "health-sessiond.h"
11 #include "lttng-sessiond.h"
12 #include "notification-thread-internal.h"
13 #include "session.h"
14 #include "thread.h"
15 #include <common/macros.h>
16 #include <lttng/action/group.h>
17 #include <lttng/action/notify.h>
18 #include <lttng/action/rotate-session.h>
19 #include <lttng/action/snapshot-session.h>
20 #include <lttng/action/start-session.h>
21 #include <lttng/action/stop-session.h>
22 #include <lttng/condition/evaluation.h>
23 #include <lttng/condition/event-rule-internal.h>
24 #include <lttng/lttng-error.h>
25 #include <lttng/trigger/trigger-internal.h>
26 #include <pthread.h>
27 #include <stdbool.h>
28 #include <stddef.h>
29 #include <urcu/list.h>
30
/* Name given to the executor's thread when it is created. */
#define THREAD_NAME "Action Executor"
/*
 * Maximal number of work items that may be pending at once; enqueuing
 * is refused with an overflow status once this count is reached.
 */
#define MAX_QUEUED_WORK_COUNT 8192
33
34 struct action_work_item {
35 uint64_t id;
36 struct lttng_trigger *trigger;
37 struct notification_client_list *client_list;
38 struct cds_list_head list_node;
39 };
40
41 struct action_executor {
42 struct lttng_thread *thread;
43 struct notification_thread_handle *notification_thread_handle;
44 struct {
45 uint64_t pending_count;
46 struct cds_list_head list;
47 pthread_cond_t cond;
48 pthread_mutex_t lock;
49 } work;
50 bool should_quit;
51 uint64_t next_work_item_id;
52 };
53
/*
 * Signature shared by all per-action-type handlers.
 *
 * A handler returns 0 on success. Any other value is treated as a fatal
 * error by the executor thread, which then leaves its work loop.
 */
typedef int (*action_executor_handler)(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
57
/*
 * Forward declarations of the handlers referenced by the dispatch table
 * and by each other (the group handler recurses through the generic one).
 */
static int action_executor_notify_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_start_session_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_stop_session_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_rotate_session_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_snapshot_session_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_group_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action_group);
static int action_executor_generic_handler(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
79
80 static const action_executor_handler action_executors[] = {
81 [LTTNG_ACTION_TYPE_NOTIFY] = action_executor_notify_handler,
82 [LTTNG_ACTION_TYPE_START_SESSION] = action_executor_start_session_handler,
83 [LTTNG_ACTION_TYPE_STOP_SESSION] = action_executor_stop_session_handler,
84 [LTTNG_ACTION_TYPE_ROTATE_SESSION] = action_executor_rotate_session_handler,
85 [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = action_executor_snapshot_session_handler,
86 [LTTNG_ACTION_TYPE_GROUP] = action_executor_group_handler,
87 };
88
89 static const char *get_action_name(const struct lttng_action *action)
90 {
91 const char *action_type_names[] = {
92 [LTTNG_ACTION_TYPE_NOTIFY] = "Notify",
93 [LTTNG_ACTION_TYPE_START_SESSION] = "Start session",
94 [LTTNG_ACTION_TYPE_STOP_SESSION] = "Stop session",
95 [LTTNG_ACTION_TYPE_ROTATE_SESSION] = "Rotate session",
96 [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = "Snapshot session",
97 [LTTNG_ACTION_TYPE_GROUP] = "Group",
98 };
99
100 return action_type_names[lttng_action_get_type(action)];
101 }
102
103 static const char *get_trigger_name(const struct lttng_trigger *trigger)
104 {
105 const char *trigger_name;
106 enum lttng_trigger_status trigger_status;
107
108 trigger_status = lttng_trigger_get_name(trigger, &trigger_name);
109 assert(trigger_status == LTTNG_TRIGGER_STATUS_OK);
110
111 return trigger_name;
112 }
113
114 static int client_handle_transmission_status(
115 struct notification_client *client,
116 enum client_transmission_status status,
117 void *user_data)
118 {
119 int ret = 0;
120 struct action_executor *executor = user_data;
121 bool update_communication = true;
122
123 ASSERT_LOCKED(client->lock);
124
125 switch (status) {
126 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
127 DBG("Successfully sent full notification to client, client_id = %" PRIu64,
128 client->id);
129 update_communication = false;
130 break;
131 case CLIENT_TRANSMISSION_STATUS_QUEUED:
132 DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64,
133 client->id);
134 break;
135 case CLIENT_TRANSMISSION_STATUS_FAIL:
136 DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64,
137 client->id);
138 client->communication.active = false;
139 break;
140 default:
141 ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64,
142 client->id);
143 client->communication.active = false;
144 ret = -1;
145 goto end;
146 }
147
148 if (!update_communication) {
149 goto end;
150 }
151
152 ret = notification_thread_client_communication_update(
153 executor->notification_thread_handle, client->id,
154 status);
155 end:
156 return ret;
157 }
158
159 static int action_executor_notify_handler(struct action_executor *executor,
160 const struct action_work_item *work_item,
161 const struct lttng_action *action)
162 {
163 int ret = 0;
164 struct lttng_evaluation *evaluation = NULL;
165
166 assert(work_item->client_list);
167
168 evaluation = lttng_evaluation_event_rule_create(
169 get_trigger_name(work_item->trigger));
170 if (!evaluation) {
171 ERR("Failed to create event rule hit evaluation");
172 ret = -1;
173 goto end;
174 }
175
176 ret = notification_client_list_send_evaluation(work_item->client_list,
177 lttng_trigger_get_const_condition(work_item->trigger),
178 evaluation,
179 lttng_trigger_get_credentials(work_item->trigger), NULL,
180 client_handle_transmission_status, executor);
181 end:
182 lttng_evaluation_destroy(evaluation);
183 return ret;
184 }
185
186 static int action_executor_start_session_handler(struct action_executor *executor,
187 const struct action_work_item *work_item,
188 const struct lttng_action *action)
189 {
190 int ret = 0;
191 const char *session_name;
192 enum lttng_action_status action_status;
193 struct ltt_session *session;
194
195 action_status = lttng_action_start_session_get_session_name(
196 action, &session_name);
197 if (action_status != LTTNG_ACTION_STATUS_OK) {
198 ERR("Failed to get session name from \"%s\" action",
199 get_action_name(action));
200 ret = -1;
201 goto end;
202 }
203
204 session_lock_list();
205 session = session_find_by_name(session_name);
206 if (session) {
207 enum lttng_error_code cmd_ret;
208
209 session_lock(session);
210 cmd_ret = cmd_start_trace(session);
211 session_unlock(session);
212
213 switch (cmd_ret) {
214 case LTTNG_OK:
215 DBG("Successfully started session \"%s\" on behalf of trigger \"%s\"",
216 session_name,
217 get_trigger_name(work_item->trigger));
218 break;
219 case LTTNG_ERR_TRACE_ALREADY_STARTED:
220 DBG("Attempted to start session \"%s\" on behalf of trigger \"%s\" but it was already started",
221 session_name,
222 get_trigger_name(work_item->trigger));
223 break;
224 default:
225 WARN("Failed to start session \"%s\" on behalf of trigger \"%s\": %s",
226 session_name,
227 get_trigger_name(work_item->trigger),
228 lttng_strerror(-cmd_ret));
229 break;
230 }
231 session_put(session);
232 } else {
233 DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"",
234 session_name, get_action_name(action),
235 get_trigger_name(work_item->trigger));
236 }
237 session_unlock_list();
238 end:
239 return ret;
240 }
241
242 static int action_executor_stop_session_handler(struct action_executor *executor,
243 const struct action_work_item *work_item,
244 const struct lttng_action *action)
245 {
246 int ret = 0;
247 const char *session_name;
248 enum lttng_action_status action_status;
249 struct ltt_session *session;
250
251 action_status = lttng_action_stop_session_get_session_name(
252 action, &session_name);
253 if (action_status != LTTNG_ACTION_STATUS_OK) {
254 ERR("Failed to get session name from \"%s\" action",
255 get_action_name(action));
256 ret = -1;
257 goto end;
258 }
259
260 session_lock_list();
261 session = session_find_by_name(session_name);
262 if (session) {
263 enum lttng_error_code cmd_ret;
264
265 session_lock(session);
266 cmd_ret = cmd_stop_trace(session);
267 session_unlock(session);
268
269 switch (cmd_ret) {
270 case LTTNG_OK:
271 DBG("Successfully stopped session \"%s\" on behalf of trigger \"%s\"",
272 session_name,
273 get_trigger_name(work_item->trigger));
274 break;
275 case LTTNG_ERR_TRACE_ALREADY_STOPPED:
276 DBG("Attempted to stop session \"%s\" on behalf of trigger \"%s\" but it was already stopped",
277 session_name,
278 get_trigger_name(work_item->trigger));
279 break;
280 default:
281 WARN("Failed to stop session \"%s\" on behalf of trigger \"%s\": %s",
282 session_name,
283 get_trigger_name(work_item->trigger),
284 lttng_strerror(-cmd_ret));
285 break;
286 }
287 session_put(session);
288 } else {
289 DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"",
290 session_name, get_action_name(action),
291 get_trigger_name(work_item->trigger));
292 }
293 session_unlock_list();
294 end:
295 return ret;
296 }
297
298 static int action_executor_rotate_session_handler(struct action_executor *executor,
299 const struct action_work_item *work_item,
300 const struct lttng_action *action)
301 {
302 int ret = 0;
303 const char *session_name;
304 enum lttng_action_status action_status;
305 struct ltt_session *session;
306
307 action_status = lttng_action_rotate_session_get_session_name(
308 action, &session_name);
309 if (action_status != LTTNG_ACTION_STATUS_OK) {
310 ERR("Failed to get session name from \"%s\" action",
311 get_action_name(action));
312 ret = -1;
313 goto end;
314 }
315
316 session_lock_list();
317 session = session_find_by_name(session_name);
318 if (session) {
319 enum lttng_error_code cmd_ret;
320
321 session_lock(session);
322 cmd_ret = cmd_rotate_session(session, NULL, false,
323 LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
324 session_unlock(session);
325
326 switch (cmd_ret) {
327 case LTTNG_OK:
328 DBG("Successfully started rotation of session \"%s\" on behalf of trigger \"%s\"",
329 session_name,
330 get_trigger_name(work_item->trigger));
331 break;
332 case LTTNG_ERR_ROTATION_PENDING:
333 DBG("Attempted to start a rotation of session \"%s\" on behalf of trigger \"%s\" but a rotation is already ongoing",
334 session_name,
335 get_trigger_name(work_item->trigger));
336 break;
337 case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
338 case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
339 DBG("Attempted to start a rotation of session \"%s\" on behalf of trigger \"%s\" but a rotation has already been completed since the last stop or clear",
340 session_name,
341 get_trigger_name(work_item->trigger));
342 break;
343 default:
344 WARN("Failed to start a rotation of session \"%s\" on behalf of trigger \"%s\": %s",
345 session_name,
346 get_trigger_name(work_item->trigger),
347 lttng_strerror(-cmd_ret));
348 break;
349 }
350 session_put(session);
351 } else {
352 DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"",
353 session_name, get_action_name(action),
354 get_trigger_name(work_item->trigger));
355 }
356 session_unlock_list();
357 end:
358 return ret;
359 }
360
361 static int action_executor_snapshot_session_handler(struct action_executor *executor,
362 const struct action_work_item *work_item,
363 const struct lttng_action *action)
364 {
365 int ret = 0;
366 const char *session_name;
367 enum lttng_action_status action_status;
368 struct ltt_session *session;
369 const struct lttng_snapshot_output default_snapshot_output = {
370 .max_size = UINT64_MAX,
371 };
372 const struct lttng_snapshot_output *snapshot_output =
373 &default_snapshot_output;
374
375 action_status = lttng_action_snapshot_session_get_session_name(
376 action, &session_name);
377 if (action_status != LTTNG_ACTION_STATUS_OK) {
378 ERR("Failed to get session name from \"%s\" action",
379 get_action_name(action));
380 ret = -1;
381 goto end;
382 }
383
384 action_status = lttng_action_snapshot_session_get_output_const(
385 action, &snapshot_output);
386 if (action_status != LTTNG_ACTION_STATUS_OK &&
387 action_status != LTTNG_ACTION_STATUS_UNSET) {
388 ERR("Failed to get output from \"%s\" action",
389 get_action_name(action));
390 ret = -1;
391 goto end;
392 }
393
394 session_lock_list();
395 session = session_find_by_name(session_name);
396 if (session) {
397 enum lttng_error_code cmd_ret;
398
399 session_lock(session);
400 cmd_ret = cmd_snapshot_record(session, snapshot_output, 0);
401 session_unlock(session);
402
403 switch (cmd_ret) {
404 case LTTNG_OK:
405 DBG("Successfully recorded snapshot of session \"%s\" on behalf of trigger \"%s\"",
406 session_name,
407 get_trigger_name(work_item->trigger));
408 break;
409 default:
410 WARN("Failed to record snapshot of session \"%s\" on behalf of trigger \"%s\": %s",
411 session_name,
412 get_trigger_name(work_item->trigger),
413 lttng_strerror(-cmd_ret));
414 break;
415 }
416 session_put(session);
417 } else {
418 DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"",
419 session_name, get_action_name(action),
420 get_trigger_name(work_item->trigger));
421 }
422 session_unlock_list();
423 end:
424 return ret;
425 }
426
427 static int action_executor_group_handler(struct action_executor *executor,
428 const struct action_work_item *work_item,
429 const struct lttng_action *action_group)
430 {
431 int ret = 0;
432 unsigned int i, count;
433 enum lttng_action_status action_status;
434
435 action_status = lttng_action_group_get_count(action_group, &count);
436 if (action_status != LTTNG_ACTION_STATUS_OK) {
437 /* Fatal error. */
438 ERR("Failed to get count of action in action group");
439 ret = -1;
440 goto end;
441 }
442
443 DBG("Action group has %u action%s", count, count != 1 ? "s" : "");
444 for (i = 0; i < count; i++) {
445 const struct lttng_action *action =
446 lttng_action_group_get_at_index_const(
447 action_group, i);
448
449 ret = action_executor_generic_handler(
450 executor, work_item, action);
451 if (ret) {
452 ERR("Stopping the execution of the action group of trigger \"%s\" following a fatal error",
453 get_trigger_name(work_item->trigger));
454 goto end;
455 }
456 }
457 end:
458 return ret;
459 }
460
461 static int action_executor_generic_handler(struct action_executor *executor,
462 const struct action_work_item *work_item,
463 const struct lttng_action *action)
464 {
465 DBG("Executing action \"%s\" of trigger \"%s\" action work item %" PRIu64,
466 get_action_name(action),
467 get_trigger_name(work_item->trigger),
468 work_item->id);
469
470 return action_executors[lttng_action_get_type(action)](
471 executor, work_item, action);
472 }
473
474 static int action_work_item_execute(struct action_executor *executor,
475 struct action_work_item *work_item)
476 {
477 int ret;
478 const struct lttng_action *action =
479 lttng_trigger_get_const_action(work_item->trigger);
480
481 DBG("Starting execution of action work item %" PRIu64 " of trigger \"%s\"",
482 work_item->id, get_trigger_name(work_item->trigger));
483 ret = action_executor_generic_handler(executor, work_item, action);
484 DBG("Completed execution of action work item %" PRIu64 " of trigger \"%s\"",
485 work_item->id, get_trigger_name(work_item->trigger));
486 return ret;
487 }
488
489 static void action_work_item_destroy(struct action_work_item *work_item)
490 {
491 lttng_trigger_put(work_item->trigger);
492 notification_client_list_put(work_item->client_list);
493 free(work_item);
494 }
495
496 static void *action_executor_thread(void *_data)
497 {
498 struct action_executor *executor = _data;
499
500 assert(executor);
501
502 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR);
503
504 rcu_register_thread();
505 rcu_thread_online();
506
507 DBG("Entering work execution loop");
508 pthread_mutex_lock(&executor->work.lock);
509 while (!executor->should_quit) {
510 int ret;
511 struct action_work_item *work_item;
512
513 health_code_update();
514 if (executor->work.pending_count == 0) {
515 health_poll_entry();
516 DBG("No work items enqueued, entering wait");
517 pthread_cond_wait(&executor->work.cond,
518 &executor->work.lock);
519 DBG("Woke-up from wait");
520 health_poll_exit();
521 continue;
522 }
523
524 /* Pop item from front of the listwith work lock held. */
525 work_item = cds_list_first_entry(&executor->work.list,
526 struct action_work_item, list_node);
527 cds_list_del(&work_item->list_node);
528 executor->work.pending_count--;
529
530 /*
531 * Work can be performed without holding the work lock,
532 * allowing new items to be queued.
533 */
534 pthread_mutex_unlock(&executor->work.lock);
535 ret = action_work_item_execute(executor, work_item);
536 action_work_item_destroy(work_item);
537 if (ret) {
538 /* Fatal error. */
539 break;
540 }
541 health_code_update();
542 pthread_mutex_lock(&executor->work.lock);
543 }
544 pthread_mutex_unlock(&executor->work.lock);
545 DBG("Left work execution loop");
546
547 health_code_update();
548
549 rcu_thread_offline();
550 rcu_unregister_thread();
551 health_unregister(health_sessiond);
552
553 return NULL;
554 }
555
556 static bool shutdown_action_executor_thread(void *_data)
557 {
558 struct action_executor *executor = _data;
559
560 /* TODO. */
561 executor->should_quit = true;
562 pthread_cond_signal(&executor->work.cond);
563 return true;
564 }
565
566 static void clean_up_action_executor_thread(void *_data)
567 {
568 struct action_executor *executor = _data;
569
570 assert(cds_list_empty(&executor->work.list));
571
572 pthread_mutex_destroy(&executor->work.lock);
573 pthread_cond_destroy(&executor->work.cond);
574 free(executor);
575 }
576
577 struct action_executor *action_executor_create(
578 struct notification_thread_handle *handle)
579 {
580 struct action_executor *executor = zmalloc(sizeof(*executor));
581
582 if (!executor) {
583 goto end;
584 }
585
586 CDS_INIT_LIST_HEAD(&executor->work.list);
587 pthread_cond_init(&executor->work.cond, NULL);
588 pthread_mutex_init(&executor->work.lock, NULL);
589 executor->notification_thread_handle = handle;
590
591 executor->thread = lttng_thread_create(THREAD_NAME,
592 action_executor_thread, shutdown_action_executor_thread,
593 clean_up_action_executor_thread, executor);
594 end:
595 return executor;
596 }
597
598 void action_executor_destroy(struct action_executor *executor)
599 {
600 struct action_work_item *work_item, *tmp;
601
602 /* TODO Wait for work list to drain? */
603 lttng_thread_shutdown(executor->thread);
604 pthread_mutex_lock(&executor->work.lock);
605 if (executor->work.pending_count != 0) {
606 WARN("%" PRIu64
607 " trigger action%s still queued for execution and will be discarded",
608 executor->work.pending_count,
609 executor->work.pending_count == 1 ? " is" :
610 "s are");
611 }
612
613 cds_list_for_each_entry_safe (
614 work_item, tmp, &executor->work.list, list_node) {
615 WARN("Discarding action work item %" PRIu64
616 " associated to trigger \"%s\"",
617 work_item->id, get_trigger_name(work_item->trigger));
618 cds_list_del(&work_item->list_node);
619 action_work_item_destroy(work_item);
620 }
621 pthread_mutex_unlock(&executor->work.lock);
622 lttng_thread_put(executor->thread);
623 }
624
625 /* RCU read-lock must be held by the caller. */
626 enum action_executor_status action_executor_enqueue(
627 struct action_executor *executor,
628 struct lttng_trigger *trigger,
629 struct notification_client_list *client_list)
630 {
631 enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
632 const uint64_t work_item_id = executor->next_work_item_id++;
633 struct action_work_item *work_item;
634 bool signal = false;
635
636 pthread_mutex_lock(&executor->work.lock);
637 /* Check for queue overflow. */
638 if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
639 /* Most likely spammy, remove if it is the case. */
640 DBG("Refusing to enqueue action for trigger \"%s\" as work item %" PRIu64
641 " (overflow)",
642 get_trigger_name(trigger), work_item_id);
643 executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
644 goto error_unlock;
645 }
646
647 work_item = zmalloc(sizeof(*work_item));
648 if (!work_item) {
649 PERROR("Failed to allocate action executor work item on behalf of trigger \"%s\"",
650 get_trigger_name(trigger));
651 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
652 goto error_unlock;
653 }
654
655 lttng_trigger_get(trigger);
656 if (client_list) {
657 const bool reference_acquired =
658 notification_client_list_get(client_list);
659
660 assert(reference_acquired);
661 }
662
663 *work_item = (typeof(*work_item)){
664 .id = work_item_id,
665 .trigger = trigger,
666 .client_list = client_list,
667 .list_node = CDS_LIST_HEAD_INIT(work_item->list_node),
668 };
669 cds_list_add_tail(&work_item->list_node, &executor->work.list);
670 executor->work.pending_count++;
671 DBG("Enqueued action for trigger \"%s\" as work item %" PRIu64,
672 get_trigger_name(trigger), work_item_id);
673 signal = true;
674
675 error_unlock:
676 pthread_mutex_unlock(&executor->work.lock);
677 if (signal) {
678 pthread_cond_signal(&executor->work.cond);
679 }
680 return executor_status;
681 }
This page took 0.04423 seconds and 5 git commands to generate.