Merge streams in ctf fs component
[babeltrace.git] / plugins / ctf / fs / fs.c
CommitLineData
7a278c8e 1/*
ea0b4b9e 2 * fs.c
7a278c8e 3 *
ea0b4b9e 4 * Babeltrace CTF file system Reader Component
7a278c8e 5 *
f3bc2010 6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7a278c8e
JG
7 *
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * SOFTWARE.
27 */
28
ea0b4b9e 29#include <babeltrace/plugin/plugin-system.h>
5b29e799 30#include <babeltrace/ctf-ir/packet.h>
760051fa 31#include <babeltrace/plugin/notification/iterator.h>
5b29e799
JG
32#include <babeltrace/plugin/notification/stream.h>
33#include <babeltrace/plugin/notification/event.h>
34#include <babeltrace/plugin/notification/packet.h>
35#include <babeltrace/plugin/notification/heap.h>
ea0b4b9e
JG
36#include <glib.h>
37#include <assert.h>
56a1cced
JG
38#include <unistd.h>
39#include "fs.h"
413bc2c4
JG
40#include "metadata.h"
41#include "data-stream.h"
e7a4393b
JG
42#include "file.h"
43
44#define PRINT_ERR_STREAM ctf_fs->error_fp
45#define PRINT_PREFIX "ctf-fs"
46#include "print.h"
ea0b4b9e
JG
47
/* Debug flag; ctf_fs_init() sets it to true when the CTF_FS_DEBUG environment variable equals "1". */
48static bool ctf_fs_debug;
49
50static
760051fa
JG
51struct bt_notification *ctf_fs_iterator_get(
52 struct bt_notification_iterator *iterator)
ea0b4b9e 53{
5b29e799
JG
54 struct ctf_fs_iterator *ctf_it =
55 bt_notification_iterator_get_private_data(iterator);
d01e0f33 56
5b29e799 57 return bt_get(ctf_it->current_notification);
ea0b4b9e
JG
58}
59
60static
5b29e799
JG
61enum bt_notification_iterator_status ctf_fs_iterator_get_next_notification(
62 struct ctf_fs_iterator *it,
63 struct ctf_fs_stream *stream,
64 struct bt_notification **notification)
ea0b4b9e 65{
5b29e799 66 enum bt_ctf_notif_iter_status status;
d01e0f33 67 enum bt_notification_iterator_status ret;
d01e0f33 68
5b29e799
JG
69 if (stream->end_reached) {
70 status = BT_CTF_NOTIF_ITER_STATUS_EOF;
d01e0f33
JG
71 goto end;
72 }
73
5b29e799
JG
74 status = bt_ctf_notif_iter_get_next_notification(stream->notif_iter,
75 notification);
76 if (status != BT_CTF_NOTIF_ITER_STATUS_OK &&
77 status != BT_CTF_NOTIF_ITER_STATUS_EOF) {
d01e0f33
JG
78 goto end;
79 }
80
5b29e799
JG
81 /* Should be handled in bt_ctf_notif_iter_get_next_notification. */
82 if (status == BT_CTF_NOTIF_ITER_STATUS_EOF) {
83 *notification = bt_notification_stream_end_create(
84 stream->stream);
85 if (!*notification) {
86 status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
87 }
88 status = BT_CTF_NOTIF_ITER_STATUS_OK;
89 stream->end_reached = true;
90 }
d01e0f33 91end:
5b29e799
JG
92 switch (status) {
93 case BT_CTF_NOTIF_ITER_STATUS_EOF:
94 ret = BT_NOTIFICATION_ITERATOR_STATUS_END;
95 break;
96 case BT_CTF_NOTIF_ITER_STATUS_OK:
97 ret = BT_NOTIFICATION_ITERATOR_STATUS_OK;
98 break;
99 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
100 /*
101 * Should not make it this far as this is medium-specific;
102 * there is nothing for the user to do and it should have been
103 * handled upstream.
104 */
105 assert(0);
106 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
107 /* No argument provided by the user, so don't return INVAL. */
108 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
109 default:
110 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
111 break;
112 }
043e2020 113 return ret;
ea0b4b9e 114}
bfd20a42 115
5b29e799
JG
116/*
117 * Remove me. This is a temporary work-around due to our inability to use
118 * libbabeltrace-ctf from libbabeltrace-plugin.
119 */
760051fa 120static
5b29e799
JG
121struct bt_ctf_stream *internal_bt_notification_get_stream(
122 struct bt_notification *notification)
760051fa 123{
5b29e799
JG
124 struct bt_ctf_stream *stream = NULL;
125
126 assert(notification);
127 switch (bt_notification_get_type(notification)) {
128 case BT_NOTIFICATION_TYPE_EVENT:
129 {
130 struct bt_ctf_event *event;
131
132 event = bt_notification_event_get_event(notification);
133 stream = bt_ctf_event_get_stream(event);
134 bt_put(event);
135 break;
136 }
137 case BT_NOTIFICATION_TYPE_PACKET_START:
138 {
139 struct bt_ctf_packet *packet;
140
141 packet = bt_notification_packet_start_get_packet(notification);
142 stream = bt_ctf_packet_get_stream(packet);
143 bt_put(packet);
144 break;
145 }
146 case BT_NOTIFICATION_TYPE_PACKET_END:
147 {
148 struct bt_ctf_packet *packet;
149
150 packet = bt_notification_packet_end_get_packet(notification);
151 stream = bt_ctf_packet_get_stream(packet);
152 bt_put(packet);
153 break;
154 }
155 case BT_NOTIFICATION_TYPE_STREAM_END:
156 stream = bt_notification_stream_end_get_stream(notification);
157 break;
158 default:
159 goto end;
160 }
161end:
162 return stream;
760051fa
JG
163}
164
165static
5b29e799 166enum bt_notification_iterator_status populate_heap(struct ctf_fs_iterator *it)
760051fa 167{
5b29e799
JG
168 size_t i, pending_streams_count = it->pending_streams->len;
169 enum bt_notification_iterator_status ret =
170 BT_NOTIFICATION_ITERATOR_STATUS_OK;
171
172 /* Insert one stream-associated notification for each stream. */
173 for (i = 0; i < pending_streams_count; i++) {
174 struct bt_notification *notification;
175 struct ctf_fs_stream *fs_stream;
176 struct bt_ctf_stream *stream;
177 size_t pending_stream_index = pending_streams_count - 1 - i;
178
179 fs_stream = g_ptr_array_index(it->pending_streams,
180 pending_stream_index);
181
182 do {
183 int heap_ret;
184
185 ret = ctf_fs_iterator_get_next_notification(
186 it, fs_stream, &notification);
187 if (ret && ret != BT_NOTIFICATION_ITERATOR_STATUS_END) {
188 printf_debug("Failed to populate heap at stream %zu\n",
189 pending_stream_index);
190 goto end;
191 }
192
193 stream = internal_bt_notification_get_stream(
194 notification);
195 if (stream) {
196 gboolean inserted;
197
198 /*
199 * Associate pending ctf_fs_stream to
200 * bt_ctf_stream. Ownership of stream
201 * is passed to the stream ht.
202 */
203 inserted = g_hash_table_insert(it->stream_ht,
204 stream, fs_stream);
205 if (!inserted) {
206 ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
207 printf_debug("Failed to associate fs stream to ctf stream\n");
208 goto end;
209 }
210 }
211
212 heap_ret = bt_notification_heap_insert(
213 it->pending_notifications,
214 notification);
215 bt_put(notification);
216 if (heap_ret) {
217 ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
218 printf_debug("Failed to insert notification in heap\n");
219 goto end;
220 }
221 } while (!stream && ret != BT_NOTIFICATION_ITERATOR_STATUS_END);
222 /*
223 * Set NULL so the destruction callback registered with the
224 * array is not invoked on the stream (its ownership was
225 * transferred to the streams hashtable).
226 */
227 g_ptr_array_index(it->pending_streams,
228 pending_stream_index) = NULL;
229 g_ptr_array_remove_index(it->pending_streams,
230 pending_stream_index);
231 }
760051fa 232
5b29e799
JG
233 g_ptr_array_free(it->pending_streams, TRUE);
234 it->pending_streams = NULL;
235end:
236 return ret;
760051fa
JG
237}
238
239static
5b29e799
JG
240enum bt_notification_iterator_status ctf_fs_iterator_next(
241 struct bt_notification_iterator *iterator)
4c1456f0 242{
5b29e799
JG
243 int heap_ret;
244 struct bt_ctf_stream *stream;
245 struct ctf_fs_stream *fs_stream;
246 struct bt_notification *notification;
247 struct bt_notification *next_stream_notification;
248 enum bt_notification_iterator_status ret =
249 BT_NOTIFICATION_ITERATOR_STATUS_OK;
250 struct ctf_fs_iterator *ctf_it =
251 bt_notification_iterator_get_private_data(iterator);
252
253 notification = bt_notification_heap_pop(ctf_it->pending_notifications);
254 if (!notification && !ctf_it->pending_streams) {
255 ret = BT_NOTIFICATION_ITERATOR_STATUS_END;
760051fa
JG
256 goto end;
257 }
258
5b29e799
JG
259 if (!notification && ctf_it->pending_streams) {
260 /*
261 * Insert at one notification per stream in the heap and pop
262 * one.
263 */
264 ret = populate_heap(ctf_it);
265 if (ret) {
266 goto end;
267 }
268
269 notification = bt_notification_heap_pop(
270 ctf_it->pending_notifications);
271 if (!notification) {
272 ret = BT_NOTIFICATION_ITERATOR_STATUS_END;
273 goto end;
274 }
760051fa
JG
275 }
276
5b29e799
JG
277 /* notification is set from here. */
278
279 stream = internal_bt_notification_get_stream(notification);
280 if (!stream) {
281 /*
282 * The current notification is not associated to a particular
283 * stream, there is no need to insert a new notification from
284 * a stream in the heap.
285 */
286 goto end;
760051fa
JG
287 }
288
5b29e799
JG
289 fs_stream = g_hash_table_lookup(ctf_it->stream_ht, stream);
290 if (!fs_stream) {
291 /* We have reached this stream's end. */
292 goto end;
760051fa
JG
293 }
294
5b29e799
JG
295 ret = ctf_fs_iterator_get_next_notification(ctf_it, fs_stream,
296 &next_stream_notification);
297 if ((ret && ret != BT_NOTIFICATION_ITERATOR_STATUS_END)) {
298 heap_ret = bt_notification_heap_insert(
299 ctf_it->pending_notifications, notification);
300
301 assert(!next_stream_notification);
302 if (heap_ret) {
303 /*
304 * We're dropping the most recent notification, but at
305 * this point, something is seriously wrong...
306 */
307 ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
308 }
309 BT_PUT(notification);
310 goto end;
311 }
312
313 if (ret == BT_NOTIFICATION_ITERATOR_STATUS_END) {
314 gboolean success;
315
316 /* Remove stream. */
317 success = g_hash_table_remove(ctf_it->stream_ht, stream);
318 assert(success);
319 ret = BT_NOTIFICATION_ITERATOR_STATUS_OK;
320 } else {
321 heap_ret = bt_notification_heap_insert(ctf_it->pending_notifications,
322 next_stream_notification);
323 BT_PUT(next_stream_notification);
324 if (heap_ret) {
325 /*
326 * We're dropping the most recent notification...
327 */
328 ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
329 }
760051fa 330 }
5b29e799
JG
331
332 /*
333 * Ensure that the stream is removed from both pending_streams and
334 * the streams hashtable on reception of the "end of stream"
335 * notification.
336 */
760051fa 337end:
5b29e799 338 BT_MOVE(ctf_it->current_notification, notification);
760051fa
JG
339 return ret;
340}
341
760051fa 342static
5b29e799 343void ctf_fs_iterator_destroy_data(struct ctf_fs_iterator *ctf_it)
760051fa 344{
5b29e799
JG
345 bt_put(ctf_it->current_notification);
346 bt_put(ctf_it->pending_notifications);
347 if (ctf_it->pending_streams) {
348 g_ptr_array_free(ctf_it->pending_streams, TRUE);
56a1cced 349 }
5b29e799
JG
350 if (ctf_it->stream_ht) {
351 g_hash_table_destroy(ctf_it->stream_ht);
c14d7e26 352 }
5b29e799 353 g_free(ctf_it);
760051fa
JG
354}
355
/*
 * Destroy callback of the notification iterator: tears down the
 * ctf_fs_iterator stored as the iterator's private data.
 */
