4 * Babeltrace CTF Writer Output Plugin Event Handling
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29 #define BT_LOG_TAG "PLUGIN-CTF-FS-SINK-WRITE"
32 #include <babeltrace/ctf-ir/event.h>
33 #include <babeltrace/ctf-ir/packet.h>
34 #include <babeltrace/ctf-ir/event-class.h>
35 #include <babeltrace/ctf-ir/stream.h>
36 #include <babeltrace/ctf-ir/stream-class.h>
37 #include <babeltrace/ctf-ir/clock-class.h>
38 #include <babeltrace/ctf-ir/fields.h>
39 #include <babeltrace/ctf-writer/stream-class.h>
40 #include <babeltrace/ctf-writer/stream.h>
44 #include <ctfcopytrace.h>
49 void unref_stream_class(struct bt_ctf_stream_class
*writer_stream_class
)
51 bt_put(writer_stream_class
);
55 void unref_stream(struct bt_ctf_stream_class
*writer_stream
)
57 bt_put(writer_stream
);
61 gboolean
empty_ht(gpointer key
, gpointer value
, gpointer user_data
)
67 gboolean
empty_streams_ht(gpointer key
, gpointer value
, gpointer user_data
)
69 struct bt_ctf_stream
*writer_stream
= value
;
71 bt_ctf_stream_flush(writer_stream
);
77 void destroy_stream_state_key(gpointer key
)
79 g_free((enum fs_writer_stream_state
*) key
);
83 void check_completed_trace(gpointer key
, gpointer value
, gpointer user_data
)
85 enum fs_writer_stream_state
*state
= value
;
86 int *trace_completed
= user_data
;
88 if (*state
!= FS_WRITER_COMPLETED_STREAM
) {
94 void trace_is_static_listener(struct bt_ctf_trace
*trace
, void *data
)
96 struct fs_writer
*fs_writer
= data
;
97 int trace_completed
= 1;
99 fs_writer
->trace_static
= 1;
101 g_hash_table_foreach(fs_writer
->stream_states
,
102 check_completed_trace
, &trace_completed
);
103 if (trace_completed
) {
104 writer_close(fs_writer
->writer_component
, fs_writer
);
105 g_hash_table_remove(fs_writer
->writer_component
->trace_map
,
111 struct bt_ctf_stream_class
*insert_new_stream_class(
112 struct writer_component
*writer_component
,
113 struct fs_writer
*fs_writer
,
114 struct bt_ctf_stream_class
*stream_class
)
116 struct bt_ctf_stream_class
*writer_stream_class
= NULL
;
117 struct bt_ctf_trace
*trace
= NULL
, *writer_trace
= NULL
;
118 struct bt_ctf_writer
*ctf_writer
= fs_writer
->writer
;
119 enum bt_component_status ret
;
121 trace
= bt_ctf_stream_class_get_trace(stream_class
);
124 writer_trace
= bt_ctf_writer_get_trace(ctf_writer
);
125 assert(writer_trace
);
127 ret
= ctf_copy_clock_classes(writer_component
->err
, writer_trace
,
128 writer_stream_class
, trace
);
129 if (ret
!= BT_COMPONENT_STATUS_OK
) {
130 BT_LOGE_STR("Failed to copy clock classes.");
134 writer_stream_class
= ctf_copy_stream_class(writer_component
->err
,
135 stream_class
, writer_trace
, true);
136 if (!writer_stream_class
) {
137 BT_LOGE_STR("Failed to copy stream class.");
141 ret
= bt_ctf_trace_add_stream_class(writer_trace
, writer_stream_class
);
143 BT_LOGE_STR("Failed to add stream_class.");
147 g_hash_table_insert(fs_writer
->stream_class_map
,
148 (gpointer
) stream_class
, writer_stream_class
);
153 BT_PUT(writer_stream_class
);
155 bt_put(writer_trace
);
157 return writer_stream_class
;
161 enum fs_writer_stream_state
*insert_new_stream_state(
162 struct writer_component
*writer_component
,
163 struct fs_writer
*fs_writer
, struct bt_ctf_stream
*stream
)
165 enum fs_writer_stream_state
*v
= NULL
;
167 v
= g_new0(enum fs_writer_stream_state
, 1);
169 BT_LOGE_STR("Failed to allocate fs_writer_stream_state.");
172 *v
= FS_WRITER_UNKNOWN_STREAM
;
174 g_hash_table_insert(fs_writer
->stream_states
, stream
, v
);
180 * Make sure the output path is valid for a single trace: either it does
181 * not exists or it is empty.
183 * Return 0 if the path is valid, -1 otherwise.
186 bool valid_single_trace_path(const char *path
)
188 GError
*error
= NULL
;
192 dir
= g_dir_open(path
, 0, &error
);
194 /* Non-existent directory. */
196 /* For any other error, return an error */
197 if (error
->code
!= G_FILE_ERROR_NOENT
) {
203 /* g_dir_read_name skips "." and "..", error out on first result */
204 while (g_dir_read_name(dir
) != NULL
) {
221 int make_trace_path(struct writer_component
*writer_component
,
222 struct bt_ctf_trace
*trace
, char *trace_path
)
225 const char *trace_name
;
227 if (writer_component
->single_trace
) {
230 trace_name
= bt_ctf_trace_get_name(trace
);
232 trace_name
= writer_component
->trace_name_base
->str
;
236 /* Sanitize the trace name. */
237 if (strlen(trace_name
) == 2 && !strcmp(trace_name
, "..")) {
238 BT_LOGE_STR("Trace name cannot be \"..\".");
242 if (strstr(trace_name
, "../")) {
243 BT_LOGE_STR("Trace name cannot contain \"../\".");
248 snprintf(trace_path
, PATH_MAX
, "%s/%s",
249 writer_component
->base_path
->str
,
252 * Append a suffix if the trace_path exists and we are not in
255 if (writer_component
->single_trace
) {
256 if (valid_single_trace_path(trace_path
) != 0) {
257 BT_LOGE_STR("Invalid output directory.");
261 if (g_file_test(trace_path
, G_FILE_TEST_EXISTS
)) {
265 snprintf(trace_path
, PATH_MAX
, "%s/%s-%d",
266 writer_component
->base_path
->str
,
268 } while (g_file_test(trace_path
, G_FILE_TEST_EXISTS
) && i
< INT_MAX
);
270 BT_LOGE_STR("Unable to find a unique trace path.");
286 struct fs_writer
*insert_new_writer(
287 struct writer_component
*writer_component
,
288 struct bt_ctf_trace
*trace
)
290 struct bt_ctf_writer
*ctf_writer
= NULL
;
291 struct bt_ctf_trace
*writer_trace
= NULL
;
292 char trace_path
[PATH_MAX
];
293 enum bt_component_status ret
;
294 struct bt_ctf_stream
*stream
= NULL
;
295 struct fs_writer
*fs_writer
= NULL
;
298 if (writer_component
->single_trace
&& writer_component
->nr_traces
> 0) {
299 BT_LOGE_STR("Trying to process more than one trace but single trace mode enabled.");
303 ret
= make_trace_path(writer_component
, trace
, trace_path
);
305 BT_LOGE_STR("Failed to make trace path.");
309 printf("ctf.fs sink creating trace in %s\n", trace_path
);
311 ctf_writer
= bt_ctf_writer_create(trace_path
);
313 BT_LOGE_STR("Failed to create CTF writer.");
317 writer_trace
= bt_ctf_writer_get_trace(ctf_writer
);
318 assert(writer_trace
);
320 ret
= ctf_copy_trace(writer_component
->err
, trace
, writer_trace
);
321 if (ret
!= BT_COMPONENT_STATUS_OK
) {
322 BT_LOGE_STR("Failed to copy trace.");
327 fs_writer
= g_new0(struct fs_writer
, 1);
329 BT_LOGE_STR("Failed to allocate fs_writer.");
332 fs_writer
->writer
= ctf_writer
;
333 fs_writer
->trace
= trace
;
334 fs_writer
->writer_trace
= writer_trace
;
335 fs_writer
->writer_component
= writer_component
;
336 BT_PUT(writer_trace
);
337 fs_writer
->stream_class_map
= g_hash_table_new_full(g_direct_hash
,
338 g_direct_equal
, NULL
, (GDestroyNotify
) unref_stream_class
);
339 fs_writer
->stream_map
= g_hash_table_new_full(g_direct_hash
,
340 g_direct_equal
, NULL
, (GDestroyNotify
) unref_stream
);
341 fs_writer
->stream_states
= g_hash_table_new_full(g_direct_hash
,
342 g_direct_equal
, NULL
, destroy_stream_state_key
);
344 /* Set all the existing streams in the unknown state. */
345 nr_stream
= bt_ctf_trace_get_stream_count(trace
);
346 for (i
= 0; i
< nr_stream
; i
++) {
347 stream
= bt_ctf_trace_get_stream_by_index(trace
, i
);
350 insert_new_stream_state(writer_component
, fs_writer
, stream
);
354 /* Check if the trace is already static or register a listener. */
355 if (bt_ctf_trace_is_static(trace
)) {
356 fs_writer
->trace_static
= 1;
357 fs_writer
->static_listener_id
= -1;
359 ret
= bt_ctf_trace_add_is_static_listener(trace
,
360 trace_is_static_listener
, fs_writer
);
362 fs_writer
->static_listener_id
= ret
;
365 writer_component
->nr_traces
++;
366 g_hash_table_insert(writer_component
->trace_map
, (gpointer
) trace
,
374 bt_put(writer_trace
);
382 struct fs_writer
*get_fs_writer(struct writer_component
*writer_component
,
383 struct bt_ctf_stream_class
*stream_class
)
385 struct bt_ctf_trace
*trace
= NULL
;
386 struct fs_writer
*fs_writer
;
388 trace
= bt_ctf_stream_class_get_trace(stream_class
);
391 fs_writer
= g_hash_table_lookup(writer_component
->trace_map
,
394 fs_writer
= insert_new_writer(writer_component
, trace
);
402 struct fs_writer
*get_fs_writer_from_stream(
403 struct writer_component
*writer_component
,
404 struct bt_ctf_stream
*stream
)
406 struct bt_ctf_stream_class
*stream_class
= NULL
;
407 struct fs_writer
*fs_writer
;
409 stream_class
= bt_ctf_stream_get_class(stream
);
410 assert(stream_class
);
412 fs_writer
= get_fs_writer(writer_component
, stream_class
);
414 bt_put(stream_class
);
419 struct bt_ctf_stream_class
*lookup_stream_class(
420 struct writer_component
*writer_component
,
421 struct bt_ctf_stream_class
*stream_class
)
423 struct fs_writer
*fs_writer
= get_fs_writer(
424 writer_component
, stream_class
);
426 return (struct bt_ctf_stream_class
*) g_hash_table_lookup(
427 fs_writer
->stream_class_map
, (gpointer
) stream_class
);
431 struct bt_ctf_stream
*lookup_stream(struct writer_component
*writer_component
,
432 struct bt_ctf_stream
*stream
)
434 struct fs_writer
*fs_writer
= get_fs_writer_from_stream(
435 writer_component
, stream
);
437 return (struct bt_ctf_stream
*) g_hash_table_lookup(
438 fs_writer
->stream_map
, (gpointer
) stream
);
442 struct bt_ctf_stream
*insert_new_stream(
443 struct writer_component
*writer_component
,
444 struct fs_writer
*fs_writer
,
445 struct bt_ctf_stream_class
*stream_class
,
446 struct bt_ctf_stream
*stream
)
448 struct bt_ctf_stream
*writer_stream
= NULL
;
449 struct bt_ctf_stream_class
*writer_stream_class
= NULL
;
450 struct bt_ctf_writer
*ctf_writer
= bt_get(fs_writer
->writer
);
452 writer_stream_class
= lookup_stream_class(writer_component
,
454 if (!writer_stream_class
) {
455 writer_stream_class
= insert_new_stream_class(
456 writer_component
, fs_writer
, stream_class
);
457 if (!writer_stream_class
) {
458 BT_LOGE_STR("Failed to insert a new stream_class.");
462 bt_get(writer_stream_class
);
464 writer_stream
= bt_ctf_stream_create(writer_stream_class
,
465 bt_ctf_stream_get_name(stream
));
466 assert(writer_stream
);
468 g_hash_table_insert(fs_writer
->stream_map
, (gpointer
) stream
,
474 BT_PUT(writer_stream
);
477 bt_put(writer_stream_class
);
478 return writer_stream
;
482 struct bt_ctf_event_class
*get_event_class(struct writer_component
*writer_component
,
483 struct bt_ctf_stream_class
*writer_stream_class
,
484 struct bt_ctf_event_class
*event_class
)
486 return bt_ctf_stream_class_get_event_class_by_id(writer_stream_class
,
487 bt_ctf_event_class_get_id(event_class
));
491 struct bt_ctf_stream
*get_writer_stream(
492 struct writer_component
*writer_component
,
493 struct bt_ctf_packet
*packet
, struct bt_ctf_stream
*stream
)
495 struct bt_ctf_stream
*writer_stream
= NULL
;
497 writer_stream
= lookup_stream(writer_component
, stream
);
498 if (!writer_stream
) {
499 BT_LOGE_STR("Failed to find existing stream.");
502 bt_get(writer_stream
);
507 BT_PUT(writer_stream
);
509 return writer_stream
;
513 void writer_close(struct writer_component
*writer_component
,
514 struct fs_writer
*fs_writer
)
516 if (fs_writer
->static_listener_id
>= 0) {
517 bt_ctf_trace_remove_is_static_listener(fs_writer
->trace
,
518 fs_writer
->static_listener_id
);
521 /* Empty the stream class HT. */
522 g_hash_table_foreach_remove(fs_writer
->stream_class_map
,
524 g_hash_table_destroy(fs_writer
->stream_class_map
);
526 /* Empty the stream HT. */
527 g_hash_table_foreach_remove(fs_writer
->stream_map
,
528 empty_streams_ht
, NULL
);
529 g_hash_table_destroy(fs_writer
->stream_map
);
531 /* Empty the stream state HT. */
532 g_hash_table_foreach_remove(fs_writer
->stream_states
,
534 g_hash_table_destroy(fs_writer
->stream_states
);
538 enum bt_component_status
writer_stream_begin(
539 struct writer_component
*writer_component
,
540 struct bt_ctf_stream
*stream
)
542 struct bt_ctf_stream_class
*stream_class
= NULL
;
543 struct fs_writer
*fs_writer
;
544 struct bt_ctf_stream
*writer_stream
= NULL
;
545 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
546 enum fs_writer_stream_state
*state
;
548 stream_class
= bt_ctf_stream_get_class(stream
);
549 assert(stream_class
);
551 fs_writer
= get_fs_writer(writer_component
, stream_class
);
553 BT_LOGE_STR("Failed to get fs_writer.");
557 /* Set the stream as active */
558 state
= g_hash_table_lookup(fs_writer
->stream_states
, stream
);
560 if (fs_writer
->trace_static
) {
561 BT_LOGE_STR("Cannot add new stream on a static trace.");
564 state
= insert_new_stream_state(writer_component
, fs_writer
,
567 if (*state
!= FS_WRITER_UNKNOWN_STREAM
) {
568 BT_LOGE("Unexpected stream state: state=%d", *state
);
571 *state
= FS_WRITER_ACTIVE_STREAM
;
573 writer_stream
= insert_new_stream(writer_component
, fs_writer
,
574 stream_class
, stream
);
575 if (!writer_stream
) {
576 BT_LOGE_STR("Failed to insert new stream.");
583 ret
= BT_COMPONENT_STATUS_ERROR
;
585 bt_put(stream_class
);
590 enum bt_component_status
writer_stream_end(
591 struct writer_component
*writer_component
,
592 struct bt_ctf_stream
*stream
)
594 struct bt_ctf_stream_class
*stream_class
= NULL
;
595 struct fs_writer
*fs_writer
;
596 struct bt_ctf_trace
*trace
= NULL
;
597 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
598 enum fs_writer_stream_state
*state
;
600 stream_class
= bt_ctf_stream_get_class(stream
);
601 assert(stream_class
);
603 fs_writer
= get_fs_writer(writer_component
, stream_class
);
605 BT_LOGE_STR("Failed to get fs_writer.");
609 state
= g_hash_table_lookup(fs_writer
->stream_states
, stream
);
610 if (*state
!= FS_WRITER_ACTIVE_STREAM
) {
611 BT_LOGE("Unexpected stream state: state=%d", *state
);
614 *state
= FS_WRITER_COMPLETED_STREAM
;
616 g_hash_table_remove(fs_writer
->stream_map
, stream
);
618 if (fs_writer
->trace_static
) {
619 int trace_completed
= 1;
621 g_hash_table_foreach(fs_writer
->stream_states
,
622 check_completed_trace
, &trace_completed
);
623 if (trace_completed
) {
624 writer_close(writer_component
, fs_writer
);
625 g_hash_table_remove(writer_component
->trace_map
,
633 ret
= BT_COMPONENT_STATUS_ERROR
;
636 BT_PUT(stream_class
);
641 enum bt_component_status
writer_new_packet(
642 struct writer_component
*writer_component
,
643 struct bt_ctf_packet
*packet
)
645 struct bt_ctf_stream
*stream
= NULL
, *writer_stream
= NULL
;
646 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
649 stream
= bt_ctf_packet_get_stream(packet
);
652 writer_stream
= get_writer_stream(writer_component
, packet
, stream
);
653 if (!writer_stream
) {
654 BT_LOGE_STR("Failed to get writer_stream.");
659 int_ret
= ctf_stream_copy_packet_context(
660 writer_component
->err
, packet
, writer_stream
);
662 BT_LOGE_STR("Failed to copy packet_context.");
666 ret
= ctf_stream_copy_packet_header(writer_component
->err
,
667 packet
, writer_stream
);
669 BT_LOGE_STR("Failed to copy packet_header.");
676 ret
= BT_COMPONENT_STATUS_ERROR
;
678 bt_put(writer_stream
);
684 enum bt_component_status
writer_close_packet(
685 struct writer_component
*writer_component
,
686 struct bt_ctf_packet
*packet
)
688 struct bt_ctf_stream
*stream
= NULL
, *writer_stream
= NULL
;
689 enum bt_component_status ret
;
691 stream
= bt_ctf_packet_get_stream(packet
);
694 writer_stream
= lookup_stream(writer_component
, stream
);
695 if (!writer_stream
) {
696 BT_LOGE_STR("Failed to find existing stream.");
701 bt_get(writer_stream
);
703 ret
= bt_ctf_stream_flush(writer_stream
);
705 BT_LOGE_STR("Failed to flush stream.");
709 BT_PUT(writer_stream
);
711 ret
= BT_COMPONENT_STATUS_OK
;
715 ret
= BT_COMPONENT_STATUS_ERROR
;
717 bt_put(writer_stream
);
723 enum bt_component_status
writer_output_event(
724 struct writer_component
*writer_component
,
725 struct bt_ctf_event
*event
)
727 enum bt_component_status ret
;
728 struct bt_ctf_event_class
*event_class
= NULL
, *writer_event_class
= NULL
;
729 struct bt_ctf_stream
*stream
= NULL
, *writer_stream
= NULL
;
730 struct bt_ctf_stream_class
*stream_class
= NULL
, *writer_stream_class
= NULL
;
731 struct bt_ctf_event
*writer_event
= NULL
;
734 event_class
= bt_ctf_event_get_class(event
);
737 stream
= bt_ctf_event_get_stream(event
);
740 writer_stream
= lookup_stream(writer_component
, stream
);
741 if (!writer_stream
|| !bt_get(writer_stream
)) {
742 BT_LOGE_STR("Failed for find existing stream.");
746 stream_class
= bt_ctf_event_class_get_stream_class(event_class
);
747 assert(stream_class
);
749 writer_stream_class
= lookup_stream_class(writer_component
, stream_class
);
750 if (!writer_stream_class
|| !bt_get(writer_stream_class
)) {
751 BT_LOGE_STR("Failed to find existing stream_class.");
755 writer_event_class
= get_event_class(writer_component
,
756 writer_stream_class
, event_class
);
757 if (!writer_event_class
) {
758 writer_event_class
= ctf_copy_event_class(writer_component
->err
,
760 if (!writer_event_class
) {
761 BT_LOGE_STR("Failed to copy event_class.");
764 int_ret
= bt_ctf_stream_class_add_event_class(
765 writer_stream_class
, writer_event_class
);
767 BT_LOGE("Failed to add event_class: event_name=\"%s\"",
768 bt_ctf_event_class_get_name(event_class
));
773 writer_event
= ctf_copy_event(writer_component
->err
, event
,
774 writer_event_class
, true);
776 BT_LOGE("Failed to copy event: event_class=\"%s\"",
777 bt_ctf_event_class_get_name(writer_event_class
));
781 int_ret
= bt_ctf_stream_append_event(writer_stream
, writer_event
);
783 BT_LOGE("Failed to append event: event_class=\"%s\"",
784 bt_ctf_event_class_get_name(writer_event_class
));
788 ret
= BT_COMPONENT_STATUS_OK
;
792 ret
= BT_COMPONENT_STATUS_ERROR
;
794 bt_put(writer_event
);
795 bt_put(writer_event_class
);
796 bt_put(writer_stream_class
);
797 bt_put(stream_class
);
798 bt_put(writer_stream
);