timer thread in progress
[deliverable/lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.c
CommitLineData
6e0be6cc
JD
1/*
2 * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <lttng/trigger/trigger.h>
20#include <common/error.h>
21#include <common/config/session-config.h>
22#include <common/defaults.h>
23#include <common/utils.h>
24#include <common/futex.h>
25#include <common/align.h>
26#include <common/time.h>
27#include <common/hashtable/utils.h>
28#include <sys/eventfd.h>
29#include <sys/stat.h>
30#include <time.h>
31#include <signal.h>
32#include <inttypes.h>
33
34#include <common/kernel-ctl/kernel-ctl.h>
35#include "rotation-thread.h"
36#include "lttng-sessiond.h"
37#include "health-sessiond.h"
e8ed1ed1 38#include "rotate.h"
6e0be6cc
JD
39#include "cmd.h"
40
41#include <urcu.h>
42#include <urcu/list.h>
43#include <urcu/rculfhash.h>
44
/*
 * Hash table of the channels for which a rotation is still pending.
 * It is created by init_thread_state() and its entries are matched on the
 * (channel key, domain) pair, see match_channel_info().
 */
struct cds_lfht *channel_pending_rotate_ht;
46
6e0be6cc
JD
/*
 * Release a rotation_channel_info object.
 *
 * The pointer must not be NULL (enforced with an assertion).
 */
static
void channel_rotation_info_destroy(struct rotation_channel_info *channel_info)
{
	assert(channel_info != NULL);
	free(channel_info);
}
53
6e0be6cc
JD
54static
55int match_channel_info(struct cds_lfht_node *node, const void *key)
56{
57 struct rotation_channel_key *channel_key = (struct rotation_channel_key *) key;
58 struct rotation_channel_info *channel_info;
59
60 channel_info = caa_container_of(node, struct rotation_channel_info,
61 rotate_channels_ht_node);
62
63 return !!((channel_key->key == channel_info->channel_key.key) &&
64 (channel_key->domain == channel_info->channel_key.domain));
65}
66
67static
68struct rotation_channel_info *lookup_channel_pending(uint64_t key,
69 enum lttng_domain_type domain)
70{
71 struct cds_lfht_iter iter;
72 struct cds_lfht_node *node;
73 struct rotation_channel_info *channel_info = NULL;
74 struct rotation_channel_key channel_key = { .key = key,
75 .domain = domain };
76
77 cds_lfht_lookup(channel_pending_rotate_ht,
78 hash_channel_key(&channel_key),
79 match_channel_info,
80 &channel_key, &iter);
81 node = cds_lfht_iter_get_node(&iter);
82 if (!node) {
83 goto end;
84 }
85
86 channel_info = caa_container_of(node, struct rotation_channel_info,
87 rotate_channels_ht_node);
88 cds_lfht_del(channel_pending_rotate_ht, node);
89end:
90 return channel_info;
91}
92
93/*
94 * Destroy the thread data previously created by the init function.
95 */
96void rotation_thread_handle_destroy(
97 struct rotation_thread_handle *handle)
98{
99 int ret;
100
101 if (!handle) {
102 goto end;
103 }
104
105 if (handle->ust32_consumer >= 0) {
106 ret = close(handle->ust32_consumer);
107 if (ret) {
108 PERROR("close 32-bit consumer channel rotation pipe");
109 }
110 }
111 if (handle->ust64_consumer >= 0) {
112 ret = close(handle->ust64_consumer);
113 if (ret) {
114 PERROR("close 64-bit consumer channel rotation pipe");
115 }
116 }
117 if (handle->kernel_consumer >= 0) {
118 ret = close(handle->kernel_consumer);
119 if (ret) {
120 PERROR("close kernel consumer channel rotation pipe");
121 }
122 }
123
124end:
125 free(handle);
126}
127
128struct rotation_thread_handle *rotation_thread_handle_create(
129 struct lttng_pipe *ust32_channel_rotate_pipe,
130 struct lttng_pipe *ust64_channel_rotate_pipe,
131 struct lttng_pipe *kernel_channel_rotate_pipe,
55c2a7f9 132 int thread_quit_pipe, int rotate_timer_pipe)
6e0be6cc
JD
133{
134 struct rotation_thread_handle *handle;
135
136 handle = zmalloc(sizeof(*handle));
137 if (!handle) {
138 goto end;
139 }
140
141 if (ust32_channel_rotate_pipe) {
142 handle->ust32_consumer =
143 lttng_pipe_release_readfd(
144 ust32_channel_rotate_pipe);
145 if (handle->ust32_consumer < 0) {
146 goto error;
147 }
148 } else {
149 handle->ust32_consumer = -1;
150 }
151 if (ust64_channel_rotate_pipe) {
152 handle->ust64_consumer =
153 lttng_pipe_release_readfd(
154 ust64_channel_rotate_pipe);
155 if (handle->ust64_consumer < 0) {
156 goto error;
157 }
158 } else {
159 handle->ust64_consumer = -1;
160 }
161 if (kernel_channel_rotate_pipe) {
162 handle->kernel_consumer =
163 lttng_pipe_release_readfd(
164 kernel_channel_rotate_pipe);
165 if (handle->kernel_consumer < 0) {
166 goto error;
167 }
168 } else {
169 handle->kernel_consumer = -1;
170 }
171 handle->thread_quit_pipe = thread_quit_pipe;
55c2a7f9 172 handle->rotate_timer_pipe = rotate_timer_pipe;
6e0be6cc
JD
173
174end:
175 return handle;
176error:
177 rotation_thread_handle_destroy(handle);
178 return NULL;
179}
180
181static
182int init_poll_set(struct lttng_poll_event *poll_set,
183 struct rotation_thread_handle *handle)
184{
185 int ret;
186
187 /*
55c2a7f9 188 * Create pollset with size 5:
6e0be6cc
JD
189 * - sessiond quit pipe
190 * - consumerd (32-bit user space) channel rotate pipe,
191 * - consumerd (64-bit user space) channel rotate pipe,
192 * - consumerd (kernel) channel rotate pipe.
55c2a7f9 193 * - sessiond rotate pending pipe
6e0be6cc 194 */
55c2a7f9 195 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
6e0be6cc
JD
196 if (ret < 0) {
197 goto end;
198 }
199
200 ret = lttng_poll_add(poll_set, handle->thread_quit_pipe,
201 LPOLLIN | LPOLLERR);
202 if (ret < 0) {
203 ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
204 goto error;
205 }
55c2a7f9
JD
206 ret = lttng_poll_add(poll_set, handle->rotate_timer_pipe,
207 LPOLLIN | LPOLLERR);
208 if (ret < 0) {
209 ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
210 goto error;
211 }
6e0be6cc
JD
212 ret = lttng_poll_add(poll_set, handle->ust32_consumer,
213 LPOLLIN | LPOLLERR);
214 if (ret < 0) {
215 ERR("[rotation-thread] Failed to add ust-32 channel rotation pipe fd to pollset");
216 goto error;
217 }
218 ret = lttng_poll_add(poll_set, handle->ust64_consumer,
219 LPOLLIN | LPOLLERR);
220 if (ret < 0) {
221 ERR("[rotation-thread] Failed to add ust-64 channel rotation pipe fd to pollset");
222 goto error;
223 }
224 if (handle->kernel_consumer < 0) {
225 goto end;
226 }
227 ret = lttng_poll_add(poll_set, handle->kernel_consumer,
228 LPOLLIN | LPOLLERR);
229 if (ret < 0) {
230 ERR("[rotation-thread] Failed to add kernel channel rotation pipe fd to pollset");
231 goto error;
232 }
233
234end:
235 return ret;
236error:
237 lttng_poll_clean(poll_set);
238 return ret;
239}
240
241static
242void fini_thread_state(struct rotation_thread_state *state)
243{
244 lttng_poll_clean(&state->events);
245}
246
247static
248int init_thread_state(struct rotation_thread_handle *handle,
249 struct rotation_thread_state *state)
250{
251 int ret;
252
253 memset(state, 0, sizeof(*state));
254 lttng_poll_init(&state->events);
255
256 ret = init_poll_set(&state->events, handle);
257 if (ret) {
258 goto end;
259 }
260
261 channel_pending_rotate_ht = cds_lfht_new(DEFAULT_HT_SIZE,
262 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
263 if (!channel_pending_rotate_ht) {
264 ret = -1;
265 }
266
267end:
268 return 0;
269}
270
6e0be6cc
JD
271static
272int handle_channel_rotation_pipe(int fd, uint32_t revents,
273 struct rotation_thread_handle *handle,
274 struct rotation_thread_state *state)
275{
276 int ret = 0;
277 enum lttng_domain_type domain;
278 struct rotation_channel_info *channel_info;
279 uint64_t key;
280
281 if (fd == handle->ust32_consumer ||
282 fd == handle->ust64_consumer) {
283 domain = LTTNG_DOMAIN_UST;
284 } else if (fd == handle->kernel_consumer) {
285 domain = LTTNG_DOMAIN_KERNEL;
286 } else {
287 abort();
288 }
289
290 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
291 ret = lttng_poll_del(&state->events, fd);
292 if (ret) {
103373d7
JD
293 ERR("[rotation-thread] Failed to remove consumer "
294 "rotation pipe from poll set");
6e0be6cc
JD
295 }
296 goto end;
297 }
298
299 do {
300 ret = read(fd, &key, sizeof(key));
301 } while (ret == -1 && errno == EINTR);
302 if (ret != sizeof(key)) {
303 ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
304 fd);
305 ret = -1;
306 goto end;
307 }
308
309 DBG("[rotation-thread] Received notification for chan %" PRIu64
310 ", domain %d\n", key, domain);
311
312 channel_info = lookup_channel_pending(key, domain);
313 if (!channel_info) {
314 ERR("[rotation-thread] Failed to find channel_info (key = %"
315 PRIu64 ")", key);
316 ret = -1;
317 goto end;
318 }
319
320 if (--channel_info->session->nr_chan_rotate_pending == 0) {
321 time_t now = time(NULL);
322
323 if (now == (time_t) -1) {
4a478f45 324 channel_info->session->rotate_status = LTTNG_ROTATE_ERROR;
6e0be6cc
JD
325 ret = LTTNG_ERR_ROTATE_NOT_AVAILABLE;
326 goto end;
327 }
328
329 ret = rename_complete_chunk(channel_info->session, now);
330 if (ret < 0) {
331 ERR("Failed to rename completed rotation chunk");
332 goto end;
333 }
d3dedf27 334 channel_info->session->rotate_pending = false;
6e0be6cc
JD
335 }
336
337 channel_rotation_info_destroy(channel_info);
338
339 ret = 0;
340
341end:
342 return ret;
343}
344
55c2a7f9
JD
345static
346int handle_rotate_timer_pipe(int fd, uint32_t revents,
347 struct rotation_thread_handle *handle,
348 struct rotation_thread_state *state)
349{
350 int ret = 0;
351 uint64_t session_id;
352 struct ltt_session *session;
353
354 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
355 ret = lttng_poll_del(&state->events, fd);
356 if (ret) {
357 ERR("[rotation-thread] Failed to remove consumer "
358 "rotate pending pipe from poll set");
359 }
360 goto end;
361 }
362
363 do {
364 ret = read(fd, &session_id, sizeof(session_id));
365 } while (ret == -1 && errno == EINTR);
366 if (ret != sizeof(session_id)) {
367 ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
368 fd);
369 ret = -1;
370 goto end;
371 }
372
373 session = session_find_by_id(session_id);
374 if (!session) {
375 ERR("[rotation-thread] Session %" PRIu64 " not found",
376 session_id);
377 ret = -1;
378 goto end;
379 }
380
381 DBG("[rotation-thread] Check rotate pending on session %" PRIu64,
382 session_id);
383 ret = relay_rotate_pending(session, session->rotate_count - 1);
384 if (ret < 0) {
385 ERR("[rotation-thread] Check relay rotate pending");
386 goto end;
387 }
388 if (ret == 0) {
389 DBG("[rotation-thread] Rotation completed on the relay for "
390 "session %" PRIu64, session_id);
391 /* TODO: delete timer */
392 } else if (ret == 1) {
393 DBG("[rotation-thread] Rotation still pending on the relay for "
394 "session %" PRIu64, session_id);
395 }
396 fprintf(stderr, "RET PENDING: %d\n", ret);
397
398 ret = 0;
399
400end:
401 return ret;
402}
403
6e0be6cc
JD
404void *thread_rotation(void *data)
405{
406 int ret;
407 struct rotation_thread_handle *handle = data;
408 struct rotation_thread_state state;
409
410 DBG("[rotation-thread] Started rotation thread");
411
412 if (!handle) {
413 ERR("[rotation-thread] Invalid thread context provided");
414 goto end;
415 }
416
417 rcu_register_thread();
418 rcu_thread_online();
419
420 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
421 health_code_update();
422
423 ret = init_thread_state(handle, &state);
424 if (ret) {
425 goto end;
426 }
427
428 /* Ready to handle client connections. */
429 sessiond_notify_ready();
430
431 while (true) {
432 int fd_count, i;
433
434 health_poll_entry();
435 DBG("[rotation-thread] Entering poll wait");
436 ret = lttng_poll_wait(&state.events, -1);
437 DBG("[rotation-thread] Poll wait returned (%i)", ret);
438 health_poll_exit();
439 if (ret < 0) {
440 /*
441 * Restart interrupted system call.
442 */
443 if (errno == EINTR) {
444 continue;
445 }
446 ERR("[rotation-thread] Error encountered during lttng_poll_wait (%i)", ret);
447 goto error;
448 }
449
450 fd_count = ret;
451 for (i = 0; i < fd_count; i++) {
452 int fd = LTTNG_POLL_GETFD(&state.events, i);
453 uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
454
455 DBG("[rotation-thread] Handling fd (%i) activity (%u)",
456 fd, revents);
457
458 if (fd == handle->thread_quit_pipe) {
459 DBG("[rotation-thread] Quit pipe activity");
460 goto exit;
55c2a7f9
JD
461 } else if (fd == handle->rotate_timer_pipe) {
462 ret = handle_rotate_timer_pipe(fd, revents,
463 handle, &state);
464 if (ret) {
465 ERR("[rotation-thread] Rotate pending");
466 goto error;
467 }
6e0be6cc
JD
468 } else if (fd == handle->ust32_consumer ||
469 fd == handle->ust64_consumer ||
470 fd == handle->kernel_consumer) {
471 ret = handle_channel_rotation_pipe(fd,
472 revents, handle, &state);
473 if (ret) {
474 ERR("[rotation-thread] Exit main loop");
475 goto error;
476 }
477 }
478 }
479 }
480exit:
481error:
482 fini_thread_state(&state);
483 health_unregister(health_sessiond);
484 rcu_thread_offline();
485 rcu_unregister_thread();
486end:
487 return NULL;
488}
This page took 0.042196 seconds and 5 git commands to generate.