4 * Babeltrace CTF Writer Output Plugin Event Handling
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29 #define BT_LOG_TAG "PLUGIN-CTF-FS-SINK-WRITE"
32 #include <babeltrace/ctf-ir/event.h>
33 #include <babeltrace/ctf-ir/packet.h>
34 #include <babeltrace/ctf-ir/event-class.h>
35 #include <babeltrace/ctf-ir/stream.h>
36 #include <babeltrace/ctf-ir/stream-class.h>
37 #include <babeltrace/ctf-ir/clock-class.h>
38 #include <babeltrace/ctf-ir/fields.h>
39 #include <babeltrace/ctf-writer/stream-class.h>
40 #include <babeltrace/ctf-writer/stream.h>
44 #include <ctfcopytrace.h>
/*
 * Value-destroy callback of the stream class map created in
 * insert_new_writer(): releases the reference on the stored writer
 * stream class.
 */
void unref_stream_class(struct bt_ctf_stream_class *writer_stream_class)
{
	bt_put(writer_stream_class);
}
/*
 * Value-destroy callback of the stream map created in insert_new_writer():
 * releases the reference on the stored writer stream.
 *
 * NOTE(review): the parameter is declared as a stream class pointer, yet
 * empty_streams_ht() treats the values of that same map as
 * struct bt_ctf_stream pointers. The declared type looks like a copy/paste
 * leftover; confirm before changing it.
 */
void unref_stream(struct bt_ctf_stream_class *writer_stream)
{
	bt_put(writer_stream);
}
/*
 * Hash table callback with the (key, value, user_data) -> gboolean shape.
 * NOTE(review): only the signature is visible here; presumably it returns
 * TRUE so that g_hash_table_foreach_remove() drops every entry -- confirm.
 */
61 gboolean
empty_ht(gpointer key
, gpointer value
, gpointer user_data
)
/*
 * Hash table callback that treats value as a writer stream and flushes it
 * with bt_ctf_stream_flush(). key and user_data are not used in the
 * visible code. writer_close() passes this callback to
 * g_hash_table_foreach_remove() on the stream map.
 * NOTE(review): the flush result is not checked here, whereas
 * writer_close_packet() does check it -- confirm that ignoring it is
 * intended.
 */
67 gboolean
empty_streams_ht(gpointer key
, gpointer value
, gpointer user_data
)
69 struct bt_ctf_stream
*writer_stream
= value
;
71 bt_ctf_stream_flush(writer_stream
);
77 void destroy_stream_state_key(gpointer key
)
79 g_free((enum fs_writer_stream_state
*) key
);
/*
 * g_hash_table_foreach() callback over the stream state table (see
 * trace_is_static_listener() and writer_stream_end()).
 * value is a pointer to an enum fs_writer_stream_state and user_data is a
 * pointer to the caller's int "trace completed" flag. key is unused in
 * the visible code.
 * The visible test singles out any state that differs from
 * FS_WRITER_COMPLETED_STREAM; presumably the flag is cleared in that case
 * (TODO confirm).
 */
