4 * Babeltrace CTF Writer Output Plugin Event Handling
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29 #define BT_LOG_TAG "PLUGIN-CTF-FS-SINK-WRITE"
32 #include <babeltrace/ctf-ir/event.h>
33 #include <babeltrace/ctf-ir/packet.h>
34 #include <babeltrace/ctf-ir/event-class.h>
35 #include <babeltrace/ctf-ir/stream.h>
36 #include <babeltrace/ctf-ir/stream-class.h>
37 #include <babeltrace/ctf-ir/clock-class.h>
38 #include <babeltrace/ctf-ir/fields.h>
39 #include <babeltrace/ctf-writer/stream-class.h>
40 #include <babeltrace/ctf-writer/stream.h>
44 #include <ctfcopytrace.h>
/*
 * Release one reference on a writer stream class.
 *
 * This file registers the function, cast to GDestroyNotify, as the
 * value destructor of the stream class map, so it runs once for every
 * entry removed from that table.
 */
void unref_stream_class(struct bt_ctf_stream_class *writer_stream_class)
{
	bt_put(writer_stream_class);
}
/*
 * Release one reference on a writer stream.
 *
 * This file registers the function, cast to GDestroyNotify, as the
 * value destructor of the stream map, whose values are looked up as
 * struct bt_ctf_stream pointers. The parameter is therefore typed as a
 * stream rather than as a stream class.
 */
void unref_stream(struct bt_ctf_stream *writer_stream)
{
	bt_put(writer_stream);
}
/*
 * empty_ht: callback with the (key, value, user_data) -> gboolean
 * signature that g_hash_table_foreach_remove() takes.
 *
 * NOTE(review): the body is not visible in this excerpt. Presumably it
 * unconditionally asks for the entry to be removed -- confirm against
 * the full source.
 */
61 gboolean
empty_ht(gpointer key
, gpointer value
, gpointer user_data
)
/*
 * empty_streams_ht: per-entry callback that writer_close() hands to
 * g_hash_table_foreach_remove() for the stream map.
 *
 * The entry value is treated as a struct bt_ctf_stream pointer and
 * flushed with bt_ctf_stream_flush(). A debug-level message exists for
 * the flush-failure case.
 *
 * NOTE(review): the declaration of ret, the condition guarding the log
 * call and the return statement are not visible in this excerpt.
 * Presumably the callback always asks for removal -- confirm.
 */
67 gboolean
empty_streams_ht(gpointer key
, gpointer value
, gpointer user_data
)
70 struct bt_ctf_stream
*writer_stream
= value
;
72 ret
= bt_ctf_stream_flush(writer_stream
);
74 BT_LOGD_STR("Failed to flush stream while emptying hash table.");
80 void destroy_stream_state_key(gpointer key
)
82 g_free((enum fs_writer_stream_state
*) key
);
/*
 * check_completed_trace: g_hash_table_foreach() callback run over the
 * stream state table.
 *
 * value is the entry state (pointer to enum fs_writer_stream_state).
 * user_data points at the int flag trace_completed owned by the caller,
 * which the callers in this file set to 1 before iterating. The visible
 * test fires for any state other than FS_WRITER_COMPLETED_STREAM.
 *
 * NOTE(review): the body of the if is not visible here. Presumably it
 * clears the flag -- confirm.
 */
