fs-sink: move stream and stream_class maps in fs_writer struct
[babeltrace.git] / plugins / ctf / fs-sink / write.c
1 /*
2 * writer.c
3 *
4 * Babeltrace CTF Writer Output Plugin Event Handling
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 *
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * SOFTWARE.
27 */
28
29 #include <babeltrace/ctf-ir/event.h>
30 #include <babeltrace/ctf-ir/packet.h>
31 #include <babeltrace/ctf-ir/event-class.h>
32 #include <babeltrace/ctf-ir/stream.h>
33 #include <babeltrace/ctf-ir/stream-class.h>
34 #include <babeltrace/ctf-ir/clock-class.h>
35 #include <babeltrace/ctf-ir/fields.h>
36 #include <babeltrace/ctf-writer/stream-class.h>
37 #include <babeltrace/ctf-writer/stream.h>
38 #include <assert.h>
39
40 #include <ctfcopytrace.h>
41
42 #include "writer.h"
43
44 static
45 void unref_stream_class(struct bt_ctf_stream_class *writer_stream_class)
46 {
47 bt_put(writer_stream_class);
48 }
49
50 static
51 void unref_stream(struct bt_ctf_stream_class *writer_stream)
52 {
53 bt_put(writer_stream);
54 }
55
56 gboolean empty_ht(gpointer key, gpointer value, gpointer user_data)
57 {
58 return TRUE;
59 }
60
61 static
62 void trace_is_static_listener(struct bt_ctf_trace *trace, void *data)
63 {
64 *((int *) data) = 1;
65 }
66
67 static
68 struct bt_ctf_stream_class *insert_new_stream_class(
69 struct writer_component *writer_component,
70 struct fs_writer *fs_writer,
71 struct bt_ctf_stream_class *stream_class)
72 {
73 struct bt_ctf_stream_class *writer_stream_class = NULL;
74 struct bt_ctf_trace *trace = NULL, *writer_trace = NULL;
75 struct bt_ctf_writer *ctf_writer = fs_writer->writer;
76 enum bt_component_status ret;
77
78 trace = bt_ctf_stream_class_get_trace(stream_class);
79 if (!trace) {
80 fprintf(writer_component->err,
81 "[error] %s in %s:%d\n", __func__, __FILE__,
82 __LINE__);
83 goto error;
84 }
85
86 writer_trace = bt_ctf_writer_get_trace(ctf_writer);
87 if (!writer_trace) {
88 fprintf(writer_component->err,
89 "[error] %s in %s:%d\n", __func__, __FILE__,
90 __LINE__);
91 goto error;
92 }
93
94 ret = ctf_copy_clock_classes(writer_component->err, writer_trace,
95 writer_stream_class, trace);
96 if (ret != BT_COMPONENT_STATUS_OK) {
97 fprintf(writer_component->err,
98 "[error] %s in %s:%d\n", __func__, __FILE__,
99 __LINE__);
100 goto error;
101 }
102
103 writer_stream_class = ctf_copy_stream_class(writer_component->err,
104 stream_class, writer_trace, true);
105 if (!writer_stream_class) {
106 fprintf(writer_component->err, "[error] Failed to copy stream class\n");
107 fprintf(writer_component->err, "[error] %s in %s:%d\n",
108 __func__, __FILE__, __LINE__);
109 goto error;
110 }
111
112 g_hash_table_insert(fs_writer->stream_class_map,
113 (gpointer) stream_class, writer_stream_class);
114
115 goto end;
116
117 error:
118 BT_PUT(writer_stream_class);
119 end:
120 bt_put(writer_trace);
121 bt_put(trace);
122 return writer_stream_class;
123 }
124
125 static
126 struct fs_writer *insert_new_writer(
127 struct writer_component *writer_component,
128 struct bt_ctf_trace *trace)
129 {
130 struct bt_ctf_writer *ctf_writer = NULL;
131 struct bt_ctf_trace *writer_trace = NULL;
132 char trace_name[PATH_MAX];
133 enum bt_component_status ret;
134 struct fs_writer *fs_writer;
135
136 /* FIXME: replace with trace name when it will work. */
137 snprintf(trace_name, PATH_MAX, "%s/%s_%03d",
138 writer_component->base_path->str,
139 writer_component->trace_name_base->str,
140 writer_component->trace_id++);
141 printf_verbose("CTF-Writer creating trace in %s\n", trace_name);
142
143 ctf_writer = bt_ctf_writer_create(trace_name);
144 if (!ctf_writer) {
145 fprintf(writer_component->err, "[error] %s in %s:%d\n",
146 __func__, __FILE__, __LINE__);
147 goto error;
148 }
149
150 writer_trace = bt_ctf_writer_get_trace(ctf_writer);
151 if (!writer_trace) {
152 fprintf(writer_component->err,
153 "[error] %s in %s:%d\n", __func__, __FILE__,
154 __LINE__);
155 goto error;
156 }
157
158 ret = ctf_copy_trace(writer_component->err, trace, writer_trace);
159 if (ret != BT_COMPONENT_STATUS_OK) {
160 fprintf(writer_component->err, "[error] Failed to copy trace\n");
161 fprintf(writer_component->err, "[error] %s in %s:%d\n",
162 __func__, __FILE__, __LINE__);
163 BT_PUT(ctf_writer);
164 goto error;
165 }
166
167 fs_writer = g_new0(struct fs_writer, 1);
168 if (!fs_writer) {
169 fprintf(writer_component->err,
170 "[error] %s in %s:%d\n", __func__, __FILE__,
171 __LINE__);
172 goto error;
173 }
174 fs_writer->writer = ctf_writer;
175 fs_writer->stream_class_map = g_hash_table_new_full(g_direct_hash,
176 g_direct_equal, NULL, (GDestroyNotify) unref_stream_class);
177 fs_writer->stream_map = g_hash_table_new_full(g_direct_hash,
178 g_direct_equal, NULL, (GDestroyNotify) unref_stream);
179 fs_writer->trace = trace;
180 fs_writer->writer_trace = writer_trace;
181 BT_PUT(writer_trace);
182 if (bt_ctf_trace_is_static(trace)) {
183 fs_writer->trace_static = 1;
184 fs_writer->static_listener_id = -1;
185 } else {
186 ret = bt_ctf_trace_add_is_static_listener(trace,
187 trace_is_static_listener, &fs_writer->trace_static);
188 if (ret < 0) {
189 fprintf(writer_component->err,
190 "[error] %s in %s:%d\n", __func__, __FILE__,
191 __LINE__);
192 g_free(fs_writer);
193 fs_writer = NULL;
194 goto error;
195 }
196 fs_writer->static_listener_id = ret;
197 }
198
199 g_hash_table_insert(writer_component->trace_map, (gpointer) trace,
200 fs_writer);
201
202 goto end;
203
204 error:
205 bt_put(writer_trace);
206 BT_PUT(ctf_writer);
207 end:
208 return fs_writer;
209 }
210
211 static
212 struct fs_writer *get_fs_writer(struct writer_component *writer_component,
213 struct bt_ctf_stream_class *stream_class)
214 {
215 struct bt_ctf_trace *trace = NULL;
216 struct fs_writer *fs_writer;
217
218 trace = bt_ctf_stream_class_get_trace(stream_class);
219 if (!trace) {
220 fprintf(writer_component->err, "[error] %s in %s:%d\n",
221 __func__, __FILE__, __LINE__);
222 goto error;
223 }
224
225 fs_writer = g_hash_table_lookup(writer_component->trace_map,
226 (gpointer) trace);
227 if (!fs_writer) {
228 fs_writer = insert_new_writer(writer_component, trace);
229 }
230 BT_PUT(trace);
231 goto end;
232
233 error:
234 fs_writer = NULL;
235 end:
236 return fs_writer;
237 }
238
239 static
240 struct fs_writer *get_fs_writer_from_stream(
241 struct writer_component *writer_component,
242 struct bt_ctf_stream *stream)
243 {
244 struct bt_ctf_stream_class *stream_class = NULL;
245 struct fs_writer *fs_writer;
246
247 stream_class = bt_ctf_stream_get_class(stream);
248 if (!stream_class) {
249 fprintf(writer_component->err, "[error] %s in %s:%d\n",
250 __func__, __FILE__, __LINE__);
251 goto error;
252 }
253
254 fs_writer = get_fs_writer(writer_component, stream_class);
255 goto end;
256
257 error:
258 fs_writer = NULL;
259
260 end:
261 bt_put(stream_class);
262 return fs_writer;
263 }
264
265 static
266 struct bt_ctf_stream_class *lookup_stream_class(
267 struct writer_component *writer_component,
268 struct bt_ctf_stream_class *stream_class)
269 {
270 struct fs_writer *fs_writer = get_fs_writer(
271 writer_component, stream_class);
272 assert(fs_writer);
273 return (struct bt_ctf_stream_class *) g_hash_table_lookup(
274 fs_writer->stream_class_map, (gpointer) stream_class);
275 }
276
277 static
278 struct bt_ctf_stream *lookup_stream(struct writer_component *writer_component,
279 struct bt_ctf_stream *stream)
280 {
281 struct fs_writer *fs_writer = get_fs_writer_from_stream(
282 writer_component, stream);
283 assert(fs_writer);
284 return (struct bt_ctf_stream *) g_hash_table_lookup(
285 fs_writer->stream_map, (gpointer) stream);
286 }
287
288 static
289 struct bt_ctf_stream *insert_new_stream(
290 struct writer_component *writer_component,
291 struct fs_writer *fs_writer,
292 struct bt_ctf_stream_class *stream_class,
293 struct bt_ctf_stream *stream)
294 {
295 struct bt_ctf_stream *writer_stream = NULL;
296 struct bt_ctf_stream_class *writer_stream_class = NULL;
297 struct bt_ctf_writer *ctf_writer = bt_get(fs_writer->writer);
298
299 writer_stream_class = lookup_stream_class(writer_component,
300 stream_class);
301 if (!writer_stream_class) {
302 writer_stream_class = insert_new_stream_class(
303 writer_component, fs_writer, stream_class);
304 if (!writer_stream_class) {
305 fprintf(writer_component->err, "[error] %s in %s:%d\n",
306 __func__, __FILE__, __LINE__);
307 goto error;
308 }
309 }
310 bt_get(writer_stream_class);
311
312 writer_stream = bt_ctf_writer_create_stream(ctf_writer,
313 writer_stream_class);
314 if (!writer_stream) {
315 fprintf(writer_component->err, "[error] %s in %s:%d\n",
316 __func__, __FILE__, __LINE__);
317 goto error;
318 }
319
320 g_hash_table_insert(fs_writer->stream_map, (gpointer) stream,
321 writer_stream);
322
323 goto end;
324
325 error:
326 BT_PUT(writer_stream);
327 end:
328 bt_put(ctf_writer);
329 bt_put(writer_stream_class);
330 return writer_stream;
331 }
332
333 static
334 struct bt_ctf_event_class *get_event_class(struct writer_component *writer_component,
335 struct bt_ctf_stream_class *writer_stream_class,
336 struct bt_ctf_event_class *event_class)
337 {
338 return bt_ctf_stream_class_get_event_class_by_id(writer_stream_class,
339 bt_ctf_event_class_get_id(event_class));
340 }
341
342 static
343 struct bt_ctf_stream *get_writer_stream(
344 struct writer_component *writer_component,
345 struct bt_ctf_packet *packet, struct bt_ctf_stream *stream)
346 {
347 struct bt_ctf_stream *writer_stream = NULL;
348
349 writer_stream = lookup_stream(writer_component, stream);
350 if (!writer_stream) {
351 fprintf(writer_component->err, "[error] %s in %s:%d\n",
352 __func__, __FILE__, __LINE__);
353 goto error;
354 }
355 bt_get(writer_stream);
356
357 goto end;
358
359 error:
360 BT_PUT(writer_stream);
361 end:
362 return writer_stream;
363 }
364
365 BT_HIDDEN
366 void writer_close(struct writer_component *writer_component,
367 struct fs_writer *fs_writer)
368 {
369 if (fs_writer->static_listener_id > 0) {
370 bt_ctf_trace_remove_is_static_listener(fs_writer->trace,
371 fs_writer->static_listener_id);
372 }
373
374 /* Empty the stream class HT. */
375 g_hash_table_foreach_remove(fs_writer->stream_class_map,
376 empty_ht, NULL);
377 g_hash_table_destroy(fs_writer->stream_class_map);
378
379 /* Empty the stream HT. */
380 g_hash_table_foreach_remove(fs_writer->stream_map,
381 empty_ht, NULL);
382 g_hash_table_destroy(fs_writer->stream_map);
383 }
384
385 BT_HIDDEN
386 enum bt_component_status writer_stream_begin(
387 struct writer_component *writer_component,
388 struct bt_ctf_stream *stream)
389 {
390 struct bt_ctf_stream_class *stream_class = NULL;
391 struct fs_writer *fs_writer;
392 struct bt_ctf_stream *writer_stream = NULL;
393 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
394
395 stream_class = bt_ctf_stream_get_class(stream);
396 if (!stream_class) {
397 fprintf(writer_component->err, "[error] %s in %s:%d\n",
398 __func__, __FILE__, __LINE__);
399 goto error;
400 }
401
402 fs_writer = get_fs_writer(writer_component, stream_class);
403 if (!fs_writer) {
404 fprintf(writer_component->err, "[error] %s in %s:%d\n",
405 __func__, __FILE__, __LINE__);
406 goto error;
407 }
408 writer_stream = insert_new_stream(writer_component, fs_writer,
409 stream_class, stream);
410 if (!writer_stream) {
411 fprintf(writer_component->err, "[error] %s in %s:%d\n",
412 __func__, __FILE__, __LINE__);
413 goto error;
414 }
415 fs_writer->active_streams++;
416
417 goto end;
418
419 error:
420 ret = BT_COMPONENT_STATUS_ERROR;
421 end:
422 bt_put(stream_class);
423 return ret;
424 }
425
426 BT_HIDDEN
427 enum bt_component_status writer_stream_end(
428 struct writer_component *writer_component,
429 struct bt_ctf_stream *stream)
430 {
431 struct bt_ctf_stream_class *stream_class = NULL;
432 struct fs_writer *fs_writer;
433 struct bt_ctf_trace *trace = NULL;
434 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
435
436 stream_class = bt_ctf_stream_get_class(stream);
437 if (!stream_class) {
438 fprintf(writer_component->err, "[error] %s in %s:%d\n",
439 __func__, __FILE__, __LINE__);
440 goto error;
441 }
442
443 fs_writer = get_fs_writer(writer_component, stream_class);
444 if (!fs_writer) {
445 fprintf(writer_component->err, "[error] %s in %s:%d\n",
446 __func__, __FILE__, __LINE__);
447 goto error;
448 }
449 g_hash_table_remove(fs_writer->stream_map, stream);
450
451 assert(fs_writer->active_streams > 0);
452 fs_writer->active_streams--;
453 if (fs_writer->active_streams == 0 && fs_writer->trace_static) {
454 writer_close(writer_component, fs_writer);
455 g_hash_table_remove(writer_component->trace_map,
456 fs_writer->trace);
457 }
458
459 goto end;
460
461 error:
462 ret = BT_COMPONENT_STATUS_ERROR;
463 end:
464 BT_PUT(trace);
465 BT_PUT(stream_class);
466 return ret;
467 }
468
469 BT_HIDDEN
470 enum bt_component_status writer_new_packet(
471 struct writer_component *writer_component,
472 struct bt_ctf_packet *packet)
473 {
474 struct bt_ctf_stream *stream = NULL, *writer_stream = NULL;
475 struct bt_ctf_field *writer_packet_context = NULL;
476 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
477 int int_ret;
478
479 stream = bt_ctf_packet_get_stream(packet);
480 if (!stream) {
481 fprintf(writer_component->err, "[error] %s in %s:%d\n",
482 __func__, __FILE__, __LINE__);
483 goto error;
484 }
485
486 writer_stream = get_writer_stream(writer_component, packet, stream);
487 if (!writer_stream) {
488 fprintf(writer_component->err, "[error] %s in %s:%d\n",
489 __func__, __FILE__, __LINE__);
490 goto error;
491 }
492 BT_PUT(stream);
493
494 writer_packet_context = ctf_copy_packet_context(writer_component->err,
495 packet, writer_stream, 1);
496 if (!writer_packet_context) {
497 fprintf(writer_component->err, "[error] %s in %s:%d\n",
498 __func__, __FILE__, __LINE__);
499 goto error;
500 }
501
502 int_ret = bt_ctf_stream_set_packet_context(writer_stream,
503 writer_packet_context);
504 if (int_ret < 0) {
505 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
506 __FILE__, __LINE__);
507 goto error;
508 }
509 BT_PUT(writer_stream);
510 BT_PUT(writer_packet_context);
511
512 goto end;
513
514 error:
515 ret = BT_COMPONENT_STATUS_ERROR;
516 end:
517 bt_put(writer_stream);
518 bt_put(writer_packet_context);
519 bt_put(stream);
520 return ret;
521 }
522
523 BT_HIDDEN
524 enum bt_component_status writer_close_packet(
525 struct writer_component *writer_component,
526 struct bt_ctf_packet *packet)
527 {
528 struct bt_ctf_stream *stream = NULL, *writer_stream = NULL;
529 enum bt_component_status ret;
530
531 stream = bt_ctf_packet_get_stream(packet);
532 if (!stream) {
533 fprintf(writer_component->err, "[error] %s in %s:%d\n",
534 __func__, __FILE__, __LINE__);
535 goto error;
536 }
537
538 writer_stream = lookup_stream(writer_component, stream);
539 if (!writer_stream) {
540 fprintf(writer_component->err, "[error] %s in %s:%d\n",
541 __func__, __FILE__, __LINE__);
542 goto error;
543 }
544 BT_PUT(stream);
545
546 bt_get(writer_stream);
547
548 ret = bt_ctf_stream_flush(writer_stream);
549 if (ret < 0) {
550 fprintf(writer_component->err,
551 "[error] Failed to flush packet\n");
552 goto error;
553 }
554 BT_PUT(writer_stream);
555
556 ret = BT_COMPONENT_STATUS_OK;
557 goto end;
558
559 error:
560 ret = BT_COMPONENT_STATUS_ERROR;
561 end:
562 bt_put(writer_stream);
563 bt_put(stream);
564 return ret;
565 }
566
567 BT_HIDDEN
568 enum bt_component_status writer_output_event(
569 struct writer_component *writer_component,
570 struct bt_ctf_event *event)
571 {
572 enum bt_component_status ret;
573 struct bt_ctf_event_class *event_class = NULL, *writer_event_class = NULL;
574 struct bt_ctf_stream *stream = NULL, *writer_stream = NULL;
575 struct bt_ctf_stream_class *stream_class = NULL, *writer_stream_class = NULL;
576 struct bt_ctf_event *writer_event = NULL;
577 const char *event_name;
578 int int_ret;
579
580 event_class = bt_ctf_event_get_class(event);
581 if (!event_class) {
582 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
583 __FILE__, __LINE__);
584 goto error;
585 }
586
587 event_name = bt_ctf_event_class_get_name(event_class);
588 if (!event_name) {
589 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
590 __FILE__, __LINE__);
591 goto error;
592 }
593
594 stream = bt_ctf_event_get_stream(event);
595 if (!stream) {
596 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
597 __FILE__, __LINE__);
598 goto error;
599 }
600
601 writer_stream = lookup_stream(writer_component, stream);
602 if (!writer_stream || !bt_get(writer_stream)) {
603 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
604 __FILE__, __LINE__);
605 goto error;
606 }
607
608 stream_class = bt_ctf_event_class_get_stream_class(event_class);
609 if (!stream_class) {
610 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
611 __FILE__, __LINE__);
612 goto error;
613 }
614
615 writer_stream_class = lookup_stream_class(writer_component, stream_class);
616 if (!writer_stream_class || !bt_get(writer_stream_class)) {
617 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
618 __FILE__, __LINE__);
619 goto error;
620 }
621
622 writer_event_class = get_event_class(writer_component,
623 writer_stream_class, event_class);
624 if (!writer_event_class) {
625 writer_event_class = ctf_copy_event_class(writer_component->err,
626 event_class);
627 if (!writer_event_class) {
628 fprintf(writer_component->err, "[error] %s in %s:%d\n",
629 __func__, __FILE__, __LINE__);
630 goto error;
631 }
632 int_ret = bt_ctf_stream_class_add_event_class(
633 writer_stream_class, writer_event_class);
634 if (int_ret) {
635 fprintf(writer_component->err, "[error] %s in %s:%d\n",
636 __func__, __FILE__, __LINE__);
637 goto error;
638 }
639 }
640
641 writer_event = ctf_copy_event(writer_component->err, event,
642 writer_event_class, true);
643 if (!writer_event) {
644 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
645 __FILE__, __LINE__);
646 fprintf(writer_component->err, "[error] Failed to copy event %s\n",
647 bt_ctf_event_class_get_name(writer_event_class));
648 goto error;
649 }
650
651 int_ret = bt_ctf_stream_append_event(writer_stream, writer_event);
652 if (int_ret < 0) {
653 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
654 __FILE__, __LINE__);
655 fprintf(writer_component->err, "[error] Failed to append event %s\n",
656 bt_ctf_event_class_get_name(writer_event_class));
657 goto error;
658 }
659
660 ret = BT_COMPONENT_STATUS_OK;
661 goto end;
662
663 error:
664 ret = BT_COMPONENT_STATUS_ERROR;
665 end:
666 bt_put(writer_event);
667 bt_put(writer_event_class);
668 bt_put(writer_stream_class);
669 bt_put(stream_class);
670 bt_put(writer_stream);
671 bt_put(stream);
672 bt_put(event_class);
673 return ret;
674 }
This page took 0.041927 seconds and 4 git commands to generate.