83 void check_completed_trace(gpointer key
, gpointer value
, gpointer user_data
)
85 enum fs_writer_stream_state
*state
= value
;
86 int *trace_completed
= user_data
;
88 if (*state
!= FS_WRITER_COMPLETED_STREAM
) {
/*
 * "Trace is static" listener registered by insert_new_writer() through
 * bt_ctf_trace_add_is_static_listener(); data is the struct fs_writer
 * that was passed at registration time.
 *
 * Sets fs_writer->trace_static, then runs check_completed_trace() over
 * every entry of fs_writer->stream_states with a flag that starts at 1.
 * If the flag is still set afterwards, the writer is closed with
 * writer_close() and an entry is removed from the owning component's
 * trace_map.
 */
94 void trace_is_static_listener(struct bt_ctf_trace
*trace
, void *data
)
96 struct fs_writer
*fs_writer
= data
;
97 int trace_completed
= 1;
99 fs_writer
->trace_static
= 1;
101 g_hash_table_foreach(fs_writer
->stream_states
,
102 check_completed_trace
, &trace_completed
);
103 if (trace_completed
) {
104 writer_close(fs_writer
->writer_component
, fs_writer
);
105 g_hash_table_remove(fs_writer
->writer_component
->trace_map
,
/*
 * Creates the writer-side copy of stream_class and registers it.
 *
 * Visible steps: fetch the source trace of stream_class and the trace of
 * fs_writer->writer, copy the clock classes (ctf_copy_clock_classes()),
 * copy the stream class (ctf_copy_stream_class()), add the copy to the
 * writer trace, then store it in fs_writer->stream_class_map keyed by the
 * source stream_class pointer.
 *
 * Returns writer_stream_class. The BT_PUT() near the end presumably runs
 * on the error path only and leaves the variable NULL (TODO confirm
 * against the babeltrace reference counting API). The returned pointer is
 * the one stored in the map; insert_new_stream() takes its own reference
 * with bt_get() after calling this function.
 *
 * NOTE(review): ctf_copy_clock_classes() receives writer_stream_class
 * while it is still NULL (the copy is only created afterwards) -- confirm
 * that the helper tolerates this.
 */
111 struct bt_ctf_stream_class
*insert_new_stream_class(
112 struct writer_component
*writer_component
,
113 struct fs_writer
*fs_writer
,
114 struct bt_ctf_stream_class
*stream_class
)
116 struct bt_ctf_stream_class
*writer_stream_class
= NULL
;
117 struct bt_ctf_trace
*trace
= NULL
, *writer_trace
= NULL
;
118 struct bt_ctf_writer
*ctf_writer
= fs_writer
->writer
;
119 enum bt_component_status ret
;
121 trace
= bt_ctf_stream_class_get_trace(stream_class
);
124 writer_trace
= bt_ctf_writer_get_trace(ctf_writer
);
125 assert(writer_trace
);
127 ret
= ctf_copy_clock_classes(writer_component
->err
, writer_trace
,
128 writer_stream_class
, trace
);
129 if (ret
!= BT_COMPONENT_STATUS_OK
) {
130 BT_LOGE_STR("Failed to copy clock classes.");
134 writer_stream_class
= ctf_copy_stream_class(writer_component
->err
,
135 stream_class
, writer_trace
, true);
136 if (!writer_stream_class
) {
137 BT_LOGE_STR("Failed to copy stream class.");
141 ret
= bt_ctf_trace_add_stream_class(writer_trace
, writer_stream_class
);
143 BT_LOGE_STR("Failed to add stream_class.");
147 g_hash_table_insert(fs_writer
->stream_class_map
,
148 (gpointer
) stream_class
, writer_stream_class
);
153 BT_PUT(writer_stream_class
);
155 bt_put(writer_trace
);
157 return writer_stream_class
;
/*
 * Allocates a stream state with g_new0(), sets it to
 * FS_WRITER_UNKNOWN_STREAM and stores it in fs_writer->stream_states
 * keyed by the stream pointer. writer_component is not used in the
 * visible code.
 *
 * The table frees the value through destroy_stream_state_key() (see
 * insert_new_writer()). writer_stream_begin() uses the return value as a
 * pointer to the new state; presumably that is v (the return statement is
 * not visible -- confirm).
 *
 * NOTE(review): according to the GLib documentation g_new0() aborts the
 * program when the allocation fails, so the "Failed to allocate" message
 * is probably unreachable.
 */
161 enum fs_writer_stream_state
*insert_new_stream_state(
162 struct writer_component
*writer_component
,
163 struct fs_writer
*fs_writer
, struct bt_ctf_stream
*stream
)
165 enum fs_writer_stream_state
*v
= NULL
;
167 v
= g_new0(enum fs_writer_stream_state
, 1);
169 BT_LOGE_STR("Failed to allocate writer_stream_state.");
171 *v
= FS_WRITER_UNKNOWN_STREAM
;
173 g_hash_table_insert(fs_writer
->stream_states
, stream
, v
);
179 * Make sure the output path is valid for a single trace: either it does
180 * not exist or it is empty.
182 * Return 0 (false) if the path is valid, a non-zero value (true) otherwise.
/*
 * Probes path with g_dir_open(). When the open fails, an error code other
 * than G_FILE_ERROR_NOENT is singled out as a real error. When the open
 * succeeds, g_dir_read_name() is used to detect a first directory entry.
 *
 * make_trace_path() treats a non-zero result as "Invalid output
 * directory.".
 * NOTE(review): the return type is bool, yet make_trace_path() compares
 * the result against 0 like an int status -- confirm the intended
 * convention.
 */
185 bool valid_single_trace_path(const char *path
)
187 GError
*error
= NULL
;
191 dir
= g_dir_open(path
, 0, &error
);
193 /* Non-existent directory. */
195 /* For any other error, return an error */
196 if (error
->code
!= G_FILE_ERROR_NOENT
) {
202 /* g_dir_read_name skips "." and "..", error out on first result */
203 while (g_dir_read_name(dir
) != NULL
) {
/*
 * Builds the output directory of a trace into trace_path.
 *
 * trace_path must be able to hold PATH_MAX bytes: every snprintf() below
 * is bounded by PATH_MAX, and insert_new_writer() passes a
 * char[PATH_MAX] buffer.
 *
 * Visible behaviour:
 *  - the trace name comes from bt_ctf_trace_get_name() or from
 *    writer_component->trace_name_base->str (how single_trace affects
 *    that choice is only partly visible -- confirm);
 *  - a name equal to ".." or containing "../" is rejected with an error
 *    log;
 *  - the path is formatted as "<base_path>/<name>";
 *  - with single_trace set, the path must pass
 *    valid_single_trace_path(); otherwise, when the path already exists,
 *    a "-<number>" suffix is tried in a loop until g_file_test() no
 *    longer finds the path or the counter reaches INT_MAX.
 *
 * insert_new_writer() logs "Failed to make trace path." after calling
 * this function, so presumably a non-zero return value means failure
 * (TODO confirm).
 * NOTE(review): the snprintf() results are not checked, so a path longer
 * than PATH_MAX - 1 is silently truncated.
 */
220 int make_trace_path(struct writer_component
*writer_component
,
221 struct bt_ctf_trace
*trace
, char *trace_path
)
224 const char *trace_name
;
226 if (writer_component
->single_trace
) {
229 trace_name
= bt_ctf_trace_get_name(trace
);
231 trace_name
= writer_component
->trace_name_base
->str
;
235 /* Sanitize the trace name. */
236 if (strlen(trace_name
) == 2 && !strcmp(trace_name
, "..")) {
237 BT_LOGE_STR("Trace name cannot be \"..\".");
241 if (strstr(trace_name
, "../")) {
242 BT_LOGE_STR("Trace name cannot contain \"../\".");
247 snprintf(trace_path
, PATH_MAX
, "%s/%s",
248 writer_component
->base_path
->str
,
251 * Append a suffix if the trace_path exists and we are not in
254 if (writer_component
->single_trace
) {
255 if (valid_single_trace_path(trace_path
) != 0) {
256 BT_LOGE_STR("Invalid output directory.");
260 if (g_file_test(trace_path
, G_FILE_TEST_EXISTS
)) {
264 snprintf(trace_path
, PATH_MAX
, "%s/%s-%d",
265 writer_component
->base_path
->str
,
267 } while (g_file_test(trace_path
, G_FILE_TEST_EXISTS
) && i
< INT_MAX
);
269 BT_LOGE_STR("Unable to find a unique trace path.");
/*
 * Creates the fs_writer that handles one input trace and registers it in
 * writer_component->trace_map under the trace pointer.
 *
 * Visible steps:
 *  - log an error when single_trace is set and nr_traces is already > 0;
 *  - compute the output directory with make_trace_path() and announce it
 *    on standard output with printf();
 *  - create the CTF writer (bt_ctf_writer_create()) and copy the input
 *    trace into the writer's trace (ctf_copy_trace());
 *  - allocate the fs_writer and create its three hash tables, all keyed
 *    by pointer (g_direct_hash/g_direct_equal, no key destructor):
 *    stream_class_map and stream_map release their values with
 *    unref_stream_class() and unref_stream(), stream_states frees its
 *    values with destroy_stream_state_key();
 *  - give every stream already present in the trace a state entry
 *    through insert_new_stream_state();
 *  - when the trace is already static, set trace_static and use -1 as
 *    static_listener_id; presumably otherwise, register
 *    trace_is_static_listener() and keep the returned value as
 *    static_listener_id;
 *  - increment writer_component->nr_traces.
 *
 * NOTE(review): fs_writer->writer_trace is assigned right before
 * BT_PUT(writer_trace), so the stored pointer is not backed by a
 * reference taken here -- presumably the CTF writer keeps its trace
 * alive; confirm.
 * NOTE(review): the value returned by
 * bt_ctf_trace_add_is_static_listener() goes through ret, an
 * enum bt_component_status, before being stored as the listener ID.
 * NOTE(review): according to the GLib documentation g_new0() aborts on
 * allocation failure, so the "Failed to allocate fs_writer." branch is
 * probably unreachable.
 */
285 struct fs_writer
*insert_new_writer(
286 struct writer_component
*writer_component
,
287 struct bt_ctf_trace
*trace
)
289 struct bt_ctf_writer
*ctf_writer
= NULL
;
290 struct bt_ctf_trace
*writer_trace
= NULL
;
291 char trace_path
[PATH_MAX
];
292 enum bt_component_status ret
;
293 struct bt_ctf_stream
*stream
= NULL
;
294 struct fs_writer
*fs_writer
= NULL
;
297 if (writer_component
->single_trace
&& writer_component
->nr_traces
> 0) {
298 BT_LOGE_STR("Trying to process more than one trace but single trace mode enabled.");
302 ret
= make_trace_path(writer_component
, trace
, trace_path
);
304 BT_LOGE_STR("Failed to make trace path.");
308 printf("ctf.fs sink creating trace in %s\n", trace_path
);
310 ctf_writer
= bt_ctf_writer_create(trace_path
);
312 BT_LOGE_STR("Failed to create CTF writer.");
316 writer_trace
= bt_ctf_writer_get_trace(ctf_writer
);
317 assert(writer_trace
);
319 ret
= ctf_copy_trace(writer_component
->err
, trace
, writer_trace
);
320 if (ret
!= BT_COMPONENT_STATUS_OK
) {
321 BT_LOGE_STR("Failed to copy trace.");
326 fs_writer
= g_new0(struct fs_writer
, 1);
328 BT_LOGE_STR("Failed to allocate fs_writer.");
331 fs_writer
->writer
= ctf_writer
;
332 fs_writer
->trace
= trace
;
333 fs_writer
->writer_trace
= writer_trace
;
334 fs_writer
->writer_component
= writer_component
;
335 BT_PUT(writer_trace
);
336 fs_writer
->stream_class_map
= g_hash_table_new_full(g_direct_hash
,
337 g_direct_equal
, NULL
, (GDestroyNotify
) unref_stream_class
);
338 fs_writer
->stream_map
= g_hash_table_new_full(g_direct_hash
,
339 g_direct_equal
, NULL
, (GDestroyNotify
) unref_stream
);
340 fs_writer
->stream_states
= g_hash_table_new_full(g_direct_hash
,
341 g_direct_equal
, NULL
, destroy_stream_state_key
);
343 /* Set all the existing streams in the unknown state. */
344 nr_stream
= bt_ctf_trace_get_stream_count(trace
);
345 for (i
= 0; i
< nr_stream
; i
++) {
346 stream
= bt_ctf_trace_get_stream_by_index(trace
, i
);
349 insert_new_stream_state(writer_component
, fs_writer
, stream
);
353 /* Check if the trace is already static or register a listener. */
354 if (bt_ctf_trace_is_static(trace
)) {
355 fs_writer
->trace_static
= 1;
356 fs_writer
->static_listener_id
= -1;
358 ret
= bt_ctf_trace_add_is_static_listener(trace
,
359 trace_is_static_listener
, fs_writer
);
361 fs_writer
->static_listener_id
= ret
;
364 writer_component
->nr_traces
++;
365 g_hash_table_insert(writer_component
->trace_map
, (gpointer
) trace
,
373 bt_put(writer_trace
);
/*
 * Finds the fs_writer that handles the trace owning stream_class.
 *
 * The trace is obtained with bt_ctf_stream_class_get_trace() and looked
 * up in writer_component->trace_map. insert_new_writer() is also called
 * here, presumably only when the lookup finds nothing (the condition is
 * not visible -- confirm).
 *
 * writer_stream_begin() and writer_stream_end() log "Failed to get
 * fs_writer." after calling this function, so a NULL result presumably
 * signals failure.
 */
381 struct fs_writer
*get_fs_writer(struct writer_component
*writer_component
,
382 struct bt_ctf_stream_class
*stream_class
)
384 struct bt_ctf_trace
*trace
= NULL
;
385 struct fs_writer
*fs_writer
;
387 trace
= bt_ctf_stream_class_get_trace(stream_class
);
390 fs_writer
= g_hash_table_lookup(writer_component
->trace_map
,
393 fs_writer
= insert_new_writer(writer_component
, trace
);
/*
 * Convenience wrapper around get_fs_writer() for callers that hold a
 * stream instead of a stream class.
 *
 * The stream's class is fetched with bt_ctf_stream_get_class() (asserted
 * non-NULL), passed to get_fs_writer(), and the reference on the class is
 * released with bt_put() before the function ends. lookup_stream() uses
 * the return value as the fs_writer.
 */
401 struct fs_writer
*get_fs_writer_from_stream(
402 struct writer_component
*writer_component
,
403 struct bt_ctf_stream
*stream
)
405 struct bt_ctf_stream_class
*stream_class
= NULL
;
406 struct fs_writer
*fs_writer
;
408 stream_class
= bt_ctf_stream_get_class(stream
);
409 assert(stream_class
);
411 fs_writer
= get_fs_writer(writer_component
, stream_class
);
413 bt_put(stream_class
);
/*
 * Returns the writer stream class mapped to stream_class in the
 * stream_class_map of the fs_writer found by get_fs_writer(), i.e. the
 * result of g_hash_table_lookup() for that key.
 *
 * No reference is taken on the result here: insert_new_stream() and
 * writer_output_event() call bt_get() themselves on the returned
 * pointer.
 *
 * NOTE(review): no NULL check on fs_writer is visible before it is
 * dereferenced, although other callers of get_fs_writer() log "Failed to
 * get fs_writer." -- confirm that a failure cannot reach this point.
 */
418 struct bt_ctf_stream_class
*lookup_stream_class(
419 struct writer_component
*writer_component
,
420 struct bt_ctf_stream_class
*stream_class
)
422 struct fs_writer
*fs_writer
= get_fs_writer(
423 writer_component
, stream_class
);
425 return (struct bt_ctf_stream_class
*) g_hash_table_lookup(
426 fs_writer
->stream_class_map
, (gpointer
) stream_class
);
/*
 * Returns the writer stream mapped to stream in the stream_map of the
 * fs_writer found by get_fs_writer_from_stream(), i.e. the result of
 * g_hash_table_lookup() for that key.
 *
 * No reference is taken on the result here: get_writer_stream() and
 * writer_close_packet() call bt_get() themselves on the returned
 * pointer.
 *
 * NOTE(review): no NULL check on fs_writer is visible before it is
 * dereferenced -- confirm that get_fs_writer_from_stream() cannot fail
 * at this point.
 */
430 struct bt_ctf_stream
*lookup_stream(struct writer_component
*writer_component
,
431 struct bt_ctf_stream
*stream
)
433 struct fs_writer
*fs_writer
= get_fs_writer_from_stream(
434 writer_component
, stream
);
436 return (struct bt_ctf_stream
*) g_hash_table_lookup(
437 fs_writer
->stream_map
, (gpointer
) stream
);
/*
 * Creates the writer stream that mirrors stream and stores it in
 * fs_writer->stream_map keyed by the source stream pointer.
 *
 * Visible steps: take a reference on fs_writer->writer, find the writer
 * stream class with lookup_stream_class() and create it with
 * insert_new_stream_class() when the lookup returns NULL, take a
 * reference on that class, create the stream with
 * bt_ctf_stream_create() under the source stream's name (asserted
 * non-NULL), insert it in the map, and release the class reference
 * before returning writer_stream.
 *
 * The BT_PUT(writer_stream) presumably belongs to the error path and
 * leaves the return value NULL (TODO confirm); writer_stream_begin()
 * treats a NULL return as a failure.
 * NOTE(review): the release of the ctf_writer reference taken at the top
 * is not visible here -- confirm that it is balanced.
 */
441 struct bt_ctf_stream
*insert_new_stream(
442 struct writer_component
*writer_component
,
443 struct fs_writer
*fs_writer
,
444 struct bt_ctf_stream_class
*stream_class
,
445 struct bt_ctf_stream
*stream
)
447 struct bt_ctf_stream
*writer_stream
= NULL
;
448 struct bt_ctf_stream_class
*writer_stream_class
= NULL
;
449 struct bt_ctf_writer
*ctf_writer
= bt_get(fs_writer
->writer
);
451 writer_stream_class
= lookup_stream_class(writer_component
,
453 if (!writer_stream_class
) {
454 writer_stream_class
= insert_new_stream_class(
455 writer_component
, fs_writer
, stream_class
);
456 if (!writer_stream_class
) {
457 BT_LOGE_STR("Failed to insert a new stream_class.");
461 bt_get(writer_stream_class
);
463 writer_stream
= bt_ctf_stream_create(writer_stream_class
,
464 bt_ctf_stream_get_name(stream
));
465 assert(writer_stream
);
467 g_hash_table_insert(fs_writer
->stream_map
, (gpointer
) stream
,
473 BT_PUT(writer_stream
);
476 bt_put(writer_stream_class
);
477 return writer_stream
;
/*
 * Looks up, in writer_stream_class, the event class that carries the same
 * ID as event_class. writer_component is not used.
 *
 * Returns whatever bt_ctf_stream_class_get_event_class_by_id() returns;
 * writer_output_event() copies the event class when the result is NULL.
 */
struct bt_ctf_event_class *get_event_class(struct writer_component *writer_component,
		struct bt_ctf_stream_class *writer_stream_class,
		struct bt_ctf_event_class *event_class)
{
	return bt_ctf_stream_class_get_event_class_by_id(
			writer_stream_class,
			bt_ctf_event_class_get_id(event_class));
}
/*
 * Returns the writer stream that mirrors stream, with a reference taken
 * through bt_get() for the caller (writer_new_packet() releases it with
 * bt_put()).
 *
 * The stream is found with lookup_stream(); a missing entry is logged as
 * "Failed to find existing stream.". The BT_PUT(writer_stream)
 * presumably belongs to the error path and leaves the return value NULL
 * (TODO confirm). packet is not used in the visible code.
 */
490 struct bt_ctf_stream
*get_writer_stream(
491 struct writer_component
*writer_component
,
492 struct bt_ctf_packet
*packet
, struct bt_ctf_stream
*stream
)
494 struct bt_ctf_stream
*writer_stream
= NULL
;
496 writer_stream
= lookup_stream(writer_component
, stream
);
497 if (!writer_stream
) {
498 BT_LOGE_STR("Failed to find existing stream.");
501 bt_get(writer_stream
);
506 BT_PUT(writer_stream
);
508 return writer_stream
;
/*
 * Tears down the per-trace state held by fs_writer.
 *
 * When static_listener_id is >= 0, the "is static" listener is removed
 * from fs_writer->trace. Then each of the three hash tables
 * (stream_class_map, stream_map, stream_states) is emptied with
 * g_hash_table_foreach_remove() and destroyed; for stream_map the
 * callback is empty_streams_ht(), which flushes every writer stream.
 *
 * After this call the three table pointers in fs_writer refer to
 * destroyed tables. writer_component is not used in the visible code.
 */
512 void writer_close(struct writer_component
*writer_component
,
513 struct fs_writer
*fs_writer
)
515 if (fs_writer
->static_listener_id
>= 0) {
516 bt_ctf_trace_remove_is_static_listener(fs_writer
->trace
,
517 fs_writer
->static_listener_id
);
520 /* Empty the stream class HT. */
521 g_hash_table_foreach_remove(fs_writer
->stream_class_map
,
523 g_hash_table_destroy(fs_writer
->stream_class_map
);
525 /* Empty the stream HT. */
526 g_hash_table_foreach_remove(fs_writer
->stream_map
,
527 empty_streams_ht
, NULL
);
528 g_hash_table_destroy(fs_writer
->stream_map
);
530 /* Empty the stream state HT. */
531 g_hash_table_foreach_remove(fs_writer
->stream_states
,
533 g_hash_table_destroy(fs_writer
->stream_states
);
/*
 * Handles the beginning of an input stream.
 *
 * Visible steps: get the stream's class (asserted non-NULL) and the
 * matching fs_writer, then look the stream up in
 * fs_writer->stream_states. A new state entry is created with
 * insert_new_stream_state(), presumably when the lookup finds nothing;
 * that path logs "Cannot add new stream on a static trace." when
 * trace_static is set. The state must be FS_WRITER_UNKNOWN_STREAM,
 * otherwise "Unexpected stream state" is logged. The state then becomes
 * FS_WRITER_ACTIVE_STREAM and the writer stream is created with
 * insert_new_stream().
 *
 * ret starts as BT_COMPONENT_STATUS_OK and is set to
 * BT_COMPONENT_STATUS_ERROR near the end, presumably on the error path
 * only (TODO confirm). The stream class reference is released with
 * bt_put().
 */
537 enum bt_component_status
writer_stream_begin(
538 struct writer_component
*writer_component
,
539 struct bt_ctf_stream
*stream
)
541 struct bt_ctf_stream_class
*stream_class
= NULL
;
542 struct fs_writer
*fs_writer
;
543 struct bt_ctf_stream
*writer_stream
= NULL
;
544 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
545 enum fs_writer_stream_state
*state
;
547 stream_class
= bt_ctf_stream_get_class(stream
);
548 assert(stream_class
);
550 fs_writer
= get_fs_writer(writer_component
, stream_class
);
552 BT_LOGE_STR("Failed to get fs_writer.");
556 /* Set the stream as active */
557 state
= g_hash_table_lookup(fs_writer
->stream_states
, stream
);
559 if (fs_writer
->trace_static
) {
560 BT_LOGE_STR("Cannot add new stream on a static trace.");
563 state
= insert_new_stream_state(writer_component
, fs_writer
,
566 if (*state
!= FS_WRITER_UNKNOWN_STREAM
) {
567 BT_LOGE("Unexpected stream state: state=%d", *state
);
570 *state
= FS_WRITER_ACTIVE_STREAM
;
572 writer_stream
= insert_new_stream(writer_component
, fs_writer
,
573 stream_class
, stream
);
574 if (!writer_stream
) {
575 BT_LOGE_STR("Failed to insert new stream.");
582 ret
= BT_COMPONENT_STATUS_ERROR
;
584 bt_put(stream_class
);
/*
 * Handles the end of an input stream.
 *
 * Visible steps: get the stream's class (asserted non-NULL) and the
 * matching fs_writer, look up the stream's state, require it to be
 * FS_WRITER_ACTIVE_STREAM (otherwise "Unexpected stream state" is
 * logged), mark it FS_WRITER_COMPLETED_STREAM and remove the stream from
 * fs_writer->stream_map. When the trace is static and
 * check_completed_trace() leaves the completion flag set for every
 * state, the writer is closed with writer_close() and an entry is
 * removed from writer_component->trace_map.
 *
 * ret starts as BT_COMPONENT_STATUS_OK and is set to
 * BT_COMPONENT_STATUS_ERROR near the end, presumably on the error path
 * only (TODO confirm).
 *
 * NOTE(review): state is dereferenced immediately after
 * g_hash_table_lookup(); a stream with no entry in stream_states would
 * yield NULL there -- confirm callers guarantee the entry exists.
 */
589 enum bt_component_status
writer_stream_end(
590 struct writer_component
*writer_component
,
591 struct bt_ctf_stream
*stream
)
593 struct bt_ctf_stream_class
*stream_class
= NULL
;
594 struct fs_writer
*fs_writer
;
595 struct bt_ctf_trace
*trace
= NULL
;
596 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
597 enum fs_writer_stream_state
*state
;
599 stream_class
= bt_ctf_stream_get_class(stream
);
600 assert(stream_class
);
602 fs_writer
= get_fs_writer(writer_component
, stream_class
);
604 BT_LOGE_STR("Failed to get fs_writer.");
608 state
= g_hash_table_lookup(fs_writer
->stream_states
, stream
);
609 if (*state
!= FS_WRITER_ACTIVE_STREAM
) {
610 BT_LOGE("Unexpected stream state: state=%d", *state
);
613 *state
= FS_WRITER_COMPLETED_STREAM
;
615 g_hash_table_remove(fs_writer
->stream_map
, stream
);
617 if (fs_writer
->trace_static
) {
618 int trace_completed
= 1;
620 g_hash_table_foreach(fs_writer
->stream_states
,
621 check_completed_trace
, &trace_completed
);
622 if (trace_completed
) {
623 writer_close(writer_component
, fs_writer
);
624 g_hash_table_remove(writer_component
->trace_map
,
632 ret
= BT_COMPONENT_STATUS_ERROR
;
635 BT_PUT(stream_class
);
/*
 * Handles the beginning of an input packet.
 *
 * Visible steps: get the packet's stream, obtain the matching writer
 * stream with get_writer_stream() (which takes a reference for this
 * function), then copy the packet context
 * (ctf_stream_copy_packet_context()) and the packet header
 * (ctf_stream_copy_packet_header()) to the writer stream. Each failure
 * is logged.
 *
 * ret starts as BT_COMPONENT_STATUS_OK and is set to
 * BT_COMPONENT_STATUS_ERROR near the end, presumably on the error path
 * only (TODO confirm). The writer stream reference is released with
 * bt_put().
 */
640 enum bt_component_status
writer_new_packet(
641 struct writer_component
*writer_component
,
642 struct bt_ctf_packet
*packet
)
644 struct bt_ctf_stream
*stream
= NULL
, *writer_stream
= NULL
;
645 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
648 stream
= bt_ctf_packet_get_stream(packet
);
651 writer_stream
= get_writer_stream(writer_component
, packet
, stream
);
652 if (!writer_stream
) {
653 BT_LOGE_STR("Failed to get writer_stream.");
658 int_ret
= ctf_stream_copy_packet_context(
659 writer_component
->err
, packet
, writer_stream
);
661 BT_LOGE_STR("Failed to copy packet_context.");
665 ret
= ctf_stream_copy_packet_header(writer_component
->err
,
666 packet
, writer_stream
);
668 BT_LOGE_STR("Failed to copy packet_header.");
675 ret
= BT_COMPONENT_STATUS_ERROR
;
677 bt_put(writer_stream
);
/*
 * Handles the end of an input packet by flushing the matching writer
 * stream.
 *
 * Visible steps: get the packet's stream, find the writer stream with
 * lookup_stream() (a missing entry is logged as "Failed to find existing
 * stream."), take a reference on it, flush it with
 * bt_ctf_stream_flush() and log "Failed to flush stream." on failure.
 *
 * ret is set to BT_COMPONENT_STATUS_OK after the flush and to
 * BT_COMPONENT_STATUS_ERROR further down, presumably on the error path
 * only (TODO confirm).
 * NOTE(review): BT_PUT(writer_stream) is followed later by
 * bt_put(writer_stream); this is balanced only if BT_PUT() resets the
 * variable to NULL and bt_put() accepts NULL -- confirm against the
 * babeltrace reference counting API.
 */
683 enum bt_component_status
writer_close_packet(
684 struct writer_component
*writer_component
,
685 struct bt_ctf_packet
*packet
)
687 struct bt_ctf_stream
*stream
= NULL
, *writer_stream
= NULL
;
688 enum bt_component_status ret
;
690 stream
= bt_ctf_packet_get_stream(packet
);
693 writer_stream
= lookup_stream(writer_component
, stream
);
694 if (!writer_stream
) {
695 BT_LOGE_STR("Failed to find existing stream.");
700 bt_get(writer_stream
);
702 ret
= bt_ctf_stream_flush(writer_stream
);
704 BT_LOGE_STR("Failed to flush stream.");
708 BT_PUT(writer_stream
);
710 ret
= BT_COMPONENT_STATUS_OK
;
714 ret
= BT_COMPONENT_STATUS_ERROR
;
716 bt_put(writer_stream
);
722 enum bt_component_status
writer_output_event(
723 struct writer_component
*writer_component
,
724 struct bt_ctf_event
*event
)
726 enum bt_component_status ret
;
727 struct bt_ctf_event_class
*event_class
= NULL
, *writer_event_class
= NULL
;
728 struct bt_ctf_stream
*stream
= NULL
, *writer_stream
= NULL
;
729 struct bt_ctf_stream_class
*stream_class
= NULL
, *writer_stream_class
= NULL
;
730 struct bt_ctf_event
*writer_event
= NULL
;
733 event_class
= bt_ctf_event_get_class(event
);
736 stream
= bt_ctf_event_get_stream(event
);
739 writer_stream
= lookup_stream(writer_component
, stream
);
740 if (!writer_stream
|| !bt_get(writer_stream
)) {
741 BT_LOGE_STR("Failed for find existing stream.");
745 stream_class
= bt_ctf_event_class_get_stream_class(event_class
);
746 assert(stream_class
);
748 writer_stream_class
= lookup_stream_class(writer_component
, stream_class
);
749 if (!writer_stream_class
|| !bt_get(writer_stream_class
)) {
750 BT_LOGE_STR("Failed to find existing stream_class.");
754 writer_event_class
= get_event_class(writer_component
,
755 writer_stream_class
, event_class
);
756 if (!writer_event_class
) {
757 writer_event_class
= ctf_copy_event_class(writer_component
->err
,
759 if (!writer_event_class
) {
760 BT_LOGE_STR("Failed to copy event_class.");
763 int_ret
= bt_ctf_stream_class_add_event_class(
764 writer_stream_class
, writer_event_class
);
766 BT_LOGE("Failed to add event_class: event_name=\"%s\"",
767 bt_ctf_event_class_get_name(event_class
));
772 writer_event
= ctf_copy_event(writer_component
->err
, event
,
773 writer_event_class
, true);
775 BT_LOGE("Failed to copy event: event_class=\"%s\"",
776 bt_ctf_event_class_get_name(writer_event_class
));
780 int_ret
= bt_ctf_stream_append_event(writer_stream
, writer_event
);
782 BT_LOGE("Failed to append event: event_class=\"%s\"",
783 bt_ctf_event_class_get_name(writer_event_class
));
787 ret
= BT_COMPONENT_STATUS_OK
;
791 ret
= BT_COMPONENT_STATUS_ERROR
;
793 bt_put(writer_event
);
794 bt_put(writer_event_class
);
795 bt_put(writer_stream_class
);
796 bt_put(stream_class
);
797 bt_put(writer_stream
);