2285e92a2e4dc27e7eb39cead4bed82d49281265
[babeltrace.git] / plugins / ctf / fs-sink / write.c
1 /*
2 * writer.c
3 *
4 * Babeltrace CTF Writer Output Plugin Event Handling
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 *
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * SOFTWARE.
27 */
28
29 #include <babeltrace/ctf-ir/event.h>
30 #include <babeltrace/ctf-ir/packet.h>
31 #include <babeltrace/ctf-ir/event-class.h>
32 #include <babeltrace/ctf-ir/stream.h>
33 #include <babeltrace/ctf-ir/stream-class.h>
34 #include <babeltrace/ctf-ir/clock-class.h>
35 #include <babeltrace/ctf-ir/fields.h>
36 #include <babeltrace/ctf-writer/stream-class.h>
37 #include <babeltrace/ctf-writer/stream.h>
38 #include <assert.h>
39
40 #include <ctfcopytrace.h>
41
42 #include "writer.h"
43
44 static
45 void trace_is_static_listener(struct bt_ctf_trace *trace, void *data)
46 {
47 *((int *) data) = 1;
48 }
49
50 static
51 struct bt_ctf_stream_class *insert_new_stream_class(
52 struct writer_component *writer_component,
53 struct bt_ctf_writer *ctf_writer,
54 struct bt_ctf_stream_class *stream_class)
55 {
56 struct bt_ctf_stream_class *writer_stream_class = NULL;
57 struct bt_ctf_trace *trace = NULL, *writer_trace = NULL;
58 enum bt_component_status ret;
59
60 trace = bt_ctf_stream_class_get_trace(stream_class);
61 if (!trace) {
62 fprintf(writer_component->err,
63 "[error] %s in %s:%d\n", __func__, __FILE__,
64 __LINE__);
65 goto error;
66 }
67
68 writer_trace = bt_ctf_writer_get_trace(ctf_writer);
69 if (!writer_trace) {
70 fprintf(writer_component->err,
71 "[error] %s in %s:%d\n", __func__, __FILE__,
72 __LINE__);
73 goto error;
74 }
75
76 ret = ctf_copy_clock_classes(writer_component->err, writer_trace,
77 writer_stream_class, trace);
78 if (ret != BT_COMPONENT_STATUS_OK) {
79 fprintf(writer_component->err,
80 "[error] %s in %s:%d\n", __func__, __FILE__,
81 __LINE__);
82 goto error;
83 }
84
85 writer_stream_class = ctf_copy_stream_class(writer_component->err,
86 stream_class, writer_trace, true);
87 if (!writer_stream_class) {
88 fprintf(writer_component->err, "[error] Failed to copy stream class\n");
89 fprintf(writer_component->err, "[error] %s in %s:%d\n",
90 __func__, __FILE__, __LINE__);
91 goto error;
92 }
93
94 g_hash_table_insert(writer_component->stream_class_map,
95 (gpointer) stream_class, writer_stream_class);
96
97 goto end;
98
99 error:
100 BT_PUT(writer_stream_class);
101 end:
102 bt_put(writer_trace);
103 bt_put(trace);
104 return writer_stream_class;
105 }
106
107 static
108 struct bt_ctf_stream *insert_new_stream(
109 struct writer_component *writer_component,
110 struct bt_ctf_writer *ctf_writer,
111 struct bt_ctf_stream_class *stream_class,
112 struct bt_ctf_stream *stream)
113 {
114 struct bt_ctf_stream *writer_stream = NULL;
115 struct bt_ctf_stream_class *writer_stream_class = NULL;
116
117 writer_stream_class = g_hash_table_lookup(
118 writer_component->stream_class_map,
119 (gpointer) stream_class);
120 if (!writer_stream_class) {
121 writer_stream_class = insert_new_stream_class(
122 writer_component, ctf_writer, stream_class);
123 if (!writer_stream_class) {
124 fprintf(writer_component->err, "[error] %s in %s:%d\n",
125 __func__, __FILE__, __LINE__);
126 goto error;
127 }
128 }
129 bt_get(writer_stream_class);
130
131 writer_stream = bt_ctf_writer_create_stream(ctf_writer,
132 writer_stream_class);
133 if (!writer_stream) {
134 fprintf(writer_component->err, "[error] %s in %s:%d\n",
135 __func__, __FILE__, __LINE__);
136 goto error;
137 }
138
139 g_hash_table_insert(writer_component->stream_map, (gpointer) stream,
140 writer_stream);
141
142 goto end;
143
144 error:
145 BT_PUT(writer_stream);
146 end:
147 bt_put(writer_stream_class);
148 return writer_stream;
149 }
150
151 static
152 struct bt_ctf_stream *lookup_stream(struct writer_component *writer_component,
153 struct bt_ctf_stream *stream)
154 {
155 return (struct bt_ctf_stream *) g_hash_table_lookup(
156 writer_component->stream_map,
157 (gpointer) stream);
158 }
159
160 static
161 struct bt_ctf_event_class *get_event_class(struct writer_component *writer_component,
162 struct bt_ctf_stream_class *writer_stream_class,
163 struct bt_ctf_event_class *event_class)
164 {
165 return bt_ctf_stream_class_get_event_class_by_id(writer_stream_class,
166 bt_ctf_event_class_get_id(event_class));
167 }
168
169 struct fs_writer *insert_new_writer(
170 struct writer_component *writer_component,
171 struct bt_ctf_trace *trace)
172 {
173 struct bt_ctf_writer *ctf_writer = NULL;
174 struct bt_ctf_trace *writer_trace = NULL;
175 char trace_name[PATH_MAX];
176 enum bt_component_status ret;
177 struct fs_writer *fs_writer;
178
179 /* FIXME: replace with trace name when it will work. */
180 snprintf(trace_name, PATH_MAX, "%s/%s_%03d",
181 writer_component->base_path->str,
182 writer_component->trace_name_base->str,
183 writer_component->trace_id++);
184 printf_verbose("CTF-Writer creating trace in %s\n", trace_name);
185
186 ctf_writer = bt_ctf_writer_create(trace_name);
187 if (!ctf_writer) {
188 fprintf(writer_component->err, "[error] %s in %s:%d\n",
189 __func__, __FILE__, __LINE__);
190 goto error;
191 }
192
193 writer_trace = bt_ctf_writer_get_trace(ctf_writer);
194 if (!writer_trace) {
195 fprintf(writer_component->err,
196 "[error] %s in %s:%d\n", __func__, __FILE__,
197 __LINE__);
198 goto error;
199 }
200
201 ret = ctf_copy_trace(writer_component->err, trace, writer_trace);
202 if (ret != BT_COMPONENT_STATUS_OK) {
203 fprintf(writer_component->err, "[error] Failed to copy trace\n");
204 fprintf(writer_component->err, "[error] %s in %s:%d\n",
205 __func__, __FILE__, __LINE__);
206 BT_PUT(ctf_writer);
207 goto error;
208 }
209
210 fs_writer = g_new0(struct fs_writer, 1);
211 if (!fs_writer) {
212 fprintf(writer_component->err,
213 "[error] %s in %s:%d\n", __func__, __FILE__,
214 __LINE__);
215 goto error;
216 }
217 fs_writer->writer = ctf_writer;
218 fs_writer->writer_trace = writer_trace;
219 BT_PUT(writer_trace);
220 if (bt_ctf_trace_is_static(trace)) {
221 fs_writer->trace_static = 1;
222 fs_writer->static_listener_id = -1;
223 } else {
224 ret = bt_ctf_trace_add_is_static_listener(trace,
225 trace_is_static_listener, &fs_writer->trace_static);
226 if (ret < 0) {
227 fprintf(writer_component->err,
228 "[error] %s in %s:%d\n", __func__, __FILE__,
229 __LINE__);
230 g_free(fs_writer);
231 fs_writer = NULL;
232 goto error;
233 }
234 fs_writer->static_listener_id = ret;
235 }
236
237 g_hash_table_insert(writer_component->trace_map, (gpointer) trace,
238 fs_writer);
239
240 goto end;
241
242 error:
243 bt_put(writer_trace);
244 BT_PUT(ctf_writer);
245 end:
246 return fs_writer;
247 }
248
249 static
250 struct fs_writer *get_fs_writer(struct writer_component *writer_component,
251 struct bt_ctf_stream_class *stream_class)
252 {
253 struct bt_ctf_trace *trace = NULL;
254 struct fs_writer *fs_writer;
255
256 trace = bt_ctf_stream_class_get_trace(stream_class);
257 if (!trace) {
258 fprintf(writer_component->err, "[error] %s in %s:%d\n",
259 __func__, __FILE__, __LINE__);
260 goto error;
261 }
262
263 fs_writer = g_hash_table_lookup(writer_component->trace_map,
264 (gpointer) trace);
265 if (!fs_writer) {
266 fs_writer = insert_new_writer(writer_component, trace);
267 }
268 BT_PUT(trace);
269 goto end;
270
271 error:
272 fs_writer = NULL;
273 end:
274 return fs_writer;
275 }
276
277 static
278 struct bt_ctf_stream *get_writer_stream(
279 struct writer_component *writer_component,
280 struct bt_ctf_packet *packet, struct bt_ctf_stream *stream)
281 {
282 struct bt_ctf_stream_class *stream_class = NULL;
283 struct bt_ctf_writer *ctf_writer = NULL;
284 struct bt_ctf_stream *writer_stream = NULL;
285
286 stream_class = bt_ctf_stream_get_class(stream);
287 if (!stream_class) {
288 fprintf(writer_component->err, "[error] %s in %s:%d\n",
289 __func__, __FILE__, __LINE__);
290 goto error;
291 }
292
293 writer_stream = lookup_stream(writer_component, stream);
294 if (!writer_stream) {
295 struct fs_writer *fs_writer;
296
297 fs_writer = get_fs_writer(writer_component, stream_class);
298 if (!fs_writer) {
299 fprintf(writer_component->err, "[error] %s in %s:%d\n",
300 __func__, __FILE__, __LINE__);
301 goto error;
302 }
303 ctf_writer = bt_get(fs_writer->writer);
304 writer_stream = insert_new_stream(writer_component, ctf_writer,
305 stream_class, stream);
306 if (!writer_stream) {
307 fprintf(writer_component->err, "[error] %s in %s:%d\n",
308 __func__, __FILE__, __LINE__);
309 goto error;
310 }
311 fs_writer->active_streams++;
312 }
313 bt_get(writer_stream);
314
315 goto end;
316
317 error:
318 BT_PUT(writer_stream);
319 end:
320 bt_put(ctf_writer);
321 bt_put(stream_class);
322 return writer_stream;
323 }
324
325 BT_HIDDEN
326 enum bt_component_status writer_close(
327 struct writer_component *writer_component,
328 struct fs_writer *fs_writer,
329 struct bt_ctf_trace *trace)
330 {
331 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
332
333 if (fs_writer->static_listener_id > 0) {
334 bt_ctf_trace_remove_is_static_listener(trace,
335 fs_writer->static_listener_id);
336 }
337 g_hash_table_remove(writer_component->trace_map, trace);
338 return ret;
339 }
340
341 BT_HIDDEN
342 enum bt_component_status writer_stream_end(
343 struct writer_component *writer_component,
344 struct bt_ctf_stream *stream)
345 {
346 struct bt_ctf_stream_class *stream_class = NULL;
347 struct fs_writer *fs_writer;
348 struct bt_ctf_trace *trace = NULL;
349 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
350
351 g_hash_table_remove(writer_component->stream_map, stream);
352
353 stream_class = bt_ctf_stream_get_class(stream);
354 if (!stream_class) {
355 fprintf(writer_component->err, "[error] %s in %s:%d\n",
356 __func__, __FILE__, __LINE__);
357 goto error;
358 }
359
360 fs_writer = get_fs_writer(writer_component, stream_class);
361 if (!fs_writer) {
362 fprintf(writer_component->err, "[error] %s in %s:%d\n",
363 __func__, __FILE__, __LINE__);
364 goto error;
365 }
366
367 assert(fs_writer->active_streams > 0);
368 fs_writer->active_streams--;
369 if (fs_writer->active_streams == 0 && fs_writer->trace_static) {
370 trace = bt_ctf_stream_class_get_trace(stream_class);
371 if (!trace) {
372 fprintf(writer_component->err, "[error] %s in %s:%d\n",
373 __func__, __FILE__, __LINE__);
374 goto error;
375 }
376 ret = writer_close(writer_component, fs_writer, trace);
377 }
378
379 goto end;
380
381 error:
382 ret = BT_COMPONENT_STATUS_ERROR;
383 end:
384 BT_PUT(trace);
385 BT_PUT(stream_class);
386 return ret;
387 }
388
389 BT_HIDDEN
390 enum bt_component_status writer_new_packet(
391 struct writer_component *writer_component,
392 struct bt_ctf_packet *packet)
393 {
394 struct bt_ctf_stream *stream = NULL, *writer_stream = NULL;
395 struct bt_ctf_field *writer_packet_context = NULL;
396 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
397 int int_ret;
398
399 stream = bt_ctf_packet_get_stream(packet);
400 if (!stream) {
401 fprintf(writer_component->err, "[error] %s in %s:%d\n",
402 __func__, __FILE__, __LINE__);
403 goto error;
404 }
405
406 writer_stream = get_writer_stream(writer_component, packet, stream);
407 if (!writer_stream) {
408 fprintf(writer_component->err, "[error] %s in %s:%d\n",
409 __func__, __FILE__, __LINE__);
410 goto error;
411 }
412 BT_PUT(stream);
413
414 writer_packet_context = ctf_copy_packet_context(writer_component->err,
415 packet, writer_stream, 1);
416 if (!writer_packet_context) {
417 fprintf(writer_component->err, "[error] %s in %s:%d\n",
418 __func__, __FILE__, __LINE__);
419 goto error;
420 }
421
422 int_ret = bt_ctf_stream_set_packet_context(writer_stream,
423 writer_packet_context);
424 if (int_ret < 0) {
425 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
426 __FILE__, __LINE__);
427 goto error;
428 }
429 BT_PUT(writer_stream);
430 BT_PUT(writer_packet_context);
431
432 goto end;
433
434 error:
435 ret = BT_COMPONENT_STATUS_ERROR;
436 end:
437 bt_put(writer_stream);
438 bt_put(writer_packet_context);
439 bt_put(stream);
440 return ret;
441 }
442
443 BT_HIDDEN
444 enum bt_component_status writer_close_packet(
445 struct writer_component *writer_component,
446 struct bt_ctf_packet *packet)
447 {
448 struct bt_ctf_stream *stream = NULL, *writer_stream = NULL;
449 enum bt_component_status ret;
450
451 stream = bt_ctf_packet_get_stream(packet);
452 if (!stream) {
453 fprintf(writer_component->err, "[error] %s in %s:%d\n",
454 __func__, __FILE__, __LINE__);
455 goto error;
456 }
457
458 writer_stream = lookup_stream(writer_component, stream);
459 if (!writer_stream) {
460 fprintf(writer_component->err, "[error] %s in %s:%d\n",
461 __func__, __FILE__, __LINE__);
462 goto error;
463 }
464 BT_PUT(stream);
465
466 bt_get(writer_stream);
467
468 ret = bt_ctf_stream_flush(writer_stream);
469 if (ret < 0) {
470 fprintf(writer_component->err,
471 "[error] Failed to flush packet\n");
472 goto error;
473 }
474 BT_PUT(writer_stream);
475
476 ret = BT_COMPONENT_STATUS_OK;
477 goto end;
478
479 error:
480 ret = BT_COMPONENT_STATUS_ERROR;
481 end:
482 bt_put(writer_stream);
483 bt_put(stream);
484 return ret;
485 }
486
487 BT_HIDDEN
488 enum bt_component_status writer_output_event(
489 struct writer_component *writer_component,
490 struct bt_ctf_event *event)
491 {
492 enum bt_component_status ret;
493 struct bt_ctf_event_class *event_class = NULL, *writer_event_class = NULL;
494 struct bt_ctf_stream *stream = NULL, *writer_stream = NULL;
495 struct bt_ctf_stream_class *stream_class = NULL, *writer_stream_class = NULL;
496 struct bt_ctf_event *writer_event = NULL;
497 const char *event_name;
498 int int_ret;
499
500 event_class = bt_ctf_event_get_class(event);
501 if (!event_class) {
502 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
503 __FILE__, __LINE__);
504 goto error;
505 }
506
507 event_name = bt_ctf_event_class_get_name(event_class);
508 if (!event_name) {
509 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
510 __FILE__, __LINE__);
511 goto error;
512 }
513
514 stream = bt_ctf_event_get_stream(event);
515 if (!stream) {
516 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
517 __FILE__, __LINE__);
518 goto error;
519 }
520
521 writer_stream = lookup_stream(writer_component, stream);
522 if (!writer_stream || !bt_get(writer_stream)) {
523 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
524 __FILE__, __LINE__);
525 goto error;
526 }
527
528 stream_class = bt_ctf_event_class_get_stream_class(event_class);
529 if (!stream_class) {
530 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
531 __FILE__, __LINE__);
532 goto error;
533 }
534
535 writer_stream_class = g_hash_table_lookup(
536 writer_component->stream_class_map,
537 (gpointer) stream_class);
538 if (!writer_stream_class || !bt_get(writer_stream_class)) {
539 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
540 __FILE__, __LINE__);
541 goto error;
542 }
543
544 writer_event_class = get_event_class(writer_component,
545 writer_stream_class, event_class);
546 if (!writer_event_class) {
547 writer_event_class = ctf_copy_event_class(writer_component->err,
548 event_class);
549 if (!writer_event_class) {
550 fprintf(writer_component->err, "[error] %s in %s:%d\n",
551 __func__, __FILE__, __LINE__);
552 goto error;
553 }
554 int_ret = bt_ctf_stream_class_add_event_class(
555 writer_stream_class, writer_event_class);
556 if (int_ret) {
557 fprintf(writer_component->err, "[error] %s in %s:%d\n",
558 __func__, __FILE__, __LINE__);
559 goto error;
560 }
561 }
562
563 writer_event = ctf_copy_event(writer_component->err, event,
564 writer_event_class, true);
565 if (!writer_event) {
566 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
567 __FILE__, __LINE__);
568 fprintf(writer_component->err, "[error] Failed to copy event %s\n",
569 bt_ctf_event_class_get_name(writer_event_class));
570 goto error;
571 }
572
573 int_ret = bt_ctf_stream_append_event(writer_stream, writer_event);
574 if (int_ret < 0) {
575 fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__,
576 __FILE__, __LINE__);
577 fprintf(writer_component->err, "[error] Failed to append event %s\n",
578 bt_ctf_event_class_get_name(writer_event_class));
579 goto error;
580 }
581
582 ret = BT_COMPONENT_STATUS_OK;
583 goto end;
584
585 error:
586 ret = BT_COMPONENT_STATUS_ERROR;
587 end:
588 bt_put(writer_event);
589 bt_put(writer_event_class);
590 bt_put(writer_stream_class);
591 bt_put(stream_class);
592 bt_put(writer_stream);
593 bt_put(stream);
594 bt_put(event_class);
595 return ret;
596 }
This page took 0.039871 seconds and 3 git commands to generate.