Merge streams in ctf fs component
[babeltrace.git] / plugins / ctf / fs / fs.c
1 /*
2 * fs.c
3 *
4 * Babeltrace CTF file system Reader Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 *
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * SOFTWARE.
27 */
28
29 #include <babeltrace/plugin/plugin-system.h>
30 #include <babeltrace/ctf-ir/packet.h>
31 #include <babeltrace/plugin/notification/iterator.h>
32 #include <babeltrace/plugin/notification/stream.h>
33 #include <babeltrace/plugin/notification/event.h>
34 #include <babeltrace/plugin/notification/packet.h>
35 #include <babeltrace/plugin/notification/heap.h>
36 #include <glib.h>
37 #include <assert.h>
38 #include <unistd.h>
39 #include "fs.h"
40 #include "metadata.h"
41 #include "data-stream.h"
42 #include "file.h"
43
44 #define PRINT_ERR_STREAM ctf_fs->error_fp
45 #define PRINT_PREFIX "ctf-fs"
46 #include "print.h"
47
/*
 * File-wide debug switch. ctf_fs_init() assigns it from the CTF_FS_DEBUG
 * environment variable (true only when that variable is exactly "1").
 */
static bool ctf_fs_debug = false;
49
50 static
51 struct bt_notification *ctf_fs_iterator_get(
52 struct bt_notification_iterator *iterator)
53 {
54 struct ctf_fs_iterator *ctf_it =
55 bt_notification_iterator_get_private_data(iterator);
56
57 return bt_get(ctf_it->current_notification);
58 }
59
60 static
61 enum bt_notification_iterator_status ctf_fs_iterator_get_next_notification(
62 struct ctf_fs_iterator *it,
63 struct ctf_fs_stream *stream,
64 struct bt_notification **notification)
65 {
66 enum bt_ctf_notif_iter_status status;
67 enum bt_notification_iterator_status ret;
68
69 if (stream->end_reached) {
70 status = BT_CTF_NOTIF_ITER_STATUS_EOF;
71 goto end;
72 }
73
74 status = bt_ctf_notif_iter_get_next_notification(stream->notif_iter,
75 notification);
76 if (status != BT_CTF_NOTIF_ITER_STATUS_OK &&
77 status != BT_CTF_NOTIF_ITER_STATUS_EOF) {
78 goto end;
79 }
80
81 /* Should be handled in bt_ctf_notif_iter_get_next_notification. */
82 if (status == BT_CTF_NOTIF_ITER_STATUS_EOF) {
83 *notification = bt_notification_stream_end_create(
84 stream->stream);
85 if (!*notification) {
86 status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
87 }
88 status = BT_CTF_NOTIF_ITER_STATUS_OK;
89 stream->end_reached = true;
90 }
91 end:
92 switch (status) {
93 case BT_CTF_NOTIF_ITER_STATUS_EOF:
94 ret = BT_NOTIFICATION_ITERATOR_STATUS_END;
95 break;
96 case BT_CTF_NOTIF_ITER_STATUS_OK:
97 ret = BT_NOTIFICATION_ITERATOR_STATUS_OK;
98 break;
99 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
100 /*
101 * Should not make it this far as this is medium-specific;
102 * there is nothing for the user to do and it should have been
103 * handled upstream.
104 */
105 assert(0);
106 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
107 /* No argument provided by the user, so don't return INVAL. */
108 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
109 default:
110 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
111 break;
112 }
113 return ret;
114 }
115
116 /*
117 * Remove me. This is a temporary work-around due to our inhability to use
118 * libbabeltrace-ctf from libbabeltrace-plugin.
119 */
120 static
121 struct bt_ctf_stream *internal_bt_notification_get_stream(
122 struct bt_notification *notification)
123 {
124 struct bt_ctf_stream *stream = NULL;
125
126 assert(notification);
127 switch (bt_notification_get_type(notification)) {
128 case BT_NOTIFICATION_TYPE_EVENT:
129 {
130 struct bt_ctf_event *event;
131
132 event = bt_notification_event_get_event(notification);
133 stream = bt_ctf_event_get_stream(event);
134 bt_put(event);
135 break;
136 }
137 case BT_NOTIFICATION_TYPE_PACKET_START:
138 {
139 struct bt_ctf_packet *packet;
140
141 packet = bt_notification_packet_start_get_packet(notification);
142 stream = bt_ctf_packet_get_stream(packet);
143 bt_put(packet);
144 break;
145 }
146 case BT_NOTIFICATION_TYPE_PACKET_END:
147 {
148 struct bt_ctf_packet *packet;
149
150 packet = bt_notification_packet_end_get_packet(notification);
151 stream = bt_ctf_packet_get_stream(packet);
152 bt_put(packet);
153 break;
154 }
155 case BT_NOTIFICATION_TYPE_STREAM_END:
156 stream = bt_notification_stream_end_get_stream(notification);
157 break;
158 default:
159 goto end;
160 }
161 end:
162 return stream;
163 }
164
165 static
166 enum bt_notification_iterator_status populate_heap(struct ctf_fs_iterator *it)
167 {
168 size_t i, pending_streams_count = it->pending_streams->len;
169 enum bt_notification_iterator_status ret =
170 BT_NOTIFICATION_ITERATOR_STATUS_OK;
171
172 /* Insert one stream-associated notification for each stream. */
173 for (i = 0; i < pending_streams_count; i++) {
174 struct bt_notification *notification;
175 struct ctf_fs_stream *fs_stream;
176 struct bt_ctf_stream *stream;
177 size_t pending_stream_index = pending_streams_count - 1 - i;
178
179 fs_stream = g_ptr_array_index(it->pending_streams,
180 pending_stream_index);
181
182 do {
183 int heap_ret;
184
185 ret = ctf_fs_iterator_get_next_notification(
186 it, fs_stream, &notification);
187 if (ret && ret != BT_NOTIFICATION_ITERATOR_STATUS_END) {
188 printf_debug("Failed to populate heap at stream %zu\n",
189 pending_stream_index);
190 goto end;
191 }
192
193 stream = internal_bt_notification_get_stream(
194 notification);
195 if (stream) {
196 gboolean inserted;
197
198 /*
199 * Associate pending ctf_fs_stream to
200 * bt_ctf_stream. Ownership of stream
201 * is passed to the stream ht.
202 */
203 inserted = g_hash_table_insert(it->stream_ht,
204 stream, fs_stream);
205 if (!inserted) {
206 ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
207 printf_debug("Failed to associate fs stream to ctf stream\n");
208 goto end;
209 }
210 }
211
212 heap_ret = bt_notification_heap_insert(
213 it->pending_notifications,
214 notification);
215 bt_put(notification);
216 if (heap_ret) {
217 ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
218 printf_debug("Failed to insert notification in heap\n");
219 goto end;
220 }
221 } while (!stream && ret != BT_NOTIFICATION_ITERATOR_STATUS_END);
222 /*
223 * Set NULL so the destruction callback registered with the
224 * array is not invoked on the stream (its ownership was
225 * transferred to the streams hashtable).
226 */
227 g_ptr_array_index(it->pending_streams,
228 pending_stream_index) = NULL;
229 g_ptr_array_remove_index(it->pending_streams,
230 pending_stream_index);
231 }
232
233 g_ptr_array_free(it->pending_streams, TRUE);
234 it->pending_streams = NULL;
235 end:
236 return ret;
237 }
238
239 static
240 enum bt_notification_iterator_status ctf_fs_iterator_next(
241 struct bt_notification_iterator *iterator)
242 {
243 int heap_ret;
244 struct bt_ctf_stream *stream;
245 struct ctf_fs_stream *fs_stream;
246 struct bt_notification *notification;
247 struct bt_notification *next_stream_notification;
248 enum bt_notification_iterator_status ret =
249 BT_NOTIFICATION_ITERATOR_STATUS_OK;
250 struct ctf_fs_iterator *ctf_it =
251 bt_notification_iterator_get_private_data(iterator);
252
253 notification = bt_notification_heap_pop(ctf_it->pending_notifications);
254 if (!notification && !ctf_it->pending_streams) {
255 ret = BT_NOTIFICATION_ITERATOR_STATUS_END;
256 goto end;
257 }
258
259 if (!notification && ctf_it->pending_streams) {
260 /*
261 * Insert at one notification per stream in the heap and pop
262 * one.
263 */
264 ret = populate_heap(ctf_it);
265 if (ret) {
266 goto end;
267 }
268
269 notification = bt_notification_heap_pop(
270 ctf_it->pending_notifications);
271 if (!notification) {
272 ret = BT_NOTIFICATION_ITERATOR_STATUS_END;
273 goto end;
274 }
275 }
276
277 /* notification is set from here. */
278
279 stream = internal_bt_notification_get_stream(notification);
280 if (!stream) {
281 /*
282 * The current notification is not associated to a particular
283 * stream, there is no need to insert a new notification from
284 * a stream in the heap.
285 */
286 goto end;
287 }
288
289 fs_stream = g_hash_table_lookup(ctf_it->stream_ht, stream);
290 if (!fs_stream) {
291 /* We have reached this stream's end. */
292 goto end;
293 }
294
295 ret = ctf_fs_iterator_get_next_notification(ctf_it, fs_stream,
296 &next_stream_notification);
297 if ((ret && ret != BT_NOTIFICATION_ITERATOR_STATUS_END)) {
298 heap_ret = bt_notification_heap_insert(
299 ctf_it->pending_notifications, notification);
300
301 assert(!next_stream_notification);
302 if (heap_ret) {
303 /*
304 * We're dropping the most recent notification, but at
305 * this point, something is seriously wrong...
306 */
307 ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
308 }
309 BT_PUT(notification);
310 goto end;
311 }
312
313 if (ret == BT_NOTIFICATION_ITERATOR_STATUS_END) {
314 gboolean success;
315
316 /* Remove stream. */
317 success = g_hash_table_remove(ctf_it->stream_ht, stream);
318 assert(success);
319 ret = BT_NOTIFICATION_ITERATOR_STATUS_OK;
320 } else {
321 heap_ret = bt_notification_heap_insert(ctf_it->pending_notifications,
322 next_stream_notification);
323 BT_PUT(next_stream_notification);
324 if (heap_ret) {
325 /*
326 * We're dropping the most recent notification...
327 */
328 ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
329 }
330 }
331
332 /*
333 * Ensure that the stream is removed from both pending_streams and
334 * the streams hashtable on reception of the "end of stream"
335 * notification.
336 */
337 end:
338 BT_MOVE(ctf_it->current_notification, notification);
339 return ret;
340 }
341
342 static
343 void ctf_fs_iterator_destroy_data(struct ctf_fs_iterator *ctf_it)
344 {
345 bt_put(ctf_it->current_notification);
346 bt_put(ctf_it->pending_notifications);
347 if (ctf_it->pending_streams) {
348 g_ptr_array_free(ctf_it->pending_streams, TRUE);
349 }
350 if (ctf_it->stream_ht) {
351 g_hash_table_destroy(ctf_it->stream_ht);
352 }
353 g_free(ctf_it);
354 }
355
/*
 * Destroy callback of the notification iterator: tears down the
 * ctf_fs_iterator stored as the iterator's private data.
 */
