fs-sink: explicitely handle stream_begin notif
[babeltrace.git] / plugins / ctf / fs-sink / write.c
1 /*
2 * writer.c
3 *
4 * Babeltrace CTF Writer Output Plugin Event Handling
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 *
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * SOFTWARE.
27 */
28
29 #include <babeltrace/ctf-ir/event.h>
30 #include <babeltrace/ctf-ir/packet.h>
31 #include <babeltrace/ctf-ir/event-class.h>
32 #include <babeltrace/ctf-ir/stream.h>
33 #include <babeltrace/ctf-ir/stream-class.h>
34 #include <babeltrace/ctf-ir/clock-class.h>
35 #include <babeltrace/ctf-ir/fields.h>
36 #include <babeltrace/ctf-writer/stream-class.h>
37 #include <babeltrace/ctf-writer/stream.h>
38 #include <assert.h>
39
40 #include <ctfcopytrace.h>
41
42 #include "writer.h"
43
44 static
45 void trace_is_static_listener(struct bt_ctf_trace *trace, void *data)
46 {
47 *((int *) data) = 1;
48 }
49
50 static
51 struct bt_ctf_stream_class *insert_new_stream_class(
52 struct writer_component *writer_component,
53 struct bt_ctf_writer *ctf_writer,
54 struct bt_ctf_stream_class *stream_class)
55 {
56 struct bt_ctf_stream_class *writer_stream_class = NULL;
57 struct bt_ctf_trace *trace = NULL, *writer_trace = NULL;
58 enum bt_component_status ret;
59
60 trace = bt_ctf_stream_class_get_trace(stream_class);
61 if (!trace) {
62 fprintf(writer_component->err,
63 "[error] %s in %s:%d\n", __func__, __FILE__,
64 __LINE__);
65 goto error;
66 }
67
68 writer_trace = bt_ctf_writer_get_trace(ctf_writer);
69 if (!writer_trace) {
70 fprintf(writer_component->err,
71 "[error] %s in %s:%d\n", __func__, __FILE__,
72 __LINE__);
73 goto error;
74 }
75
76 ret = ctf_copy_clock_classes(writer_component->err, writer_trace,
77 writer_stream_class, trace);
78 if (ret != BT_COMPONENT_STATUS_OK) {
79 fprintf(writer_component->err,
80 "[error] %s in %s:%d\n", __func__, __FILE__,
81 __LINE__);
82 goto error;
83 }
84
85 writer_stream_class = ctf_copy_stream_class(writer_component->err,
86 stream_class, writer_trace, true);
87 if (!writer_stream_class) {
88 fprintf(writer_component->err, "[error] Failed to copy stream class\n");
89 fprintf(writer_component->err, "[error] %s in %s:%d\n",
90 __func__, __FILE__, __LINE__);
91 goto error;
92 }
93
94 g_hash_table_insert(writer_component->stream_class_map,
95 (gpointer) stream_class, writer_stream_class);
96
97 goto end;
98
99 error:
100 BT_PUT(writer_stream_class);
101 end:
102 bt_put(writer_trace);
103 bt_put(trace);
104 return writer_stream_class;
105 }
106
107 static
108 struct bt_ctf_stream *insert_new_stream(
109 struct writer_component *writer_component,
110 struct bt_ctf_writer *ctf_writer,
111 struct bt_ctf_stream_class *stream_class,
112 struct bt_ctf_stream *stream)
113 {
114 struct bt_ctf_stream *writer_stream = NULL;
115 struct bt_ctf_stream_class *writer_stream_class = NULL;
116
117 writer_stream_class = g_hash_table_lookup(
118 writer_component->stream_class_map,
119 (gpointer) stream_class);
120 if (!writer_stream_class) {
121 writer_stream_class = insert_new_stream_class(
122 writer_component, ctf_writer, stream_class);
123 if (!writer_stream_class) {
124 fprintf(writer_component->err, "[error] %s in %s:%d\n",
125 __func__, __FILE__, __LINE__);
126 goto error;
127 }
128 }
129 bt_get(writer_stream_class);
130
131 writer_stream = bt_ctf_writer_create_stream(ctf_writer,
132 writer_stream_class);
133 if (!writer_stream) {
134 fprintf(writer_component->err, "[error] %s in %s:%d\n",
135 __func__, __FILE__, __LINE__);
136 goto error;
137 }
138
139 g_hash_table_insert(writer_component->stream_map, (gpointer) stream,
140 writer_stream);
141
142 goto end;
143
144 error:
145 BT_PUT(writer_stream);
146 end:
147 bt_put(writer_stream_class);
148 return writer_stream;
149 }
150
151 static
152 struct bt_ctf_stream *lookup_stream(struct writer_component *writer_component,
153 struct bt_ctf_stream *stream)
154 {
155 return (struct bt_ctf_stream *) g_hash_table_lookup(
156 writer_component->stream_map,
157 (gpointer) stream);
158 }
159
160 static
161 struct bt_ctf_event_class *get_event_class(struct writer_component *writer_component,
162 struct bt_ctf_stream_class *writer_stream_class,
163 struct bt_ctf_event_class *event_class)
164 {
165 return bt_ctf_stream_class_get_event_class_by_id(writer_stream_class,
166 bt_ctf_event_class_get_id(event_class));
167 }
168
169 struct fs_writer *insert_new_writer(
170 struct writer_component *writer_component,
171 struct bt_ctf_trace *trace)
172 {
173 struct bt_ctf_writer *ctf_writer = NULL;
174 struct bt_ctf_trace *writer_trace = NULL;
175 char trace_name[PATH_MAX];
176 enum bt_component_status ret;
177 struct fs_writer *fs_writer;
178
179 /* FIXME: replace with trace name when it will work. */
180 snprintf(trace_name, PATH_MAX, "%s/%s_%03d",
181 writer_component->base_path->str,
182 writer_component->trace_name_base->str,
183 writer_component->trace_id++);
184 printf_verbose("CTF-Writer creating trace in %s\n", trace_name);
185
186 ctf_writer = bt_ctf_writer_create(trace_name);
187 if (!ctf_writer) {
188 fprintf(writer_component->err, "[error] %s in %s:%d\n",
189 __func__, __FILE__, __LINE__);
190 goto error;
191 }
192
193 writer_trace = bt_ctf_writer_get_trace(ctf_writer);
194 if (!writer_trace) {
195 fprintf(writer_component->err,
196 "[error] %s in %s:%d\n", __func__, __FILE__,
197 __LINE__);
198 goto error;
199 }
200
201 ret = ctf_copy_trace(writer_component->err, trace, writer_trace);
202 if (ret != BT_COMPONENT_STATUS_OK) {
203 fprintf(writer_component->err, "[error] Failed to copy trace\n");
204 fprintf(writer_component->err, "[error] %s in %s:%d\n",
205 __func__, __FILE__, __LINE__);
206 BT_PUT(ctf_writer);
207 goto error;
208 }
209
210 fs_writer = g_new0(struct fs_writer, 1);
211 if (!fs_writer) {
212 fprintf(writer_component->err,
213 "[error] %s in %s:%d\n", __func__, __FILE__,
214 __LINE__);
215 goto error;
216 }
217 fs_writer->writer = ctf_writer;
218 fs_writer->writer_trace = writer_trace;
219 BT_PUT(writer_trace);
220 if (bt_ctf_trace_is_static(trace)) {
221 fs_writer->trace_static = 1;
222 fs_writer->static_listener_id = -1;
223 } else {
224 ret = bt_ctf_trace_add_is_static_listener(trace,
225 trace_is_static_listener, &fs_writer->trace_static);
226 if (ret < 0) {
227 fprintf(writer_component->err,
228 "[error] %s in %s:%d\n", __func__, __FILE__,
229 __LINE__);
230 g_free(fs_writer);
231 fs_writer = NULL;
232 goto error;
233 }
234 fs_writer->static_listener_id = ret;
235 }
236
237 g_hash_table_insert(writer_component->trace_map, (gpointer) trace,
238 fs_writer);
239
240 goto end;
241
242 error:
243 bt_put(writer_trace);
244 BT_PUT(ctf_writer);
245 end:
246 return fs_writer;
247 }
248
249 static
250 struct fs_writer *get_fs_writer(struct writer_component *writer_component,
251 struct bt_ctf_stream_class *stream_class)
252 {
253 struct bt_ctf_trace *trace = NULL;
254 struct fs_writer *fs_writer;
255
256 trace = bt_ctf_stream_class_get_trace(stream_class);
257 if (!trace) {
258 fprintf(writer_component->err, "[error] %s in %s:%d\n",
259 __func__, __FILE__, __LINE__);
260 goto error;
261 }
262
263 fs_writer = g_hash_table_lookup(writer_component->trace_map,
264 (gpointer) trace);
265 if (!fs_writer) {
266 fs_writer = insert_new_writer(writer_component, trace);
267 }
268 BT_PUT(trace);
269 goto end;
270
271 error:
272 fs_writer = NULL;
273 end:
274 return fs_writer;
275 }
276
277 static
278 struct bt_ctf_stream *get_writer_stream(
279 struct writer_component *writer_component,
280 struct bt_ctf_packet *packet, struct bt_ctf_stream *stream)
281 {
282 struct bt_ctf_stream *writer_stream = NULL;
283
284 writer_stream = lookup_stream(writer_component, stream);
285 if (!writer_stream) {
286 fprintf(writer_component->err, "[error] %s in %s:%d\n",
287 __func__, __FILE__, __LINE__);
288 goto error;
289 }
290 bt_get(writer_stream);
291
292 goto end;
293
294 error:
295 BT_PUT(writer_stream);
296 end:
297 return writer_stream;
298 }
299
300 BT_HIDDEN
301 enum bt_component_status writer_close(
302 struct writer_component *writer_component,
303 struct fs_writer *fs_writer,
304 struct bt_ctf_trace *trace)
305 {
306 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
307
308 if (fs_writer->static_listener_id > 0) {
309 bt_ctf_trace_remove_is_static_listener(trace,
310 fs_writer->static_listener_id);
311 }
312 g_hash_table_remove(writer_component->trace_map, trace);
313 return ret;
314 }
315
316 BT_HIDDEN
317 enum bt_component_status writer_stream_begin(
318 struct writer_component *writer_component,
319 struct bt_ctf_stream *stream)
320 {
321 struct bt_ctf_stream_class *stream_class = NULL;
322 struct fs_writer *fs_writer;
323 struct bt_ctf_writer *ctf_writer = NULL;
324 struct bt_ctf_stream *writer_stream = NULL;
325 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
326
327 stream_class = bt_ctf_stream_get_class(stream);
328 if (!stream_class) {
329 fprintf(writer_component->err, "[error] %s in %s:%d\n",
330 __func__, __FILE__, __LINE__);
331 goto error;
332 }
333
334 fs_writer = get_fs_writer(writer_component, stream_class);
335 if (!fs_writer) {
336 fprintf(writer_component->err, "[error] %s in %s:%d\n",
337 __func__, __FILE__, __LINE__);
338 goto error;
339 }
340 ctf_writer = bt_get(fs_writer->writer);
341 writer_stream = insert_new_stream(writer_component, ctf_writer,
342 stream_class, stream);
343 if (!writer_stream) {
344 fprintf(writer_component->err, "[error] %s in %s:%d\n",
345 __func__, __FILE__, __LINE__);
346 goto error;
347 }
348 fs_writer->active_streams++;
349
350 goto end;
351
352 error:
353 ret = BT_COMPONENT_STATUS_ERROR;
354 end:
355 bt_put(ctf_writer);
356 bt_put(stream_class);
357 return ret;
358 }
359
360 BT_HIDDEN
361 enum bt_component_status writer_stream_end(
362 struct writer_component *writer_component,
363 struct bt_ctf_stream *stream)
364 {
365 struct bt_ctf_stream_class *stream_class = NULL;
366 struct fs_writer *fs_writer;
367 struct bt_ctf_trace *trace = NULL;
368 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
369
370 g_hash_table_remove(writer_component->stream_map, stream);
371
372 stream_class = bt_ctf_stream_get_class(stream);
373 if (!stream_class) {
374 fprintf(writer_component->err, "[error] %s in %s:%d\n",
375 __func__, __FILE__, __LINE__);
376 goto error;
377 }
378
379 fs_writer = get_fs_writer(writer_component, stream_class);
380 if (!fs_writer) {
381 fprintf(writer_component->err, "[error] %s in %s:%d\n",
382 __func__, __FILE__, __LINE__);
383 goto error;
384 }
385
386 assert(fs_writer->active_streams > 0);
387 fs_writer->active_streams--;
388 if (fs_writer->active_streams == 0 && fs_writer->trace_static) {
389 trace = bt_ctf_stream_class_get_trace(stream_class);
390 if (!trace) {
391 fprintf(writer_component->err, "[error] %s in %s:%d\n",
392 __func__, __FILE__, __LINE__);
393 goto error;
394 }
395 ret = writer_close(writer_component, fs_writer, trace);
396 }
397
398 goto end;
399
400 error:
401 ret = BT_COMPONENT_STATUS_ERROR;
402 end:
403 BT_PUT(trace);
404 BT_PUT(stream_class);
405 return ret;
406 }
407
408 BT_HIDDEN
409 enum bt_component_status writer_new_packet(
410 struct writer_component *writer_component,
411 struct bt_ctf_packet *packet)
412 {
413 struct bt_ctf_stream *stream = NULL, *writer_stream = NULL;
414 struct bt_ctf_field *writer_packet_context = NULL;
415 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
416 int int_ret;
417
418 stream = bt_ctf_packet_get_stream(packet);
419 if (!stream) {
420 fprintf(writer_component->err, "[error] %s in %s:%d\n",
421 __func__, __FILE__, __LINE__);
422 goto error;
423 }
424
425 writer_stream = get_writer_stream(writer_component, packet, stream);
426 if (!writer_stream) {
427 fprintf(writer_component->err, "[error] %s in %s:%d\n",
428 __func__, __FILE__, __LINE__);
429 goto error;
430 }
431 BT_PUT(stream);
432
433 writer_packet_context = ctf_copy_packet_context(writer_component->err,
434 packet, writer_stream, 1);
435 if (!writer_packet_context) {
436 fprintf(writer_component->err, "[error] %s in %s:%d\n",
437 __func__, __FILE__, __LINE__);
438 goto error;
439 }
440
441 int_ret = bt_ctf_stream_set_packet_context(writer_stream,
442 writer_packet_context);
443 if (int_ret < 0) {
444 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
445 __FILE__, __LINE__);
446 goto error;
447 }
448 BT_PUT(writer_stream);
449 BT_PUT(writer_packet_context);
450
451 goto end;
452
453 error:
454 ret = BT_COMPONENT_STATUS_ERROR;
455 end:
456 bt_put(writer_stream);
457 bt_put(writer_packet_context);
458 bt_put(stream);
459 return ret;
460 }
461
462 BT_HIDDEN
463 enum bt_component_status writer_close_packet(
464 struct writer_component *writer_component,
465 struct bt_ctf_packet *packet)
466 {
467 struct bt_ctf_stream *stream = NULL, *writer_stream = NULL;
468 enum bt_component_status ret;
469
470 stream = bt_ctf_packet_get_stream(packet);
471 if (!stream) {
472 fprintf(writer_component->err, "[error] %s in %s:%d\n",
473 __func__, __FILE__, __LINE__);
474 goto error;
475 }
476
477 writer_stream = lookup_stream(writer_component, stream);
478 if (!writer_stream) {
479 fprintf(writer_component->err, "[error] %s in %s:%d\n",
480 __func__, __FILE__, __LINE__);
481 goto error;
482 }
483 BT_PUT(stream);
484
485 bt_get(writer_stream);
486
487 ret = bt_ctf_stream_flush(writer_stream);
488 if (ret < 0) {
489 fprintf(writer_component->err,
490 "[error] Failed to flush packet\n");
491 goto error;
492 }
493 BT_PUT(writer_stream);
494
495 ret = BT_COMPONENT_STATUS_OK;
496 goto end;
497
498 error:
499 ret = BT_COMPONENT_STATUS_ERROR;
500 end:
501 bt_put(writer_stream);
502 bt_put(stream);
503 return ret;
504 }
505
506 BT_HIDDEN
507 enum bt_component_status writer_output_event(
508 struct writer_component *writer_component,
509 struct bt_ctf_event *event)
510 {
511 enum bt_component_status ret;
512 struct bt_ctf_event_class *event_class = NULL, *writer_event_class = NULL;
513 struct bt_ctf_stream *stream = NULL, *writer_stream = NULL;
514 struct bt_ctf_stream_class *stream_class = NULL, *writer_stream_class = NULL;
515 struct bt_ctf_event *writer_event = NULL;
516 const char *event_name;
517 int int_ret;
518
519 event_class = bt_ctf_event_get_class(event);
520 if (!event_class) {
521 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
522 __FILE__, __LINE__);
523 goto error;
524 }
525
526 event_name = bt_ctf_event_class_get_name(event_class);
527 if (!event_name) {
528 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
529 __FILE__, __LINE__);
530 goto error;
531 }
532
533 stream = bt_ctf_event_get_stream(event);
534 if (!stream) {
535 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
536 __FILE__, __LINE__);
537 goto error;
538 }
539
540 writer_stream = lookup_stream(writer_component, stream);
541 if (!writer_stream || !bt_get(writer_stream)) {
542 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
543 __FILE__, __LINE__);
544 goto error;
545 }
546
547 stream_class = bt_ctf_event_class_get_stream_class(event_class);
548 if (!stream_class) {
549 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
550 __FILE__, __LINE__);
551 goto error;
552 }
553
554 writer_stream_class = g_hash_table_lookup(
555 writer_component->stream_class_map,
556 (gpointer) stream_class);
557 if (!writer_stream_class || !bt_get(writer_stream_class)) {
558 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
559 __FILE__, __LINE__);
560 goto error;
561 }
562
563 writer_event_class = get_event_class(writer_component,
564 writer_stream_class, event_class);
565 if (!writer_event_class) {
566 writer_event_class = ctf_copy_event_class(writer_component->err,
567 event_class);
568 if (!writer_event_class) {
569 fprintf(writer_component->err, "[error] %s in %s:%d\n",
570 __func__, __FILE__, __LINE__);
571 goto error;
572 }
573 int_ret = bt_ctf_stream_class_add_event_class(
574 writer_stream_class, writer_event_class);
575 if (int_ret) {
576 fprintf(writer_component->err, "[error] %s in %s:%d\n",
577 __func__, __FILE__, __LINE__);
578 goto error;
579 }
580 }
581
582 writer_event = ctf_copy_event(writer_component->err, event,
583 writer_event_class, true);
584 if (!writer_event) {
585 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
586 __FILE__, __LINE__);
587 fprintf(writer_component->err, "[error] Failed to copy event %s\n",
588 bt_ctf_event_class_get_name(writer_event_class));
589 goto error;
590 }
591
592 int_ret = bt_ctf_stream_append_event(writer_stream, writer_event);
593 if (int_ret < 0) {
594 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
595 __FILE__, __LINE__);
596 fprintf(writer_component->err, "[error] Failed to append event %s\n",
597 bt_ctf_event_class_get_name(writer_event_class));
598 goto error;
599 }
600
601 ret = BT_COMPONENT_STATUS_OK;
602 goto end;
603
604 error:
605 ret = BT_COMPONENT_STATUS_ERROR;
606 end:
607 bt_put(writer_event);
608 bt_put(writer_event_class);
609 bt_put(writer_stream_class);
610 bt_put(stream_class);
611 bt_put(writer_stream);
612 bt_put(stream);
613 bt_put(event_class);
614 return ret;
615 }
This page took 0.044969 seconds and 4 git commands to generate.