SoW-2019-0002: Dynamic Snapshot
[lttng-tools.git] / src / bin / lttng-sessiond / action-executor.c
1 /*
2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include "action-executor.h"
9 #include "cmd.h"
10 #include "health-sessiond.h"
11 #include "lttng-sessiond.h"
12 #include "notification-thread-internal.h"
13 #include "session.h"
14 #include "thread.h"
15 #include <common/macros.h>
16 #include <lttng/action/group.h>
17 #include <lttng/action/notify.h>
18 #include <lttng/action/rotate-session.h>
19 #include <lttng/action/snapshot-session.h>
20 #include <lttng/action/start-session.h>
21 #include <lttng/action/stop-session.h>
22 #include <lttng/condition/evaluation.h>
23 #include <lttng/condition/event-rule-internal.h>
24 #include <lttng/lttng-error.h>
25 #include <lttng/trigger/trigger-internal.h>
26 #include <pthread.h>
27 #include <stdbool.h>
28 #include <stddef.h>
29 #include <urcu/list.h>
30
31 #define THREAD_NAME "Action Executor"
32 #define MAX_QUEUED_WORK_COUNT 8192
33
34 struct action_work_item {
35 uint64_t id;
36 struct lttng_trigger *trigger;
37 struct notification_client_list *client_list;
38 struct cds_list_head list_node;
39 };
40
41 struct action_executor {
42 struct lttng_thread *thread;
43 struct notification_thread_handle *notification_thread_handle;
44 struct {
45 uint64_t pending_count;
46 struct cds_list_head list;
47 pthread_cond_t cond;
48 pthread_mutex_t lock;
49 } work;
50 bool should_quit;
51 uint64_t next_work_item_id;
52 };
53
/*
 * Signature shared by all action handlers.
 *
 * A handler executes `action` on behalf of the trigger referenced by
 * `work_item`. A non-zero return value is treated as a fatal error by the
 * callers in this file.
 */