static
void ctf_fs_iterator_destroy(struct bt_notification_iterator *it)
{
	struct ctf_fs_iterator *ctf_it =
			bt_notification_iterator_get_private_data(it);

	ctf_fs_iterator_destroy_data(ctf_it);
}
363
/*
 * Heap ordering callback: orders two notifications by comparing their
 * addresses. The third argument is ignored.
 */
static
bool compare_notifications(struct bt_notification *a, struct bt_notification *b,
		void *unused)
{
	if (a < b) {
		return true;
	}

	return false;
}
370
/*
 * Untyped destroy callback for GLib containers; forwards the element to
 * ctf_fs_stream_destroy().
 */
static
void stream_destroy(void *stream)
{
	struct ctf_fs_stream *fs_stream = stream;

	ctf_fs_stream_destroy(fs_stream);
}
376
377static
378int open_trace_streams(struct ctf_fs_component *ctf_fs,
379 struct ctf_fs_iterator *ctf_it)
e7a4393b
JG
380{
381 int ret = 0;
382 const char *name;
383 GError *error = NULL;
384 GDir *dir = g_dir_open(ctf_fs->trace_path->str, 0, &error);
385
386 if (!dir) {
387 PERR("Cannot open directory \"%s\": %s (code %d)\n",
388 ctf_fs->trace_path->str, error->message,
389 error->code);
390 goto error;
391 }
392
393 while ((name = g_dir_read_name(dir))) {
394 struct ctf_fs_file *file = NULL;
395 struct ctf_fs_stream *stream = NULL;
396
397 if (!strcmp(name, CTF_FS_METADATA_FILENAME)) {
398 /* Ignore the metadata stream. */
399 PDBG("Ignoring metadata file \"%s\"\n",
400 name);
401 continue;
402 }
403
404 if (name[0] == '.') {
405 PDBG("Ignoring hidden file \"%s\"\n",
406 name);
407 continue;
408 }
409
410 /* Create the file. */
411 file = ctf_fs_file_create(ctf_fs);
412 if (!file) {
413 PERR("Cannot create stream file object\n");
414 goto error;
415 }
416
417 /* Create full path string. */
418 g_string_append_printf(file->path, "%s/%s",
419 ctf_fs->trace_path->str, name);
420 if (!g_file_test(file->path->str, G_FILE_TEST_IS_REGULAR)) {
421 PDBG("Ignoring non-regular file \"%s\"\n", name);
422 ctf_fs_file_destroy(file);
423 continue;
424 }
425
426 /* Open the file. */
427 if (ctf_fs_file_open(ctf_fs, file, "rb")) {
428 ctf_fs_file_destroy(file);
429 goto error;
430 }
431
432 /* Create a private stream; file ownership is passed to it. */
433 stream = ctf_fs_stream_create(ctf_fs, file);
434 if (!stream) {
435 ctf_fs_file_destroy(file);
436 goto error;
437 }
438
5b29e799 439 g_ptr_array_add(ctf_it->pending_streams, stream);
e7a4393b
JG
440 }
441
442 goto end;
443error:
444 ret = -1;
445end:
446 if (dir) {
447 g_dir_close(dir);
448 dir = NULL;
449 }
450 if (error) {
451 g_error_free(error);
452 }
453 return ret;
454}
455
456static
5b29e799
JG
457enum bt_component_status ctf_fs_iterator_init(struct bt_component *source,
458 struct bt_notification_iterator *it)
e7a4393b 459{
5b29e799
JG
460 struct ctf_fs_iterator *ctf_it;
461 struct ctf_fs_component *ctf_fs;
462 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
463
464 assert(source && it);
465
466 ctf_fs = bt_component_get_private_data(source);
467 if (!ctf_fs) {
468 ret = BT_COMPONENT_STATUS_INVALID;
469 goto end;
470 }
471
472 ctf_it = g_new0(struct ctf_fs_iterator, 1);
473 if (!ctf_it) {
474 ret = BT_COMPONENT_STATUS_NOMEM;
475 goto end;
476 }
477
478 ctf_it->stream_ht = g_hash_table_new_full(g_direct_hash,
479 g_direct_equal, bt_put, stream_destroy);
480 if (!ctf_it->stream_ht) {
481 goto error;
482 }
483 ctf_it->pending_streams = g_ptr_array_new_with_free_func(
484 stream_destroy);
485 if (!ctf_it->pending_streams) {
486 goto error;
487 }
488 ctf_it->pending_notifications = bt_notification_heap_create(
489 compare_notifications, NULL);
490 if (!ctf_it->pending_notifications) {
491 goto error;
492 }
493
494 ret = open_trace_streams(ctf_fs, ctf_it);
495 if (ret) {
496 goto error;
497 }
498
499 ret = bt_notification_iterator_set_get_cb(it, ctf_fs_iterator_get);
500 if (ret) {
501 goto error;
502 }
503
504 ret = bt_notification_iterator_set_next_cb(it, ctf_fs_iterator_next);
505 if (ret) {
506 goto error;
507 }
508
509 ret = bt_notification_iterator_set_destroy_cb(it,
510 ctf_fs_iterator_destroy);
511 if (ret) {
512 goto error;
513 }
514
515 ret = bt_notification_iterator_set_private_data(it, ctf_it);
516 if (ret) {
517 goto error;
518 }
519end:
520 return ret;
521error:
522 (void) bt_notification_iterator_set_private_data(it, NULL);
523 ctf_fs_iterator_destroy_data(ctf_it);
524 goto end;
525}
526
527static
528void ctf_fs_destroy_data(struct ctf_fs_component *ctf_fs)
529{
530 if (ctf_fs->trace_path) {
531 g_string_free(ctf_fs->trace_path, TRUE);
532 }
533 if (ctf_fs->metadata) {
534 ctf_fs_metadata_fini(ctf_fs->metadata);
535 g_free(ctf_fs->metadata);
536 }
537 g_free(ctf_fs);
538}
539
/*
 * Component destroy callback: tears down the ctf_fs_component stored as the
 * component's private data.
 */