static
void ctf_fs_iterator_destroy(struct bt_notification_iterator *it)
{
	ctf_fs_iterator_destroy_data(
			bt_notification_iterator_get_private_data(it));
}
363
/*
 * Ordering callback handed to the notification heap.
 *
 * Orders two notifications by comparing their addresses only; the
 * notifications' contents are never inspected and the third argument is
 * ignored. Returns true when a's address is lower than b's.
 */
static
bool compare_notifications(struct bt_notification *a, struct bt_notification *b,
		void *unused)
{
	bool a_is_lower;

	(void) unused;
	a_is_lower = a < b;
	return a_is_lower;
}
370
/*
 * Untyped adapter around ctf_fs_stream_destroy(), registered as the
 * element/value destruction callback of the GLib containers that hold
 * ctf_fs_stream objects.
 */
static
void stream_destroy(void *stream)
{
	struct ctf_fs_stream *fs_stream = stream;

	ctf_fs_stream_destroy(fs_stream);
}
376
377 static
378 int open_trace_streams(struct ctf_fs_component *ctf_fs,
379 struct ctf_fs_iterator *ctf_it)
380 {
381 int ret = 0;
382 const char *name;
383 GError *error = NULL;
384 GDir *dir = g_dir_open(ctf_fs->trace_path->str, 0, &error);
385
386 if (!dir) {
387 PERR("Cannot open directory \"%s\": %s (code %d)\n",
388 ctf_fs->trace_path->str, error->message,
389 error->code);
390 goto error;
391 }
392
393 while ((name = g_dir_read_name(dir))) {
394 struct ctf_fs_file *file = NULL;
395 struct ctf_fs_stream *stream = NULL;
396
397 if (!strcmp(name, CTF_FS_METADATA_FILENAME)) {
398 /* Ignore the metadata stream. */
399 PDBG("Ignoring metadata file \"%s\"\n",
400 name);
401 continue;
402 }
403
404 if (name[0] == '.') {
405 PDBG("Ignoring hidden file \"%s\"\n",
406 name);
407 continue;
408 }
409
410 /* Create the file. */
411 file = ctf_fs_file_create(ctf_fs);
412 if (!file) {
413 PERR("Cannot create stream file object\n");
414 goto error;
415 }
416
417 /* Create full path string. */
418 g_string_append_printf(file->path, "%s/%s",
419 ctf_fs->trace_path->str, name);
420 if (!g_file_test(file->path->str, G_FILE_TEST_IS_REGULAR)) {
421 PDBG("Ignoring non-regular file \"%s\"\n", name);
422 ctf_fs_file_destroy(file);
423 continue;
424 }
425
426 /* Open the file. */
427 if (ctf_fs_file_open(ctf_fs, file, "rb")) {
428 ctf_fs_file_destroy(file);
429 goto error;
430 }
431
432 /* Create a private stream; file ownership is passed to it. */
433 stream = ctf_fs_stream_create(ctf_fs, file);
434 if (!stream) {
435 ctf_fs_file_destroy(file);
436 goto error;
437 }
438
439 g_ptr_array_add(ctf_it->pending_streams, stream);
440 }
441
442 goto end;
443 error:
444 ret = -1;
445 end:
446 if (dir) {
447 g_dir_close(dir);
448 dir = NULL;
449 }
450 if (error) {
451 g_error_free(error);
452 }
453 return ret;
454 }
455
456 static
457 enum bt_component_status ctf_fs_iterator_init(struct bt_component *source,
458 struct bt_notification_iterator *it)
459 {
460 struct ctf_fs_iterator *ctf_it;
461 struct ctf_fs_component *ctf_fs;
462 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
463
464 assert(source && it);
465
466 ctf_fs = bt_component_get_private_data(source);
467 if (!ctf_fs) {
468 ret = BT_COMPONENT_STATUS_INVALID;
469 goto end;
470 }
471
472 ctf_it = g_new0(struct ctf_fs_iterator, 1);
473 if (!ctf_it) {
474 ret = BT_COMPONENT_STATUS_NOMEM;
475 goto end;
476 }
477
478 ctf_it->stream_ht = g_hash_table_new_full(g_direct_hash,
479 g_direct_equal, bt_put, stream_destroy);
480 if (!ctf_it->stream_ht) {
481 goto error;
482 }
483 ctf_it->pending_streams = g_ptr_array_new_with_free_func(
484 stream_destroy);
485 if (!ctf_it->pending_streams) {
486 goto error;
487 }
488 ctf_it->pending_notifications = bt_notification_heap_create(
489 compare_notifications, NULL);
490 if (!ctf_it->pending_notifications) {
491 goto error;
492 }
493
494 ret = open_trace_streams(ctf_fs, ctf_it);
495 if (ret) {
496 goto error;
497 }
498
499 ret = bt_notification_iterator_set_get_cb(it, ctf_fs_iterator_get);
500 if (ret) {
501 goto error;
502 }
503
504 ret = bt_notification_iterator_set_next_cb(it, ctf_fs_iterator_next);
505 if (ret) {
506 goto error;
507 }
508
509 ret = bt_notification_iterator_set_destroy_cb(it,
510 ctf_fs_iterator_destroy);
511 if (ret) {
512 goto error;
513 }
514
515 ret = bt_notification_iterator_set_private_data(it, ctf_it);
516 if (ret) {
517 goto error;
518 }
519 end:
520 return ret;
521 error:
522 (void) bt_notification_iterator_set_private_data(it, NULL);
523 ctf_fs_iterator_destroy_data(ctf_it);
524 goto end;
525 }
526
527 static
528 void ctf_fs_destroy_data(struct ctf_fs_component *ctf_fs)
529 {
530 if (ctf_fs->trace_path) {
531 g_string_free(ctf_fs->trace_path, TRUE);
532 }
533 if (ctf_fs->metadata) {
534 ctf_fs_metadata_fini(ctf_fs->metadata);
535 g_free(ctf_fs->metadata);
536 }
537 g_free(ctf_fs);
538 }
539
/*
 * Destroy callback of the component: tears down the ctf_fs_component
 * stored as the component's private data.
 */