typedef int (*action_executor_handler)(struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
57
/*
 * Forward declarations of the per-action-type handlers. They are needed
 * because the dispatch table below references them before their definitions.
 */
static int action_executor_notify_handler(
		struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_start_session_handler(
		struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_stop_session_handler(
		struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_rotate_session_handler(
		struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_snapshot_session_handler(
		struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
static int action_executor_group_handler(
		struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action_group);
static int action_executor_generic_handler(
		struct action_executor *executor,
		const struct action_work_item *work_item,
		const struct lttng_action *action);
79
80 static const action_executor_handler action_executors[] = {
81 [LTTNG_ACTION_TYPE_NOTIFY] = action_executor_notify_handler,
82 [LTTNG_ACTION_TYPE_START_SESSION] = action_executor_start_session_handler,
83 [LTTNG_ACTION_TYPE_STOP_SESSION] = action_executor_stop_session_handler,
84 [LTTNG_ACTION_TYPE_ROTATE_SESSION] = action_executor_rotate_session_handler,
85 [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = action_executor_snapshot_session_handler,
86 [LTTNG_ACTION_TYPE_GROUP] = action_executor_group_handler,
87 };
88
89 static const char *get_action_name(const struct lttng_action *action)
90 {
91 const char *action_type_names[] = {
92 [LTTNG_ACTION_TYPE_NOTIFY] = "Notify",
93 [LTTNG_ACTION_TYPE_START_SESSION] = "Start session",
94 [LTTNG_ACTION_TYPE_STOP_SESSION] = "Stop session",
95 [LTTNG_ACTION_TYPE_ROTATE_SESSION] = "Rotate session",
96 [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = "Snapshot session",
97 [LTTNG_ACTION_TYPE_GROUP] = "Group",
98 };
99
100 return action_type_names[lttng_action_get_type(action)];
101 }
102
103 static const char *get_trigger_name(const struct lttng_trigger *trigger)
104 {
105 const char *trigger_name;
106 enum lttng_trigger_status trigger_status;
107
108 trigger_status = lttng_trigger_get_name(trigger, &trigger_name);
109 assert(trigger_status == LTTNG_TRIGGER_STATUS_OK);
110
111 return trigger_name;
112 }
113
114 static int client_handle_transmission_status(
115 struct notification_client *client,
116 enum client_transmission_status status,
117 void *user_data)
118 {
119 struct action_executor *executor = user_data;
120
121 switch (status) {
122 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
123 DBG("Sent notification to client");
124 break;
125 default:
126 ERR("Could not send notification to client");
127 }
128
129 return 0;
130 }
131
132 static int action_executor_notify_handler(struct action_executor *executor,
133 const struct action_work_item *work_item,
134 const struct lttng_action *action)
135 {
136 int ret = 0;
137 struct lttng_evaluation *evaluation = NULL;
138
139 assert(work_item->client_list);
140
141 evaluation = lttng_evaluation_event_rule_create(
142 get_trigger_name(work_item->trigger));
143 if (!evaluation) {
144 ERR("Failed to create event rule hit evaluation");
145 ret = -1;
146 goto end;
147 }
148
149 ret = notification_client_list_send_evaluation(work_item->client_list,
150 lttng_trigger_get_const_condition(work_item->trigger),
151 evaluation,
152 lttng_trigger_get_credentials(work_item->trigger), NULL,
153 client_handle_transmission_status, executor);
154 end:
155 lttng_evaluation_destroy(evaluation);
156 return ret;
157 }
158
159 static int action_executor_start_session_handler(struct action_executor *executor,
160 const struct action_work_item *work_item,
161 const struct lttng_action *action)
162 {
163 int ret = 0;
164 const char *session_name;
165 enum lttng_action_status action_status;
166 struct ltt_session *session;
167
168 action_status = lttng_action_start_session_get_session_name(
169 action, &session_name);
170 if (action_status != LTTNG_ACTION_STATUS_OK) {
171 ERR("Failed to get session name from \"%s\" action",
172 get_action_name(action));
173 ret = -1;
174 goto end;
175 }
176
177 session_lock_list();
178 session = session_find_by_name(session_name);
179 if (session) {
180 enum lttng_error_code cmd_ret;
181
182 session_lock(session);
183 cmd_ret = cmd_start_trace(session);
184 session_unlock(session);
185
186 switch (cmd_ret) {
187 case LTTNG_OK:
188 DBG("Successfully started session \"%s\" on behalf of trigger \"%s\"",
189 session_name,
190 get_trigger_name(work_item->trigger));
191 break;
192 case LTTNG_ERR_TRACE_ALREADY_STARTED:
193 DBG("Attempted to start session \"%s\" on behalf of trigger \"%s\" but it was already started",
194 session_name,
195 get_trigger_name(work_item->trigger));
196 break;
197 default:
198 WARN("Failed to start session \"%s\" on behalf of trigger \"%s\": %s",
199 session_name,
200 get_trigger_name(work_item->trigger),
201 lttng_strerror(-cmd_ret));
202 break;
203 }
204 session_put(session);
205 } else {
206 DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"",
207 session_name, get_action_name(action),
208 get_trigger_name(work_item->trigger));
209 }
210 session_unlock_list();
211 end:
212 return ret;
213 }
214
215 static int action_executor_stop_session_handler(struct action_executor *executor,
216 const struct action_work_item *work_item,
217 const struct lttng_action *action)
218 {
219 int ret = 0;
220 const char *session_name;
221 enum lttng_action_status action_status;
222 struct ltt_session *session;
223
224 action_status = lttng_action_stop_session_get_session_name(
225 action, &session_name);
226 if (action_status != LTTNG_ACTION_STATUS_OK) {
227 ERR("Failed to get session name from \"%s\" action",
228 get_action_name(action));
229 ret = -1;
230 goto end;
231 }
232
233 session_lock_list();
234 session = session_find_by_name(session_name);
235 if (session) {
236 enum lttng_error_code cmd_ret;
237
238 session_lock(session);
239 cmd_ret = cmd_stop_trace(session);
240 session_unlock(session);
241
242 switch (cmd_ret) {
243 case LTTNG_OK:
244 DBG("Successfully stopped session \"%s\" on behalf of trigger \"%s\"",
245 session_name,
246 get_trigger_name(work_item->trigger));
247 break;
248 case LTTNG_ERR_TRACE_ALREADY_STOPPED:
249 DBG("Attempted to stop session \"%s\" on behalf of trigger \"%s\" but it was already stopped",
250 session_name,
251 get_trigger_name(work_item->trigger));
252 break;
253 default:
254 WARN("Failed to stop session \"%s\" on behalf of trigger \"%s\": %s",
255 session_name,
256 get_trigger_name(work_item->trigger),
257 lttng_strerror(-cmd_ret));
258 break;
259 }
260 session_put(session);
261 } else {
262 DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"",
263 session_name, get_action_name(action),
264 get_trigger_name(work_item->trigger));
265 }
266 session_unlock_list();
267 end:
268 return ret;
269 }
270
271 static int action_executor_rotate_session_handler(struct action_executor *executor,
272 const struct action_work_item *work_item,
273 const struct lttng_action *action)
274 {
275 int ret = 0;
276 const char *session_name;
277 enum lttng_action_status action_status;
278 struct ltt_session *session;
279
280 action_status = lttng_action_rotate_session_get_session_name(
281 action, &session_name);
282 if (action_status != LTTNG_ACTION_STATUS_OK) {
283 ERR("Failed to get session name from \"%s\" action",
284 get_action_name(action));
285 ret = -1;
286 goto end;
287 }
288
289 session_lock_list();
290 session = session_find_by_name(session_name);
291 if (session) {
292 enum lttng_error_code cmd_ret;
293
294 session_lock(session);
295 cmd_ret = cmd_rotate_session(session, NULL, false,
296 LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
297 session_unlock(session);
298
299 switch (cmd_ret) {
300 case LTTNG_OK:
301 DBG("Successfully started rotation of session \"%s\" on behalf of trigger \"%s\"",
302 session_name,
303 get_trigger_name(work_item->trigger));
304 break;
305 case LTTNG_ERR_ROTATION_PENDING:
306 DBG("Attempted to start a rotation of session \"%s\" on behalf of trigger \"%s\" but a rotation is already ongoing",
307 session_name,
308 get_trigger_name(work_item->trigger));
309 break;
310 case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
311 case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
312 DBG("Attempted to start a rotation of session \"%s\" on behalf of trigger \"%s\" but a rotation has already been completed since the last stop or clear",
313 session_name,
314 get_trigger_name(work_item->trigger));
315 break;
316 default:
317 WARN("Failed to start a rotation of session \"%s\" on behalf of trigger \"%s\": %s",
318 session_name,
319 get_trigger_name(work_item->trigger),
320 lttng_strerror(-cmd_ret));
321 break;
322 }
323 session_put(session);
324 } else {
325 DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"",
326 session_name, get_action_name(action),
327 get_trigger_name(work_item->trigger));
328 }
329 session_unlock_list();
330 end:
331 return ret;
332 }
333
334 static int action_executor_snapshot_session_handler(struct action_executor *executor,
335 const struct action_work_item *work_item,
336 const struct lttng_action *action)
337 {
338 int ret = 0;
339 const char *session_name;
340 enum lttng_action_status action_status;
341 struct ltt_session *session;
342 const struct lttng_snapshot_output default_snapshot_output = {
343 .max_size = UINT64_MAX,
344 };
345 const struct lttng_snapshot_output *snapshot_output =
346 &default_snapshot_output;
347
348 action_status = lttng_action_snapshot_session_get_session_name(
349 action, &session_name);
350 if (action_status != LTTNG_ACTION_STATUS_OK) {
351 ERR("Failed to get session name from \"%s\" action",
352 get_action_name(action));
353 ret = -1;
354 goto end;
355 }
356
357 action_status = lttng_action_snapshot_session_get_output_const(
358 action, &snapshot_output);
359 if (action_status != LTTNG_ACTION_STATUS_OK &&
360 action_status != LTTNG_ACTION_STATUS_UNSET) {
361 ERR("Failed to get output from \"%s\" action",
362 get_action_name(action));
363 ret = -1;
364 goto end;
365 }
366
367 session_lock_list();
368 session = session_find_by_name(session_name);
369 if (session) {
370 enum lttng_error_code cmd_ret;
371
372 session_lock(session);
373 cmd_ret = cmd_snapshot_record(session, snapshot_output, 0);
374 session_unlock(session);
375
376 switch (cmd_ret) {
377 case LTTNG_OK:
378 DBG("Successfully recorded snapshot of session \"%s\" on behalf of trigger \"%s\"",
379 session_name,
380 get_trigger_name(work_item->trigger));
381 break;
382 default:
383 WARN("Failed to record snapshot of session \"%s\" on behalf of trigger \"%s\": %s",
384 session_name,
385 get_trigger_name(work_item->trigger),
386 lttng_strerror(-cmd_ret));
387 break;
388 }
389 session_put(session);
390 } else {
391 DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%s\"",
392 session_name, get_action_name(action),
393 get_trigger_name(work_item->trigger));
394 }
395 session_unlock_list();
396 end:
397 return ret;
398 }
399
400 static int action_executor_group_handler(struct action_executor *executor,
401 const struct action_work_item *work_item,
402 const struct lttng_action *action_group)
403 {
404 int ret = 0;
405 unsigned int i, count;
406 enum lttng_action_status action_status;
407
408 action_status = lttng_action_group_get_count(action_group, &count);
409 if (action_status != LTTNG_ACTION_STATUS_OK) {
410 /* Fatal error. */
411 ERR("Failed to get count of action in action group");
412 ret = -1;
413 goto end;
414 }
415
416 DBG("Action group has %u action%s", count, count != 1 ? "s" : "");
417 for (i = 0; i < count; i++) {
418 const struct lttng_action *action =
419 lttng_action_group_get_at_index_const(
420 action_group, i);
421
422 ret = action_executor_generic_handler(
423 executor, work_item, action);
424 if (ret) {
425 ERR("Stopping the execution of the action group of trigger \"%s\" following a fatal error",
426 get_trigger_name(work_item->trigger));
427 goto end;
428 }
429 }
430 end:
431 return ret;
432 }
433
434 static int action_executor_generic_handler(struct action_executor *executor,
435 const struct action_work_item *work_item,
436 const struct lttng_action *action)
437 {
438 DBG("Executing action \"%s\" of trigger \"%s\" action work item %" PRIu64,
439 get_action_name(action),
440 get_trigger_name(work_item->trigger),
441 work_item->id);
442
443 return action_executors[lttng_action_get_type(action)](
444 executor, work_item, action);
445 }
446
447 static int action_work_item_execute(struct action_executor *executor,
448 struct action_work_item *work_item)
449 {
450 int ret;
451 const struct lttng_action *action =
452 lttng_trigger_get_const_action(work_item->trigger);
453
454 DBG("Starting execution of action work item %" PRIu64 " of trigger \"%s\"",
455 work_item->id, get_trigger_name(work_item->trigger));
456 ret = action_executor_generic_handler(executor, work_item, action);
457 DBG("Completed execution of action work item %" PRIu64 " of trigger \"%s\"",
458 work_item->id, get_trigger_name(work_item->trigger));
459 return ret;
460 }
461
462 static void action_work_item_destroy(struct action_work_item *work_item)
463 {
464 lttng_trigger_put(work_item->trigger);
465 notification_client_list_put(work_item->client_list);
466 free(work_item);
467 }
468
469 static void *action_executor_thread(void *_data)
470 {
471 struct action_executor *executor = _data;
472
473 assert(executor);
474
475 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR);
476
477 rcu_register_thread();
478 rcu_thread_online();
479
480 DBG("Entering work execution loop");
481 pthread_mutex_lock(&executor->work.lock);
482 while (!executor->should_quit) {
483 int ret;
484 struct action_work_item *work_item;
485
486 health_code_update();
487 if (executor->work.pending_count == 0) {
488 health_poll_entry();
489 DBG("No work items enqueued, entering wait");
490 pthread_cond_wait(&executor->work.cond,
491 &executor->work.lock);
492 DBG("Woke-up from wait");
493 health_poll_exit();
494 continue;
495 }
496
497 /* Pop item from front of the listwith work lock held. */
498 work_item = cds_list_first_entry(&executor->work.list,
499 struct action_work_item, list_node);
500 cds_list_del(&work_item->list_node);
501 executor->work.pending_count--;
502
503 /*
504 * Work can be performed without holding the work lock,
505 * allowing new items to be queued.
506 */
507 pthread_mutex_unlock(&executor->work.lock);
508 ret = action_work_item_execute(executor, work_item);
509 action_work_item_destroy(work_item);
510 if (ret) {
511 /* Fatal error. */
512 break;
513 }
514 health_code_update();
515 pthread_mutex_lock(&executor->work.lock);
516 }
517 pthread_mutex_unlock(&executor->work.lock);
518 DBG("Left work execution loop");
519
520 health_code_update();
521
522 rcu_thread_offline();
523 rcu_unregister_thread();
524 health_unregister(health_sessiond);
525
526 return NULL;
527 }
528
529 static bool shutdown_action_executor_thread(void *_data)
530 {
531 struct action_executor *executor = _data;
532
533 /* TODO. */
534 executor->should_quit = true;
535 pthread_cond_signal(&executor->work.cond);
536 return true;
537 }
538
539 static void clean_up_action_executor_thread(void *_data)
540 {
541 struct action_executor *executor = _data;
542
543 assert(cds_list_empty(&executor->work.list));
544
545 pthread_mutex_destroy(&executor->work.lock);
546 pthread_cond_destroy(&executor->work.cond);
547 free(executor);
548 }
549
550 struct action_executor *action_executor_create(
551 struct notification_thread_handle *handle)
552 {
553 struct action_executor *executor = zmalloc(sizeof(*executor));
554
555 if (!executor) {
556 goto end;
557 }
558
559 CDS_INIT_LIST_HEAD(&executor->work.list);
560 pthread_cond_init(&executor->work.cond, NULL);
561 pthread_mutex_init(&executor->work.lock, NULL);
562 executor->notification_thread_handle = handle;
563
564 executor->thread = lttng_thread_create(THREAD_NAME,
565 action_executor_thread, shutdown_action_executor_thread,
566 clean_up_action_executor_thread, executor);
567 end:
568 return executor;
569 }
570
571 void action_executor_destroy(struct action_executor *executor)
572 {
573 struct action_work_item *work_item, *tmp;
574
575 /* TODO Wait for work list to drain? */
576 lttng_thread_shutdown(executor->thread);
577 pthread_mutex_lock(&executor->work.lock);
578 if (executor->work.pending_count != 0) {
579 WARN("%" PRIu64
580 " trigger action%s still queued for execution and will be discarded",
581 executor->work.pending_count,
582 executor->work.pending_count == 1 ? " is" :
583 "s are");
584 }
585
586 cds_list_for_each_entry_safe (
587 work_item, tmp, &executor->work.list, list_node) {
588 WARN("Discarding action work item %" PRIu64
589 " associated to trigger \"%s\"",
590 work_item->id, get_trigger_name(work_item->trigger));
591 cds_list_del(&work_item->list_node);
592 action_work_item_destroy(work_item);
593 }
594 pthread_mutex_unlock(&executor->work.lock);
595 lttng_thread_put(executor->thread);
596 }
597
598 /* RCU read-lock must be held by the caller. */
599 enum action_executor_status action_executor_enqueue(
600 struct action_executor *executor,
601 struct lttng_trigger *trigger,
602 struct notification_client_list *client_list)
603 {
604 enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
605 const uint64_t work_item_id = executor->next_work_item_id++;
606 struct action_work_item *work_item;
607 bool signal = false;
608
609 pthread_mutex_lock(&executor->work.lock);
610 /* Check for queue overflow. */
611 if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
612 /* Most likely spammy, remove if it is the case. */
613 DBG("Refusing to enqueue action for trigger \"%s\" as work item %" PRIu64
614 " (overflow)",
615 get_trigger_name(trigger), work_item_id);
616 executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
617 goto error_unlock;
618 }
619
620 work_item = zmalloc(sizeof(*work_item));
621 if (!work_item) {
622 PERROR("Failed to allocate action executor work item on behalf of trigger \"%s\"",
623 get_trigger_name(trigger));
624 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
625 goto error_unlock;
626 }
627
628 lttng_trigger_get(trigger);
629 if (client_list) {
630 const bool reference_acquired =
631 notification_client_list_get(client_list);
632
633 assert(reference_acquired);
634 }
635
636 *work_item = (typeof(*work_item)){
637 .id = work_item_id,
638 .trigger = trigger,
639 .client_list = client_list,
640 .list_node = CDS_LIST_HEAD_INIT(work_item->list_node),
641 };
642 cds_list_add_tail(&work_item->list_node, &executor->work.list);
643 executor->work.pending_count++;
644 DBG("Enqueued action for trigger \"%s\" as work item %" PRIu64,
645 get_trigger_name(trigger), work_item_id);
646 signal = true;
647
648 error_unlock:
649 pthread_mutex_unlock(&executor->work.lock);
650 if (signal) {
651 pthread_cond_signal(&executor->work.cond);
652 }
653 return executor_status;
654 }
This page took 0.042557 seconds and 5 git commands to generate.