static
void ctf_fs_destroy(struct bt_component *component)
{
	struct ctf_fs_component *ctf_fs =
			bt_component_get_private_data(component);

	ctf_fs_destroy_data(ctf_fs);
}
547
56a1cced
JG
548static
549struct ctf_fs_component *ctf_fs_create(struct bt_value *params)
550{
551 struct ctf_fs_component *ctf_fs;
1ef09eb5 552 struct bt_value *value = NULL;
56a1cced
JG
553 const char *path;
554 enum bt_value_status ret;
555
556 ctf_fs = g_new0(struct ctf_fs_component, 1);
557 if (!ctf_fs) {
558 goto end;
559 }
560
561 /* FIXME: should probably look for a source URI */
562 value = bt_value_map_get(params, "path");
563 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
564 goto error;
565 }
566
567 ret = bt_value_string_get(value, &path);
568 if (ret != BT_VALUE_STATUS_OK) {
569 goto error;
570 }
571
572 ctf_fs->trace_path = g_string_new(path);
573 if (!ctf_fs->trace_path) {
574 goto error;
575 }
56a1cced
JG
576 ctf_fs->error_fp = stderr;
577 ctf_fs->page_size = (size_t) getpagesize();
e7a4393b
JG
578
579 // FIXME: check error.
5b29e799
JG
580 ctf_fs->metadata = g_new0(struct ctf_fs_metadata, 1);
581 if (!ctf_fs->metadata) {
e7a4393b
JG
582 goto error;
583 }
5b29e799 584 ctf_fs_metadata_set_trace(ctf_fs);
1ef09eb5
JG
585 goto end;
586
56a1cced
JG
587error:
588 ctf_fs_destroy_data(ctf_fs);
e7a4393b 589 ctf_fs = NULL;
1ef09eb5
JG
590end:
591 BT_PUT(value);
56a1cced
JG
592 return ctf_fs;
593}
594
ea0b4b9e
JG
595BT_HIDDEN
596enum bt_component_status ctf_fs_init(struct bt_component *source,
5c80adeb 597 struct bt_value *params)
ea0b4b9e
JG
598{
599 struct ctf_fs_component *ctf_fs;
600 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
601
602 assert(source);
603 ctf_fs_debug = g_strcmp0(getenv("CTF_FS_DEBUG"), "1") == 0;
604 ctf_fs = ctf_fs_create(params);
605 if (!ctf_fs) {
606 ret = BT_COMPONENT_STATUS_NOMEM;
607 goto end;
608 }
4c1456f0 609
ea0b4b9e
JG
610 ret = bt_component_set_destroy_cb(source, ctf_fs_destroy);
611 if (ret != BT_COMPONENT_STATUS_OK) {
612 goto error;
613 }
614
615 ret = bt_component_set_private_data(source, ctf_fs);
616 if (ret != BT_COMPONENT_STATUS_OK) {
617 goto error;
618 }
619
620 ret = bt_component_source_set_iterator_init_cb(source,
621 ctf_fs_iterator_init);
622 if (ret != BT_COMPONENT_STATUS_OK) {
623 goto error;
624 }
625end:
626 return ret;
627error:
628 (void) bt_component_set_private_data(source, NULL);
760051fa 629 ctf_fs_destroy_data(ctf_fs);
ea0b4b9e
JG
630 return ret;
631}
This page took 0.049918 seconds and 4 git commands to generate.