fix
[deliverable/lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.c
CommitLineData
6e0be6cc
JD
1/*
2 * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <lttng/trigger/trigger.h>
20#include <common/error.h>
21#include <common/config/session-config.h>
22#include <common/defaults.h>
23#include <common/utils.h>
24#include <common/futex.h>
25#include <common/align.h>
26#include <common/time.h>
27#include <common/hashtable/utils.h>
28#include <sys/eventfd.h>
29#include <sys/stat.h>
30#include <time.h>
31#include <signal.h>
32#include <inttypes.h>
33
34#include <common/kernel-ctl/kernel-ctl.h>
35#include "rotation-thread.h"
36#include "lttng-sessiond.h"
37#include "health-sessiond.h"
38#include "cmd.h"
39
40#include <urcu.h>
41#include <urcu/list.h>
42#include <urcu/rculfhash.h>
43
/*
 * Hash table of the channels for which a rotation has been requested and is
 * still pending. Entries are inserted by rotate_add_channel_pending() and
 * removed when the matching key is read from a consumer channel rotation
 * pipe. The table is created by init_thread_state().
 */
struct cds_lfht *channel_pending_rotate_ht;
45
46static
47unsigned long hash_channel_key(struct rotation_channel_key *key)
48{
49 return hash_key_u64(&key->key, lttng_ht_seed) ^ hash_key_ulong(
50 (void *) (unsigned long) key->domain, lttng_ht_seed);
51}
52
/*
 * Free a channel rotation info object. The pointer is asserted to be
 * non-NULL.
 */
static
void channel_rotation_info_destroy(struct rotation_channel_info *channel_info)
{
	assert(channel_info);

	free(channel_info);
}
59
60int rotate_add_channel_pending(uint64_t key, enum lttng_domain_type domain,
61 struct ltt_session *session)
62{
63 int ret;
64 struct rotation_channel_info *new_info;
65 struct rotation_channel_key channel_key = { .key = key,
66 .domain = domain };
67
68 new_info = zmalloc(sizeof(struct rotation_channel_info));
69 if (!new_info) {
70 goto error;
71 }
72
73 new_info->channel_key.key = key;
74 new_info->channel_key.domain = domain;
75 new_info->session = session;
76 cds_lfht_node_init(&new_info->rotate_channels_ht_node);
77
78 session->nr_chan_rotate_pending++;
79 cds_lfht_add(channel_pending_rotate_ht,
80 hash_channel_key(&channel_key),
81 &new_info->rotate_channels_ht_node);
82
83 ret = 0;
84 goto end;
85
86error:
87 ret = -1;
88end:
89 return ret;
90}
91
92static
93int match_channel_info(struct cds_lfht_node *node, const void *key)
94{
95 struct rotation_channel_key *channel_key = (struct rotation_channel_key *) key;
96 struct rotation_channel_info *channel_info;
97
98 channel_info = caa_container_of(node, struct rotation_channel_info,
99 rotate_channels_ht_node);
100
101 return !!((channel_key->key == channel_info->channel_key.key) &&
102 (channel_key->domain == channel_info->channel_key.domain));
103}
104
105static
106struct rotation_channel_info *lookup_channel_pending(uint64_t key,
107 enum lttng_domain_type domain)
108{
109 struct cds_lfht_iter iter;
110 struct cds_lfht_node *node;
111 struct rotation_channel_info *channel_info = NULL;
112 struct rotation_channel_key channel_key = { .key = key,
113 .domain = domain };
114
115 cds_lfht_lookup(channel_pending_rotate_ht,
116 hash_channel_key(&channel_key),
117 match_channel_info,
118 &channel_key, &iter);
119 node = cds_lfht_iter_get_node(&iter);
120 if (!node) {
121 goto end;
122 }
123
124 channel_info = caa_container_of(node, struct rotation_channel_info,
125 rotate_channels_ht_node);
126 cds_lfht_del(channel_pending_rotate_ht, node);
127end:
128 return channel_info;
129}
130
131/*
132 * Destroy the thread data previously created by the init function.
133 */
134void rotation_thread_handle_destroy(
135 struct rotation_thread_handle *handle)
136{
137 int ret;
138
139 if (!handle) {
140 goto end;
141 }
142
143 if (handle->ust32_consumer >= 0) {
144 ret = close(handle->ust32_consumer);
145 if (ret) {
146 PERROR("close 32-bit consumer channel rotation pipe");
147 }
148 }
149 if (handle->ust64_consumer >= 0) {
150 ret = close(handle->ust64_consumer);
151 if (ret) {
152 PERROR("close 64-bit consumer channel rotation pipe");
153 }
154 }
155 if (handle->kernel_consumer >= 0) {
156 ret = close(handle->kernel_consumer);
157 if (ret) {
158 PERROR("close kernel consumer channel rotation pipe");
159 }
160 }
161
162end:
163 free(handle);
164}
165
166struct rotation_thread_handle *rotation_thread_handle_create(
167 struct lttng_pipe *ust32_channel_rotate_pipe,
168 struct lttng_pipe *ust64_channel_rotate_pipe,
169 struct lttng_pipe *kernel_channel_rotate_pipe,
170 int thread_quit_pipe)
171{
172 struct rotation_thread_handle *handle;
173
174 handle = zmalloc(sizeof(*handle));
175 if (!handle) {
176 goto end;
177 }
178
179 if (ust32_channel_rotate_pipe) {
180 handle->ust32_consumer =
181 lttng_pipe_release_readfd(
182 ust32_channel_rotate_pipe);
183 if (handle->ust32_consumer < 0) {
184 goto error;
185 }
186 } else {
187 handle->ust32_consumer = -1;
188 }
189 if (ust64_channel_rotate_pipe) {
190 handle->ust64_consumer =
191 lttng_pipe_release_readfd(
192 ust64_channel_rotate_pipe);
193 if (handle->ust64_consumer < 0) {
194 goto error;
195 }
196 } else {
197 handle->ust64_consumer = -1;
198 }
199 if (kernel_channel_rotate_pipe) {
200 handle->kernel_consumer =
201 lttng_pipe_release_readfd(
202 kernel_channel_rotate_pipe);
203 if (handle->kernel_consumer < 0) {
204 goto error;
205 }
206 } else {
207 handle->kernel_consumer = -1;
208 }
209 handle->thread_quit_pipe = thread_quit_pipe;
210
211end:
212 return handle;
213error:
214 rotation_thread_handle_destroy(handle);
215 return NULL;
216}
217
218static
219int init_poll_set(struct lttng_poll_event *poll_set,
220 struct rotation_thread_handle *handle)
221{
222 int ret;
223
224 /*
225 * Create pollset with size 4:
226 * - sessiond quit pipe
227 * - consumerd (32-bit user space) channel rotate pipe,
228 * - consumerd (64-bit user space) channel rotate pipe,
229 * - consumerd (kernel) channel rotate pipe.
230 */
231 ret = lttng_poll_create(poll_set, 4, LTTNG_CLOEXEC);
232 if (ret < 0) {
233 goto end;
234 }
235
236 ret = lttng_poll_add(poll_set, handle->thread_quit_pipe,
237 LPOLLIN | LPOLLERR);
238 if (ret < 0) {
239 ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
240 goto error;
241 }
242 ret = lttng_poll_add(poll_set, handle->ust32_consumer,
243 LPOLLIN | LPOLLERR);
244 if (ret < 0) {
245 ERR("[rotation-thread] Failed to add ust-32 channel rotation pipe fd to pollset");
246 goto error;
247 }
248 ret = lttng_poll_add(poll_set, handle->ust64_consumer,
249 LPOLLIN | LPOLLERR);
250 if (ret < 0) {
251 ERR("[rotation-thread] Failed to add ust-64 channel rotation pipe fd to pollset");
252 goto error;
253 }
254 if (handle->kernel_consumer < 0) {
255 goto end;
256 }
257 ret = lttng_poll_add(poll_set, handle->kernel_consumer,
258 LPOLLIN | LPOLLERR);
259 if (ret < 0) {
260 ERR("[rotation-thread] Failed to add kernel channel rotation pipe fd to pollset");
261 goto error;
262 }
263
264end:
265 return ret;
266error:
267 lttng_poll_clean(poll_set);
268 return ret;
269}
270
271static
272void fini_thread_state(struct rotation_thread_state *state)
273{
274 lttng_poll_clean(&state->events);
275}
276
277static
278int init_thread_state(struct rotation_thread_handle *handle,
279 struct rotation_thread_state *state)
280{
281 int ret;
282
283 memset(state, 0, sizeof(*state));
284 lttng_poll_init(&state->events);
285
286 ret = init_poll_set(&state->events, handle);
287 if (ret) {
288 goto end;
289 }
290
291 channel_pending_rotate_ht = cds_lfht_new(DEFAULT_HT_SIZE,
292 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
293 if (!channel_pending_rotate_ht) {
294 ret = -1;
295 }
296
297end:
298 return 0;
299}
300
301int rename_complete_chunk(struct ltt_session *session, time_t ts)
302{
303 struct tm *timeinfo;
304 char datetime[16];
305 char *tmppath = NULL;
306 int ret;
307
308 timeinfo = localtime(&ts);
309 strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
310
311 tmppath = zmalloc(PATH_MAX * sizeof(char));
312 if (!tmppath) {
313 ERR("Alloc tmppath");
314 ret = -1;
315 goto end;
316 }
317
318 snprintf(tmppath, PATH_MAX, "%s%s-%" PRIu64,
319 session->rotation_chunk.current_rotate_path,
320 datetime, session->rotate_count);
321
322 fprintf(stderr, "rename %s to %s\n", session->rotation_chunk.current_rotate_path,
323 tmppath);
324 ret = rename(session->rotation_chunk.current_rotate_path,
325 tmppath);
326 if (ret < 0) {
327 PERROR("Rename completed rotation chunk");
328 goto end;
329 }
330 /*
331 * Store the path where the readable chunk is. This path is valid
332 * and can be queried by the client with rotate_pending until the next
333 * rotation is started.
334 */
335 snprintf(session->rotation_chunk.current_rotate_path, PATH_MAX,
336 "%s", tmppath);
337 session->rotate_pending = 0;
338
339end:
340 free(tmppath);
341 return ret;
342}
343
344static
345int handle_channel_rotation_pipe(int fd, uint32_t revents,
346 struct rotation_thread_handle *handle,
347 struct rotation_thread_state *state)
348{
349 int ret = 0;
350 enum lttng_domain_type domain;
351 struct rotation_channel_info *channel_info;
352 uint64_t key;
353
354 if (fd == handle->ust32_consumer ||
355 fd == handle->ust64_consumer) {
356 domain = LTTNG_DOMAIN_UST;
357 } else if (fd == handle->kernel_consumer) {
358 domain = LTTNG_DOMAIN_KERNEL;
359 } else {
360 abort();
361 }
362
363 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
364 ret = lttng_poll_del(&state->events, fd);
365 if (ret) {
366 ERR("[rotation-thread] Failed to remove consumer rotation pipe from poll set");
367 }
368 goto end;
369 }
370
371 do {
372 ret = read(fd, &key, sizeof(key));
373 } while (ret == -1 && errno == EINTR);
374 if (ret != sizeof(key)) {
375 ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
376 fd);
377 ret = -1;
378 goto end;
379 }
380
381 DBG("[rotation-thread] Received notification for chan %" PRIu64
382 ", domain %d\n", key, domain);
383
384 channel_info = lookup_channel_pending(key, domain);
385 if (!channel_info) {
386 ERR("[rotation-thread] Failed to find channel_info (key = %"
387 PRIu64 ")", key);
388 ret = -1;
389 goto end;
390 }
391
392 if (--channel_info->session->nr_chan_rotate_pending == 0) {
393 time_t now = time(NULL);
394
395 if (now == (time_t) -1) {
396 ret = LTTNG_ERR_ROTATE_NOT_AVAILABLE;
397 goto end;
398 }
399
400 ret = rename_complete_chunk(channel_info->session, now);
401 if (ret < 0) {
402 ERR("Failed to rename completed rotation chunk");
403 goto end;
404 }
405 }
406
407 channel_rotation_info_destroy(channel_info);
408
409 ret = 0;
410
411end:
412 return ret;
413}
414
415void *thread_rotation(void *data)
416{
417 int ret;
418 struct rotation_thread_handle *handle = data;
419 struct rotation_thread_state state;
420
421 DBG("[rotation-thread] Started rotation thread");
422
423 if (!handle) {
424 ERR("[rotation-thread] Invalid thread context provided");
425 goto end;
426 }
427
428 rcu_register_thread();
429 rcu_thread_online();
430
431 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
432 health_code_update();
433
434 ret = init_thread_state(handle, &state);
435 if (ret) {
436 goto end;
437 }
438
439 /* Ready to handle client connections. */
440 sessiond_notify_ready();
441
442 while (true) {
443 int fd_count, i;
444
445 health_poll_entry();
446 DBG("[rotation-thread] Entering poll wait");
447 ret = lttng_poll_wait(&state.events, -1);
448 DBG("[rotation-thread] Poll wait returned (%i)", ret);
449 health_poll_exit();
450 if (ret < 0) {
451 /*
452 * Restart interrupted system call.
453 */
454 if (errno == EINTR) {
455 continue;
456 }
457 ERR("[rotation-thread] Error encountered during lttng_poll_wait (%i)", ret);
458 goto error;
459 }
460
461 fd_count = ret;
462 for (i = 0; i < fd_count; i++) {
463 int fd = LTTNG_POLL_GETFD(&state.events, i);
464 uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
465
466 DBG("[rotation-thread] Handling fd (%i) activity (%u)",
467 fd, revents);
468
469 if (fd == handle->thread_quit_pipe) {
470 DBG("[rotation-thread] Quit pipe activity");
471 goto exit;
472 } else if (fd == handle->ust32_consumer ||
473 fd == handle->ust64_consumer ||
474 fd == handle->kernel_consumer) {
475 ret = handle_channel_rotation_pipe(fd,
476 revents, handle, &state);
477 if (ret) {
478 ERR("[rotation-thread] Exit main loop");
479 goto error;
480 }
481 }
482 }
483 }
484exit:
485error:
486 fini_thread_state(&state);
487 health_unregister(health_sessiond);
488 rcu_thread_offline();
489 rcu_unregister_thread();
490end:
491 return NULL;
492}
This page took 0.042697 seconds and 5 git commands to generate.