86 void check_completed_trace(gpointer key
, gpointer value
, gpointer user_data
)
88 enum fs_writer_stream_state
*state
= value
;
89 int *trace_completed
= user_data
;
91 if (*state
!= FS_WRITER_COMPLETED_STREAM
) {
/*
 * trace_is_static_listener: listener that insert_new_writer() registers
 * through bt_ctf_trace_add_is_static_listener(), with the fs_writer as
 * its data pointer.
 *
 * Marks the fs_writer as belonging to a static trace, then walks the
 * stream state table with check_completed_trace(). When the flag is
 * still set afterwards, the writer is closed with writer_close() and an
 * entry is removed from the trace map of the component.
 *
 * NOTE(review): the key given to g_hash_table_remove() is cut off in
 * this excerpt.
 */
97 void trace_is_static_listener(struct bt_ctf_trace
*trace
, void *data
)
99 struct fs_writer
*fs_writer
= data
;
100 int trace_completed
= 1;
102 fs_writer
->trace_static
= 1;
104 g_hash_table_foreach(fs_writer
->stream_states
,
105 check_completed_trace
, &trace_completed
);
106 if (trace_completed
) {
107 writer_close(fs_writer
->writer_component
, fs_writer
);
108 g_hash_table_remove(fs_writer
->writer_component
->trace_map
,
/*
 * insert_new_stream_class: create the writer-side counterpart of
 * stream_class and register it in fs_writer->stream_class_map.
 *
 * Visible steps, in order:
 *   1. fetch the trace of stream_class and the trace of the CTF writer;
 *   2. copy the clock classes with ctf_copy_clock_classes();
 *   3. copy the stream class with ctf_copy_stream_class();
 *   4. add the copy to the writer trace;
 *   5. insert the copy in stream_class_map, keyed by the source stream
 *      class.
 *
 * Each step has its own error log and the function returns
 * writer_stream_class.
 *
 * NOTE(review): labels, gotos and part of the reference handling are
 * not visible in this excerpt. BT_PUT(writer_stream_class) presumably
 * sits on the error path, which would make the function return NULL on
 * failure (the caller does test for NULL) -- confirm.
 */
114 struct bt_ctf_stream_class
*insert_new_stream_class(
115 struct writer_component
*writer_component
,
116 struct fs_writer
*fs_writer
,
117 struct bt_ctf_stream_class
*stream_class
)
119 struct bt_ctf_stream_class
*writer_stream_class
= NULL
;
120 struct bt_ctf_trace
*trace
= NULL
, *writer_trace
= NULL
;
121 struct bt_ctf_writer
*ctf_writer
= fs_writer
->writer
;
122 enum bt_component_status ret
;
124 trace
= bt_ctf_stream_class_get_trace(stream_class
);
127 writer_trace
= bt_ctf_writer_get_trace(ctf_writer
);
128 assert(writer_trace
);
/*
 * NOTE(review): on the visible lines writer_stream_class has only been
 * initialised to NULL at this point; it is first assigned by
 * ctf_copy_stream_class() further down.
 */
130 ret
= ctf_copy_clock_classes(writer_component
->err
, writer_trace
,
131 writer_stream_class
, trace
);
132 if (ret
!= BT_COMPONENT_STATUS_OK
) {
133 BT_LOGE_STR("Failed to copy clock classes.");
137 writer_stream_class
= ctf_copy_stream_class(writer_component
->err
,
138 stream_class
, writer_trace
, true);
139 if (!writer_stream_class
) {
140 BT_LOGE_STR("Failed to copy stream class.");
144 ret
= bt_ctf_trace_add_stream_class(writer_trace
, writer_stream_class
);
146 BT_LOGE_STR("Failed to add stream_class.");
150 g_hash_table_insert(fs_writer
->stream_class_map
,
151 (gpointer
) stream_class
, writer_stream_class
);
156 BT_PUT(writer_stream_class
);
158 bt_put(writer_trace
);
160 return writer_stream_class
;
/*
 * insert_new_stream_state: allocate a zeroed enum
 * fs_writer_stream_state with g_new0(), set it to
 * FS_WRITER_UNKNOWN_STREAM and store it in fs_writer->stream_states
 * with the stream pointer as key.
 *
 * writer_component is not used by any visible line.
 *
 * NOTE(review): the allocation check around the error log and the
 * return statement are not visible. writer_stream_begin() uses the
 * result as the state slot, so the function presumably returns v --
 * confirm.
 */
164 enum fs_writer_stream_state
*insert_new_stream_state(
165 struct writer_component
*writer_component
,
166 struct fs_writer
*fs_writer
, struct bt_ctf_stream
*stream
)
168 enum fs_writer_stream_state
*v
= NULL
;
170 v
= g_new0(enum fs_writer_stream_state
, 1);
172 BT_LOGE_STR("Failed to allocate fs_writer_stream_state.");
175 *v
= FS_WRITER_UNKNOWN_STREAM
;
177 g_hash_table_insert(fs_writer
->stream_states
, stream
, v
);
183 * Make sure the output path is valid for a single trace: either it does
184 * not exist or it is empty.
186 * Return 0 (false) if the path is valid, non-zero (true) otherwise.
/*
 * valid_single_trace_path: probe path with g_dir_open().
 *
 * Visible logic: when opening fails, the GError code is compared with
 * G_FILE_ERROR_NOENT, so a missing directory is the one tolerated
 * failure. Otherwise g_dir_read_name() is called to detect whether the
 * directory holds any entry.
 *
 * The return type is bool, and make_trace_path() treats a non-zero
 * result as an invalid output directory.
 *
 * NOTE(review): the declarations of dir and of the result variable,
 * the cleanup of dir and error, and the return statement are not
 * visible in this excerpt.
 */
189 bool valid_single_trace_path(const char *path
)
191 GError
*error
= NULL
;
195 dir
= g_dir_open(path
, 0, &error
);
197 /* Non-existent directory. */
199 /* For any other error, return an error */
200 if (error
->code
!= G_FILE_ERROR_NOENT
) {
206 /* g_dir_read_name skips "." and "..", error out on first result */
207 while (g_dir_read_name(dir
) != NULL
) {
/*
 * make_trace_path: build the output directory path for trace into
 * trace_path, formatting with a PATH_MAX bound.
 *
 * Visible logic:
 *   - the trace name comes from bt_ctf_trace_get_name() or from
 *     writer_component->trace_name_base; what single-trace mode uses
 *     as a name is not visible here;
 *   - a name equal to two dots, or containing two dots followed by a
 *     slash, is rejected with an error log;
 *   - the path is formatted as base_path, a slash, then the name;
 *   - in single-trace mode the path must pass
 *     valid_single_trace_path();
 *   - otherwise, when the path already exists, a numeric suffix is
 *     tried in a do/while loop until the path is free or the counter
 *     reaches INT_MAX.
 *
 * NOTE(review): neither snprintf() result is checked on the visible
 * lines, so an over-long path would be truncated silently. The return
 * statements are not visible; insert_new_writer() has a failure log
 * right after its call to this function.
 */
224 int make_trace_path(struct writer_component
*writer_component
,
225 struct bt_ctf_trace
*trace
, char *trace_path
)
228 const char *trace_name
;
230 if (writer_component
->single_trace
) {
233 trace_name
= bt_ctf_trace_get_name(trace
);
235 trace_name
= writer_component
->trace_name_base
->str
;
239 /* Sanitize the trace name. */
240 if (strlen(trace_name
) == 2 && !strcmp(trace_name
, "..")) {
241 BT_LOGE_STR("Trace name cannot be \"..\".");
245 if (strstr(trace_name
, "../")) {
246 BT_LOGE_STR("Trace name cannot contain \"../\".");
251 snprintf(trace_path
, PATH_MAX
, "%s/%s",
252 writer_component
->base_path
->str
,
255 * Append a suffix if the trace_path exists and we are not in
258 if (writer_component
->single_trace
) {
259 if (valid_single_trace_path(trace_path
) != 0) {
260 BT_LOGE_STR("Invalid output directory.");
264 if (g_file_test(trace_path
, G_FILE_TEST_EXISTS
)) {
268 snprintf(trace_path
, PATH_MAX
, "%s/%s-%d",
269 writer_component
->base_path
->str
,
271 } while (g_file_test(trace_path
, G_FILE_TEST_EXISTS
) && i
< INT_MAX
);
273 BT_LOGE_STR("Unable to find a unique trace path.");
/*
 * insert_new_writer: create the fs_writer that handles trace and
 * register it in writer_component->trace_map.
 *
 * Visible steps, in order:
 *   1. refuse a second trace when single-trace mode is enabled;
 *   2. compute the output path with make_trace_path() and print it on
 *      standard output;
 *   3. create the CTF writer for that path and fetch its trace;
 *   4. copy the source trace into the writer trace with
 *      ctf_copy_trace();
 *   5. allocate the fs_writer, fill its fields and create its three
 *      hash tables (stream classes, streams, stream states), all keyed
 *      by pointer identity;
 *   6. record every stream already in the trace through
 *      insert_new_stream_state();
 *   7. either flag the trace as static (listener id set to -1) or
 *      register trace_is_static_listener() and keep the returned id;
 *   8. bump nr_traces and insert into trace_map, keyed by trace.
 *
 * NOTE(review): the declarations of nr_stream and i, the error label,
 * most failure checks and the return statement are not visible in this
 * excerpt, nor is the release of each stream fetched in the loop.
 */
289 struct fs_writer
*insert_new_writer(
290 struct writer_component
*writer_component
,
291 struct bt_ctf_trace
*trace
)
293 struct bt_ctf_writer
*ctf_writer
= NULL
;
294 struct bt_ctf_trace
*writer_trace
= NULL
;
295 char trace_path
[PATH_MAX
];
296 enum bt_component_status ret
;
297 struct bt_ctf_stream
*stream
= NULL
;
298 struct fs_writer
*fs_writer
= NULL
;
301 if (writer_component
->single_trace
&& writer_component
->nr_traces
> 0) {
302 BT_LOGE_STR("Trying to process more than one trace but single trace mode enabled.");
306 ret
= make_trace_path(writer_component
, trace
, trace_path
);
308 BT_LOGE_STR("Failed to make trace path.");
312 printf("ctf.fs sink creating trace in %s\n", trace_path
);
314 ctf_writer
= bt_ctf_writer_create(trace_path
);
316 BT_LOGE_STR("Failed to create CTF writer.");
320 writer_trace
= bt_ctf_writer_get_trace(ctf_writer
);
321 assert(writer_trace
);
323 ret
= ctf_copy_trace(writer_component
->err
, trace
, writer_trace
);
324 if (ret
!= BT_COMPONENT_STATUS_OK
) {
325 BT_LOGE_STR("Failed to copy trace.");
330 fs_writer
= g_new0(struct fs_writer
, 1);
332 BT_LOGE_STR("Failed to allocate fs_writer.");
335 fs_writer
->writer
= ctf_writer
;
336 fs_writer
->trace
= trace
;
337 fs_writer
->writer_trace
= writer_trace
;
338 fs_writer
->writer_component
= writer_component
;
/*
 * NOTE(review): the local writer_trace is released right after being
 * stored in the fs_writer, so the stored pointer presumably does not
 * own a reference of its own -- confirm.
 */
339 BT_PUT(writer_trace
);
340 fs_writer
->stream_class_map
= g_hash_table_new_full(g_direct_hash
,
341 g_direct_equal
, NULL
, (GDestroyNotify
) unref_stream_class
);
342 fs_writer
->stream_map
= g_hash_table_new_full(g_direct_hash
,
343 g_direct_equal
, NULL
, (GDestroyNotify
) unref_stream
);
344 fs_writer
->stream_states
= g_hash_table_new_full(g_direct_hash
,
345 g_direct_equal
, NULL
, destroy_stream_state_key
);
347 /* Set all the existing streams in the unknown state. */
348 nr_stream
= bt_ctf_trace_get_stream_count(trace
);
349 for (i
= 0; i
< nr_stream
; i
++) {
350 stream
= bt_ctf_trace_get_stream_by_index(trace
, i
);
353 insert_new_stream_state(writer_component
, fs_writer
, stream
);
357 /* Check if the trace is already static or register a listener. */
358 if (bt_ctf_trace_is_static(trace
)) {
359 fs_writer
->trace_static
= 1;
360 fs_writer
->static_listener_id
= -1;
/*
 * NOTE(review): the listener id travels through ret, which is declared
 * as enum bt_component_status, before being stored.
 */
362 ret
= bt_ctf_trace_add_is_static_listener(trace
,
363 trace_is_static_listener
, fs_writer
);
365 fs_writer
->static_listener_id
= ret
;
368 writer_component
->nr_traces
++;
369 g_hash_table_insert(writer_component
->trace_map
, (gpointer
) trace
,
377 bt_put(writer_trace
);
/*
 * get_fs_writer: find the fs_writer handling the trace that owns
 * stream_class.
 *
 * The trace is obtained with bt_ctf_stream_class_get_trace() and a
 * lookup is made in writer_component->trace_map. insert_new_writer()
 * is the visible fallback that creates a new fs_writer.
 *
 * NOTE(review): the lookup key, the condition guarding the fallback,
 * the release of trace and the return statement are not visible in
 * this excerpt. Callers have a failure log right after this call, so a
 * NULL result looks possible -- confirm.
 */
385 struct fs_writer
*get_fs_writer(struct writer_component
*writer_component
,
386 struct bt_ctf_stream_class
*stream_class
)
388 struct bt_ctf_trace
*trace
= NULL
;
389 struct fs_writer
*fs_writer
;
391 trace
= bt_ctf_stream_class_get_trace(stream_class
);
394 fs_writer
= g_hash_table_lookup(writer_component
->trace_map
,
397 fs_writer
= insert_new_writer(writer_component
, trace
);
/*
 * get_fs_writer_from_stream: resolve the fs_writer starting from a
 * stream instead of a stream class.
 *
 * Gets the class of stream (asserted non-NULL), delegates to
 * get_fs_writer(), then drops the stream class reference with bt_put().
 *
 * NOTE(review): the return statement is not visible. lookup_stream()
 * uses the result as the fs_writer, so fs_writer is presumably what is
 * returned -- confirm.
 */
405 struct fs_writer
*get_fs_writer_from_stream(
406 struct writer_component
*writer_component
,
407 struct bt_ctf_stream
*stream
)
409 struct bt_ctf_stream_class
*stream_class
= NULL
;
410 struct fs_writer
*fs_writer
;
412 stream_class
= bt_ctf_stream_get_class(stream
);
413 assert(stream_class
);
415 fs_writer
= get_fs_writer(writer_component
, stream_class
);
417 bt_put(stream_class
);
422 struct bt_ctf_stream_class
*lookup_stream_class(
423 struct writer_component
*writer_component
,
424 struct bt_ctf_stream_class
*stream_class
)
426 struct fs_writer
*fs_writer
= get_fs_writer(
427 writer_component
, stream_class
);
429 return (struct bt_ctf_stream_class
*) g_hash_table_lookup(
430 fs_writer
->stream_class_map
, (gpointer
) stream_class
);
434 struct bt_ctf_stream
*lookup_stream(struct writer_component
*writer_component
,
435 struct bt_ctf_stream
*stream
)
437 struct fs_writer
*fs_writer
= get_fs_writer_from_stream(
438 writer_component
, stream
);
440 return (struct bt_ctf_stream
*) g_hash_table_lookup(
441 fs_writer
->stream_map
, (gpointer
) stream
);
/*
 * insert_new_stream: create the writer-side stream matching stream and
 * register it in fs_writer->stream_map.
 *
 * Visible steps, in order:
 *   1. look up the writer stream class with lookup_stream_class(), and
 *      fall back to insert_new_stream_class() when none is found;
 *   2. take a reference on the writer stream class;
 *   3. create the writer stream with bt_ctf_stream_create(), reusing
 *      the name of the source stream (asserted to succeed);
 *   4. insert into stream_map, keyed by the source stream.
 *
 * The function returns writer_stream.
 *
 * NOTE(review): the value passed to g_hash_table_insert(), the labels
 * and gotos, and the release matching bt_get(fs_writer->writer) are not
 * visible in this excerpt. BT_PUT(writer_stream) presumably sits on the
 * error path -- confirm.
 */
445 struct bt_ctf_stream
*insert_new_stream(
446 struct writer_component
*writer_component
,
447 struct fs_writer
*fs_writer
,
448 struct bt_ctf_stream_class
*stream_class
,
449 struct bt_ctf_stream
*stream
)
451 struct bt_ctf_stream
*writer_stream
= NULL
;
452 struct bt_ctf_stream_class
*writer_stream_class
= NULL
;
453 struct bt_ctf_writer
*ctf_writer
= bt_get(fs_writer
->writer
);
455 writer_stream_class
= lookup_stream_class(writer_component
,
457 if (!writer_stream_class
) {
458 writer_stream_class
= insert_new_stream_class(
459 writer_component
, fs_writer
, stream_class
);
460 if (!writer_stream_class
) {
461 BT_LOGE_STR("Failed to insert a new stream_class.");
465 bt_get(writer_stream_class
);
467 writer_stream
= bt_ctf_stream_create(writer_stream_class
,
468 bt_ctf_stream_get_name(stream
));
469 assert(writer_stream
);
471 g_hash_table_insert(fs_writer
->stream_map
, (gpointer
) stream
,
477 BT_PUT(writer_stream
);
480 bt_put(writer_stream_class
);
481 return writer_stream
;
/*
 * Look up, inside writer_stream_class, the event class that carries
 * the same numeric id as event_class.
 *
 * The result of bt_ctf_stream_class_get_event_class_by_id() is handed
 * back unchanged. writer_component is not used.
 */
struct bt_ctf_event_class *get_event_class(struct writer_component *writer_component,
		struct bt_ctf_stream_class *writer_stream_class,
		struct bt_ctf_event_class *event_class)
{
	struct bt_ctf_event_class *found;

	found = bt_ctf_stream_class_get_event_class_by_id(
			writer_stream_class,
			bt_ctf_event_class_get_id(event_class));

	return found;
}
/*
 * get_writer_stream: fetch the writer stream mapped to stream through
 * lookup_stream() and take a reference on it with bt_get().
 *
 * A missing mapping is logged as an error. packet is not used by any
 * visible line. The function returns writer_stream.
 *
 * NOTE(review): labels and gotos are not visible in this excerpt.
 * BT_PUT(writer_stream) presumably sits on the error path, which would
 * make the function return NULL on failure (writer_new_packet() does
 * test for NULL) -- confirm.
 */
494 struct bt_ctf_stream
*get_writer_stream(
495 struct writer_component
*writer_component
,
496 struct bt_ctf_packet
*packet
, struct bt_ctf_stream
*stream
)
498 struct bt_ctf_stream
*writer_stream
= NULL
;
500 writer_stream
= lookup_stream(writer_component
, stream
);
501 if (!writer_stream
) {
502 BT_LOGE_STR("Failed to find existing stream.");
505 bt_get(writer_stream
);
510 BT_PUT(writer_stream
);
512 return writer_stream
;
/*
 * writer_close: tear down the per-trace state held by fs_writer.
 *
 * Visible steps, in order:
 *   1. when a listener id was recorded (id >= 0), unregister the
 *      is-static listener from fs_writer->trace;
 *   2. empty, then destroy, the stream class table;
 *   3. empty the stream table through empty_streams_ht(), which
 *      flushes each writer stream, then destroy the table;
 *   4. empty, then destroy, the stream state table.
 *
 * writer_component is not used by any visible line.
 *
 * NOTE(review): the callbacks passed for the stream class and stream
 * state tables are cut off in this excerpt, as is anything that may
 * follow the last g_hash_table_destroy() call.
 */
516 void writer_close(struct writer_component
*writer_component
,
517 struct fs_writer
*fs_writer
)
519 if (fs_writer
->static_listener_id
>= 0) {
520 bt_ctf_trace_remove_is_static_listener(fs_writer
->trace
,
521 fs_writer
->static_listener_id
);
524 /* Empty the stream class HT. */
525 g_hash_table_foreach_remove(fs_writer
->stream_class_map
,
527 g_hash_table_destroy(fs_writer
->stream_class_map
);
529 /* Empty the stream HT. */
530 g_hash_table_foreach_remove(fs_writer
->stream_map
,
531 empty_streams_ht
, NULL
);
532 g_hash_table_destroy(fs_writer
->stream_map
);
534 /* Empty the stream state HT. */
535 g_hash_table_foreach_remove(fs_writer
->stream_states
,
537 g_hash_table_destroy(fs_writer
->stream_states
);
/*
 * writer_stream_begin: handle the beginning of stream.
 *
 * Visible steps, in order:
 *   1. get the stream class (asserted non-NULL) and resolve the
 *      fs_writer with get_fs_writer();
 *   2. look up the state of the stream in fs_writer->stream_states;
 *      there is an error log for a new stream showing up on a static
 *      trace, and insert_new_stream_state() is called to create a
 *      state;
 *   3. require the state to be FS_WRITER_UNKNOWN_STREAM, then switch it
 *      to FS_WRITER_ACTIVE_STREAM;
 *   4. create the writer stream with insert_new_stream().
 *
 * ret starts as BT_COMPONENT_STATUS_OK and is set to
 * BT_COMPONENT_STATUS_ERROR on a line that presumably belongs to the
 * error label. The stream class reference is released with bt_put().
 *
 * NOTE(review): the guard conditions, labels, gotos, the handling of
 * the reference returned by insert_new_stream() and the return
 * statement are not visible in this excerpt.
 */
541 enum bt_component_status
writer_stream_begin(
542 struct writer_component
*writer_component
,
543 struct bt_ctf_stream
*stream
)
545 struct bt_ctf_stream_class
*stream_class
= NULL
;
546 struct fs_writer
*fs_writer
;
547 struct bt_ctf_stream
*writer_stream
= NULL
;
548 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
549 enum fs_writer_stream_state
*state
;
551 stream_class
= bt_ctf_stream_get_class(stream
);
552 assert(stream_class
);
554 fs_writer
= get_fs_writer(writer_component
, stream_class
);
556 BT_LOGE_STR("Failed to get fs_writer.");
560 /* Set the stream as active */
561 state
= g_hash_table_lookup(fs_writer
->stream_states
, stream
);
563 if (fs_writer
->trace_static
) {
564 BT_LOGE_STR("Cannot add new stream on a static trace.");
567 state
= insert_new_stream_state(writer_component
, fs_writer
,
570 if (*state
!= FS_WRITER_UNKNOWN_STREAM
) {
571 BT_LOGE("Unexpected stream state: state=%d", *state
);
574 *state
= FS_WRITER_ACTIVE_STREAM
;
576 writer_stream
= insert_new_stream(writer_component
, fs_writer
,
577 stream_class
, stream
);
578 if (!writer_stream
) {
579 BT_LOGE_STR("Failed to insert new stream.");
586 ret
= BT_COMPONENT_STATUS_ERROR
;
588 bt_put(stream_class
);
/*
 * writer_stream_end: handle the end of stream.
 *
 * Visible steps, in order:
 *   1. get the stream class (asserted non-NULL) and resolve the
 *      fs_writer with get_fs_writer();
 *   2. look up the state of the stream, require it to be
 *      FS_WRITER_ACTIVE_STREAM, then switch it to
 *      FS_WRITER_COMPLETED_STREAM;
 *   3. remove the stream from fs_writer->stream_map;
 *   4. when the trace is static, walk the stream states with
 *      check_completed_trace(); if the flag is still set, close the
 *      writer with writer_close() and remove an entry from the trace
 *      map of the component.
 *
 * ret starts as BT_COMPONENT_STATUS_OK and is set to
 * BT_COMPONENT_STATUS_ERROR on a line that presumably belongs to the
 * error label. The stream class reference is released with BT_PUT().
 *
 * NOTE(review): the guard conditions, labels, gotos, the key given to
 * the last g_hash_table_remove() and the return statement are not
 * visible in this excerpt. The local trace is not used by any visible
 * line.
 */
593 enum bt_component_status
writer_stream_end(
594 struct writer_component
*writer_component
,
595 struct bt_ctf_stream
*stream
)
597 struct bt_ctf_stream_class
*stream_class
= NULL
;
598 struct fs_writer
*fs_writer
;
599 struct bt_ctf_trace
*trace
= NULL
;
600 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
601 enum fs_writer_stream_state
*state
;
603 stream_class
= bt_ctf_stream_get_class(stream
);
604 assert(stream_class
);
606 fs_writer
= get_fs_writer(writer_component
, stream_class
);
608 BT_LOGE_STR("Failed to get fs_writer.");
/*
 * NOTE(review): state is dereferenced on the statement right after the
 * lookup; a stream with no recorded state would make this a NULL
 * dereference -- confirm whether that can happen.
 */
612 state
= g_hash_table_lookup(fs_writer
->stream_states
, stream
);
613 if (*state
!= FS_WRITER_ACTIVE_STREAM
) {
614 BT_LOGE("Unexpected stream state: state=%d", *state
);
617 *state
= FS_WRITER_COMPLETED_STREAM
;
619 g_hash_table_remove(fs_writer
->stream_map
, stream
);
621 if (fs_writer
->trace_static
) {
622 int trace_completed
= 1;
624 g_hash_table_foreach(fs_writer
->stream_states
,
625 check_completed_trace
, &trace_completed
);
626 if (trace_completed
) {
627 writer_close(writer_component
, fs_writer
);
628 g_hash_table_remove(writer_component
->trace_map
,
636 ret
= BT_COMPONENT_STATUS_ERROR
;
639 BT_PUT(stream_class
);
/*
 * writer_new_packet: handle the beginning of packet.
 *
 * Visible steps, in order:
 *   1. get the stream of the packet;
 *   2. get the matching writer stream with get_writer_stream();
 *   3. copy the packet context with ctf_stream_copy_packet_context();
 *   4. copy the packet header with ctf_stream_copy_packet_header().
 *
 * Each step has its own error log. ret starts as
 * BT_COMPONENT_STATUS_OK and is set to BT_COMPONENT_STATUS_ERROR on a
 * line that presumably belongs to the error label. The writer stream
 * reference is released with bt_put().
 *
 * NOTE(review): the declaration of int_ret, the failure conditions,
 * labels, gotos, the release of stream and the return statement are
 * not visible in this excerpt. The context copy result goes to int_ret
 * while the header copy result goes to ret.
 */
644 enum bt_component_status
writer_new_packet(
645 struct writer_component
*writer_component
,
646 struct bt_ctf_packet
*packet
)
648 struct bt_ctf_stream
*stream
= NULL
, *writer_stream
= NULL
;
649 enum bt_component_status ret
= BT_COMPONENT_STATUS_OK
;
652 stream
= bt_ctf_packet_get_stream(packet
);
655 writer_stream
= get_writer_stream(writer_component
, packet
, stream
);
656 if (!writer_stream
) {
657 BT_LOGE_STR("Failed to get writer_stream.");
662 int_ret
= ctf_stream_copy_packet_context(
663 writer_component
->err
, packet
, writer_stream
);
665 BT_LOGE_STR("Failed to copy packet_context.");
669 ret
= ctf_stream_copy_packet_header(writer_component
->err
,
670 packet
, writer_stream
);
672 BT_LOGE_STR("Failed to copy packet_header.");
679 ret
= BT_COMPONENT_STATUS_ERROR
;
681 bt_put(writer_stream
);
/*
 * writer_close_packet: handle the end of packet by flushing the
 * matching writer stream.
 *
 * Visible steps, in order:
 *   1. get the stream of the packet;
 *   2. find the writer stream with lookup_stream() and take a
 *      reference on it;
 *   3. flush it with bt_ctf_stream_flush();
 *   4. release the reference with BT_PUT() and set ret to
 *      BT_COMPONENT_STATUS_OK.
 *
 * A second assignment sets ret to BT_COMPONENT_STATUS_ERROR and is
 * followed by bt_put(writer_stream); both presumably belong to the
 * error path.
 *
 * NOTE(review): the failure conditions, labels, gotos, the release of
 * stream and the return statement are not visible in this excerpt.
 */
687 enum bt_component_status
writer_close_packet(
688 struct writer_component
*writer_component
,
689 struct bt_ctf_packet
*packet
)
691 struct bt_ctf_stream
*stream
= NULL
, *writer_stream
= NULL
;
692 enum bt_component_status ret
;
694 stream
= bt_ctf_packet_get_stream(packet
);
697 writer_stream
= lookup_stream(writer_component
, stream
);
698 if (!writer_stream
) {
699 BT_LOGE_STR("Failed to find existing stream.");
704 bt_get(writer_stream
);
706 ret
= bt_ctf_stream_flush(writer_stream
);
708 BT_LOGE_STR("Failed to flush stream.");
712 BT_PUT(writer_stream
);
714 ret
= BT_COMPONENT_STATUS_OK
;
718 ret
= BT_COMPONENT_STATUS_ERROR
;
720 bt_put(writer_stream
);
/*
 * writer_output_event: copy event into the matching writer stream.
 *
 * Visible steps, in order:
 *   1. get the event class and the stream of the event;
 *   2. find the writer stream with lookup_stream() and take a
 *      reference on it;
 *   3. get the stream class of the event class (asserted non-NULL),
 *      find its writer counterpart with lookup_stream_class() and take
 *      a reference on it;
 *   4. find the writer event class with get_event_class(); when there
 *      is none, copy it with ctf_copy_event_class() and add it to the
 *      writer stream class;
 *   5. copy the event with ctf_copy_event();
 *   6. append the copy to the writer stream.
 *
 * Each step has its own error log. ret is set to
 * BT_COMPONENT_STATUS_OK after the append and to
 * BT_COMPONENT_STATUS_ERROR on a line that presumably belongs to the
 * error label. Five references are released with bt_put() at the end.
 *
 * NOTE(review): the declaration of int_ret, several failure
 * conditions, labels, gotos, the remaining arguments of
 * ctf_copy_event_class(), the release of event_class and stream, and
 * the return statement are not visible in this excerpt.
 */
726 enum bt_component_status
writer_output_event(
727 struct writer_component
*writer_component
,
728 struct bt_ctf_event
*event
)
730 enum bt_component_status ret
;
731 struct bt_ctf_event_class
*event_class
= NULL
, *writer_event_class
= NULL
;
732 struct bt_ctf_stream
*stream
= NULL
, *writer_stream
= NULL
;
733 struct bt_ctf_stream_class
*stream_class
= NULL
, *writer_stream_class
= NULL
;
734 struct bt_ctf_event
*writer_event
= NULL
;
737 event_class
= bt_ctf_event_get_class(event
);
740 stream
= bt_ctf_event_get_stream(event
);
743 writer_stream
= lookup_stream(writer_component
, stream
);
744 if (!writer_stream
|| !bt_get(writer_stream
)) {
/*
 * NOTE(review): the message below says for where to was probably
 * meant; it is left untouched because it is a runtime string.
 */
745 BT_LOGE_STR("Failed for find existing stream.");
749 stream_class
= bt_ctf_event_class_get_stream_class(event_class
);
750 assert(stream_class
);
752 writer_stream_class
= lookup_stream_class(writer_component
, stream_class
);
753 if (!writer_stream_class
|| !bt_get(writer_stream_class
)) {
754 BT_LOGE_STR("Failed to find existing stream_class.");
758 writer_event_class
= get_event_class(writer_component
,
759 writer_stream_class
, event_class
);
760 if (!writer_event_class
) {
761 writer_event_class
= ctf_copy_event_class(writer_component
->err
,
763 if (!writer_event_class
) {
764 BT_LOGE_STR("Failed to copy event_class.");
767 int_ret
= bt_ctf_stream_class_add_event_class(
768 writer_stream_class
, writer_event_class
);
770 BT_LOGE("Failed to add event_class: event_name=\"%s\"",
771 bt_ctf_event_class_get_name(event_class
));
776 writer_event
= ctf_copy_event(writer_component
->err
, event
,
777 writer_event_class
, true);
779 BT_LOGE("Failed to copy event: event_class=\"%s\"",
780 bt_ctf_event_class_get_name(writer_event_class
));
784 int_ret
= bt_ctf_stream_append_event(writer_stream
, writer_event
);
786 BT_LOGE("Failed to append event: event_class=\"%s\"",
787 bt_ctf_event_class_get_name(writer_event_class
));
791 ret
= BT_COMPONENT_STATUS_OK
;
795 ret
= BT_COMPONENT_STATUS_ERROR
;
797 bt_put(writer_event
);
798 bt_put(writer_event_class
);
799 bt_put(writer_stream_class
);
800 bt_put(stream_class
);
801 bt_put(writer_stream
);