static
void ctf_fs_destroy(struct bt_component *component)
{
	ctf_fs_destroy_data(bt_component_get_private_data(component));
}
547
548 static
549 struct ctf_fs_component *ctf_fs_create(struct bt_value *params)
550 {
551 struct ctf_fs_component *ctf_fs;
552 struct bt_value *value = NULL;
553 const char *path;
554 enum bt_value_status ret;
555
556 ctf_fs = g_new0(struct ctf_fs_component, 1);
557 if (!ctf_fs) {
558 goto end;
559 }
560
561 /* FIXME: should probably look for a source URI */
562 value = bt_value_map_get(params, "path");
563 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
564 goto error;
565 }
566
567 ret = bt_value_string_get(value, &path);
568 if (ret != BT_VALUE_STATUS_OK) {
569 goto error;
570 }
571
572 ctf_fs->trace_path = g_string_new(path);
573 if (!ctf_fs->trace_path) {
574 goto error;
575 }
576 ctf_fs->error_fp = stderr;
577 ctf_fs->page_size = (size_t) getpagesize();
578
579 // FIXME: check error.
580 ctf_fs->metadata = g_new0(struct ctf_fs_metadata, 1);
581 if (!ctf_fs->metadata) {
582 goto error;
583 }
584 ctf_fs_metadata_set_trace(ctf_fs);
585 goto end;
586
587 error:
588 ctf_fs_destroy_data(ctf_fs);
589 ctf_fs = NULL;
590 end:
591 BT_PUT(value);
592 return ctf_fs;
593 }
594
595 BT_HIDDEN
596 enum bt_component_status ctf_fs_init(struct bt_component *source,
597 struct bt_value *params)
598 {
599 struct ctf_fs_component *ctf_fs;
600 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
601
602 assert(source);
603 ctf_fs_debug = g_strcmp0(getenv("CTF_FS_DEBUG"), "1") == 0;
604 ctf_fs = ctf_fs_create(params);
605 if (!ctf_fs) {
606 ret = BT_COMPONENT_STATUS_NOMEM;
607 goto end;
608 }
609
610 ret = bt_component_set_destroy_cb(source, ctf_fs_destroy);
611 if (ret != BT_COMPONENT_STATUS_OK) {
612 goto error;
613 }
614
615 ret = bt_component_set_private_data(source, ctf_fs);
616 if (ret != BT_COMPONENT_STATUS_OK) {
617 goto error;
618 }
619
620 ret = bt_component_source_set_iterator_init_cb(source,
621 ctf_fs_iterator_init);
622 if (ret != BT_COMPONENT_STATUS_OK) {
623 goto error;
624 }
625 end:
626 return ret;
627 error:
628 (void) bt_component_set_private_data(source, NULL);
629 ctf_fs_destroy_data(ctf_fs);
630 return ret;
631 }
This page took 0.069402 seconds and 4 git commands to generate.