45f875ff641ff7a2b0efaedbb1591b9ad4e59432
[babeltrace.git] / formats / ctf / ir / stream.c
1 /*
2 * stream.c
3 *
4 * Babeltrace CTF IR - Stream
5 *
6 * Copyright 2013, 2014 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 *
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * SOFTWARE.
27 */
28
29 #include <babeltrace/ctf-ir/clock.h>
30 #include <babeltrace/ctf-ir/clock-internal.h>
31 #include <babeltrace/ctf-writer/event.h>
32 #include <babeltrace/ctf-ir/event-internal.h>
33 #include <babeltrace/ctf-ir/event-types-internal.h>
34 #include <babeltrace/ctf-ir/event-fields-internal.h>
35 #include <babeltrace/ctf-ir/stream.h>
36 #include <babeltrace/ctf-ir/stream-internal.h>
37 #include <babeltrace/ctf-ir/stream-class-internal.h>
38 #include <babeltrace/ctf-writer/functor-internal.h>
39 #include <babeltrace/compiler.h>
40 #include <babeltrace/align.h>
41
42 static
43 void bt_ctf_stream_destroy(struct bt_ctf_ref *ref);
44 static
45 int set_structure_field_integer(struct bt_ctf_field *, char *, uint64_t);
46
47 BT_HIDDEN
48 struct bt_ctf_stream *bt_ctf_stream_create(
49 struct bt_ctf_stream_class *stream_class)
50 {
51 int ret;
52 struct bt_ctf_stream *stream = NULL;
53
54 if (!stream_class) {
55 goto end;
56 }
57
58 stream = g_new0(struct bt_ctf_stream, 1);
59 if (!stream) {
60 goto end;
61 }
62
63 bt_ctf_ref_init(&stream->ref_count);
64 stream->packet_context = bt_ctf_field_create(
65 stream_class->packet_context_type);
66 if (!stream->packet_context) {
67 goto error_destroy;
68 }
69
70 /*
71 * A stream class may not have a stream event context defined
72 * in which case this stream will never have a stream_event_context
73 * member since, after a stream's creation, the parent stream class
74 * is "frozen" (immutable).
75 */
76 if (stream_class->event_context_type) {
77 stream->event_context = bt_ctf_field_create(
78 stream_class->event_context_type);
79 if (!stream->packet_context) {
80 goto error_destroy;
81 }
82 }
83
84 /* Initialize events_discarded */
85 ret = set_structure_field_integer(stream->packet_context,
86 "events_discarded", 0);
87 if (ret) {
88 goto error_destroy;
89 }
90
91 stream->pos.fd = -1;
92 stream->id = stream_class->next_stream_id++;
93 stream->stream_class = stream_class;
94 bt_ctf_stream_class_get(stream_class);
95 bt_ctf_stream_class_freeze(stream_class);
96 stream->events = g_ptr_array_new_with_free_func(
97 (GDestroyNotify) bt_ctf_event_put);
98 if (!stream->events) {
99 goto error_destroy;
100 }
101 if (stream_class->event_context_type) {
102 stream->event_contexts = g_ptr_array_new_with_free_func(
103 (GDestroyNotify) bt_ctf_field_put);
104 if (!stream->event_contexts) {
105 goto error_destroy;
106 }
107 }
108 end:
109 return stream;
110 error_destroy:
111 bt_ctf_stream_destroy(&stream->ref_count);
112 return NULL;
113 }
114
115 BT_HIDDEN
116 int bt_ctf_stream_set_flush_callback(struct bt_ctf_stream *stream,
117 flush_func callback, void *data)
118 {
119 int ret = stream ? 0 : -1;
120
121 if (!stream) {
122 goto end;
123 }
124
125 stream->flush.func = callback;
126 stream->flush.data = data;
127 end:
128 return ret;
129 }
130
131 BT_HIDDEN
132 int bt_ctf_stream_set_fd(struct bt_ctf_stream *stream, int fd)
133 {
134 int ret = 0;
135
136 if (stream->pos.fd != -1) {
137 ret = -1;
138 goto end;
139 }
140
141 ctf_init_pos(&stream->pos, NULL, fd, O_RDWR);
142 stream->pos.fd = fd;
143 end:
144 return ret;
145 }
146
147 int bt_ctf_stream_get_discarded_events_count(
148 struct bt_ctf_stream *stream, uint64_t *count)
149 {
150 int64_t ret = 0;
151 int field_signed;
152 struct bt_ctf_field *events_discarded_field = NULL;
153 struct bt_ctf_field_type *events_discarded_field_type = NULL;
154
155 if (!stream || !count || !stream->packet_context) {
156 ret = -1;
157 goto end;
158 }
159
160 events_discarded_field = bt_ctf_field_structure_get_field(
161 stream->packet_context, "events_discarded");
162 if (!events_discarded_field) {
163 ret = -1;
164 goto end;
165 }
166
167 events_discarded_field_type = bt_ctf_field_get_type(
168 events_discarded_field);
169 if (!events_discarded_field_type) {
170 ret = -1;
171 goto end;
172 }
173
174 field_signed = bt_ctf_field_type_integer_get_signed(
175 events_discarded_field_type);
176 if (field_signed < 0) {
177 ret = field_signed;
178 goto end;
179 }
180
181 if (field_signed) {
182 int64_t signed_count;
183
184 ret = bt_ctf_field_signed_integer_get_value(
185 events_discarded_field, &signed_count);
186 if (ret) {
187 goto end;
188 }
189 if (signed_count < 0) {
190 /* Invalid value */
191 ret = -1;
192 goto end;
193 }
194 *count = (uint64_t) signed_count;
195 } else {
196 ret = bt_ctf_field_unsigned_integer_get_value(
197 events_discarded_field, count);
198 if (ret) {
199 goto end;
200 }
201 }
202 end:
203 if (events_discarded_field) {
204 bt_ctf_field_put(events_discarded_field);
205 }
206 if (events_discarded_field_type) {
207 bt_ctf_field_type_put(events_discarded_field_type);
208 }
209 return ret;
210 }
211
212 void bt_ctf_stream_append_discarded_events(struct bt_ctf_stream *stream,
213 uint64_t event_count)
214 {
215 int ret;
216 int field_signed;
217 uint64_t previous_count;
218 uint64_t new_count;
219 struct bt_ctf_field *events_discarded_field = NULL;
220 struct bt_ctf_field_type *events_discarded_field_type = NULL;
221
222 if (!stream || !stream->packet_context) {
223 goto end;
224 }
225
226 ret = bt_ctf_stream_get_discarded_events_count(stream,
227 &previous_count);
228 if (ret) {
229 goto end;
230 }
231
232 events_discarded_field = bt_ctf_field_structure_get_field(
233 stream->packet_context, "events_discarded");
234 if (!events_discarded_field) {
235 goto end;
236 }
237
238 events_discarded_field_type = bt_ctf_field_get_type(
239 events_discarded_field);
240 if (!events_discarded_field_type) {
241 goto end;
242 }
243
244 field_signed = bt_ctf_field_type_integer_get_signed(
245 events_discarded_field_type);
246 if (field_signed < 0) {
247 goto end;
248 }
249
250 new_count = previous_count + event_count;
251 if (field_signed) {
252 ret = bt_ctf_field_signed_integer_set_value(
253 events_discarded_field, (int64_t) new_count);
254 if (ret) {
255 goto end;
256 }
257 } else {
258 ret = bt_ctf_field_unsigned_integer_set_value(
259 events_discarded_field, new_count);
260 if (ret) {
261 goto end;
262 }
263 }
264
265 end:
266 if (events_discarded_field) {
267 bt_ctf_field_put(events_discarded_field);
268 }
269 if (events_discarded_field_type) {
270 bt_ctf_field_type_put(events_discarded_field_type);
271 }
272 }
273
274 int bt_ctf_stream_append_event(struct bt_ctf_stream *stream,
275 struct bt_ctf_event *event)
276 {
277 int ret = 0;
278 uint64_t timestamp;
279 struct bt_ctf_field *event_context_copy = NULL;
280
281 if (!stream || !event) {
282 ret = -1;
283 goto end;
284 }
285
286 /* Make sure the event's payload is set */
287 ret = bt_ctf_event_validate(event);
288 if (ret) {
289 goto end;
290 }
291
292 /* Sample the current stream event context by copying it */
293 if (stream->event_context) {
294 /* Make sure the event context's payload is set */
295 ret = bt_ctf_field_validate(stream->event_context);
296 if (ret) {
297 goto end;
298 }
299
300 event_context_copy = bt_ctf_field_copy(stream->event_context);
301 if (!event_context_copy) {
302 ret = -1;
303 goto end;
304 }
305 }
306
307 timestamp = bt_ctf_clock_get_time(stream->stream_class->clock);
308 ret = bt_ctf_event_set_timestamp(event, timestamp);
309 if (ret) {
310 goto end;
311 }
312
313 bt_ctf_event_get(event);
314 /* Save the new event along with its associated stream event context */
315 g_ptr_array_add(stream->events, event);
316 if (event_context_copy) {
317 g_ptr_array_add(stream->event_contexts, event_context_copy);
318 }
319 end:
320 return ret;
321 }
322
323 struct bt_ctf_field *bt_ctf_stream_get_packet_context(
324 struct bt_ctf_stream *stream)
325 {
326 struct bt_ctf_field *packet_context = NULL;
327
328 if (!stream) {
329 goto end;
330 }
331
332 packet_context = stream->packet_context;
333 if (packet_context) {
334 bt_ctf_field_get(packet_context);
335 }
336 end:
337 return packet_context;
338 }
339
340 int bt_ctf_stream_set_packet_context(struct bt_ctf_stream *stream,
341 struct bt_ctf_field *field)
342 {
343 int ret = 0;
344 struct bt_ctf_field_type *field_type;
345
346 if (!stream || !field) {
347 ret = -1;
348 goto end;
349 }
350
351 field_type = bt_ctf_field_get_type(field);
352 if (field_type != stream->stream_class->packet_context_type) {
353 ret = -1;
354 goto end;
355 }
356
357 bt_ctf_field_type_put(field_type);
358 bt_ctf_field_get(field);
359 bt_ctf_field_put(stream->packet_context);
360 stream->packet_context = field;
361 end:
362 return ret;
363 }
364
365 struct bt_ctf_field *bt_ctf_stream_get_event_context(
366 struct bt_ctf_stream *stream)
367 {
368 struct bt_ctf_field *event_context = NULL;
369
370 if (!stream) {
371 goto end;
372 }
373
374 event_context = stream->event_context;
375 if (event_context) {
376 bt_ctf_field_get(event_context);
377 }
378 end:
379 return event_context;
380 }
381
382 int bt_ctf_stream_set_event_context(struct bt_ctf_stream *stream,
383 struct bt_ctf_field *field)
384 {
385 int ret = 0;
386 struct bt_ctf_field_type *field_type = NULL;
387
388 if (!stream || !field) {
389 ret = -1;
390 goto end;
391 }
392
393 field_type = bt_ctf_field_get_type(field);
394 if (field_type != stream->stream_class->event_context_type) {
395 ret = -1;
396 goto end;
397 }
398
399 bt_ctf_field_get(field);
400 bt_ctf_field_put(stream->event_context);
401 stream->event_context = field;
402 end:
403 if (field_type) {
404 bt_ctf_field_type_put(field_type);
405 }
406 return ret;
407 }
408
409 int bt_ctf_stream_flush(struct bt_ctf_stream *stream)
410 {
411 int ret = 0;
412 size_t i;
413 uint64_t timestamp_begin, timestamp_end, events_discarded;
414 struct bt_ctf_stream_class *stream_class;
415 struct bt_ctf_field *integer = NULL;
416 struct ctf_stream_pos packet_context_pos;
417
418 if (!stream || stream->pos.fd < 0) {
419 /*
420 * Stream does not have an associated fd. It is,
421 * therefore, not a stream being used to write events.
422 */
423 ret = -1;
424 goto end;
425 }
426
427 if (!stream->events->len) {
428 goto end;
429 }
430
431 if (stream->flush.func) {
432 stream->flush.func(stream, stream->flush.data);
433 }
434
435 stream_class = stream->stream_class;
436 timestamp_begin = ((struct bt_ctf_event *) g_ptr_array_index(
437 stream->events, 0))->timestamp;
438 timestamp_end = ((struct bt_ctf_event *) g_ptr_array_index(
439 stream->events, stream->events->len - 1))->timestamp;
440
441 /* Set the default context attributes if present and unset. */
442 ret = set_structure_field_integer(stream->packet_context,
443 "timestamp_begin", timestamp_begin);
444 if (ret) {
445 goto end;
446 }
447
448 ret = set_structure_field_integer(stream->packet_context,
449 "timestamp_end", timestamp_end);
450 if (ret) {
451 goto end;
452 }
453
454 ret = set_structure_field_integer(stream->packet_context,
455 "content_size", UINT64_MAX);
456 if (ret) {
457 goto end;
458 }
459
460 ret = set_structure_field_integer(stream->packet_context,
461 "packet_size", UINT64_MAX);
462 if (ret) {
463 goto end;
464 }
465
466 /* Write packet context */
467 memcpy(&packet_context_pos, &stream->pos,
468 sizeof(struct ctf_stream_pos));
469 ret = bt_ctf_field_serialize(stream->packet_context,
470 &stream->pos);
471 if (ret) {
472 goto end;
473 }
474
475 ret = bt_ctf_stream_get_discarded_events_count(stream,
476 &events_discarded);
477 if (ret) {
478 goto end;
479 }
480
481 /* Unset the packet context's fields. */
482 ret = bt_ctf_field_reset(stream->packet_context);
483 if (ret) {
484 goto end;
485 }
486
487 /* Set the previous number of discarded events. */
488 ret = set_structure_field_integer(stream->packet_context,
489 "events_discarded", events_discarded);
490 if (ret) {
491 goto end;
492 }
493
494 for (i = 0; i < stream->events->len; i++) {
495 struct bt_ctf_event *event = g_ptr_array_index(
496 stream->events, i);
497 uint32_t event_id = bt_ctf_event_class_get_id(
498 event->event_class);
499 uint64_t timestamp = bt_ctf_event_get_timestamp(event);
500
501 ret = bt_ctf_field_reset(stream_class->event_header);
502 if (ret) {
503 goto end;
504 }
505
506 ret = set_structure_field_integer(stream_class->event_header,
507 "id", event_id);
508 if (ret) {
509 goto end;
510 }
511 ret = set_structure_field_integer(stream_class->event_header,
512 "timestamp", timestamp);
513 if (ret) {
514 goto end;
515 }
516
517 /* Write event header */
518 ret = bt_ctf_field_serialize(stream_class->event_header,
519 &stream->pos);
520 if (ret) {
521 goto end;
522 }
523
524 /* Write stream event context */
525 if (stream->event_contexts) {
526 ret = bt_ctf_field_serialize(
527 g_ptr_array_index(stream->event_contexts, i),
528 &stream->pos);
529 if (ret) {
530 goto end;
531 }
532 }
533
534 /* Write event content */
535 ret = bt_ctf_event_serialize(event, &stream->pos);
536 if (ret) {
537 goto end;
538 }
539 }
540
541 /*
542 * Update the packet total size and content size and overwrite the
543 * packet context.
544 * Copy base_mma as the packet may have been remapped (e.g. when a
545 * packet is resized).
546 */
547 packet_context_pos.base_mma = stream->pos.base_mma;
548 ret = set_structure_field_integer(stream->packet_context,
549 "content_size", stream->pos.offset);
550 if (ret) {
551 goto end;
552 }
553
554 ret = set_structure_field_integer(stream->packet_context,
555 "packet_size", stream->pos.packet_size);
556 if (ret) {
557 goto end;
558 }
559
560 ret = bt_ctf_field_serialize(stream->packet_context,
561 &packet_context_pos);
562 if (ret) {
563 goto end;
564 }
565
566 g_ptr_array_set_size(stream->events, 0);
567 if (stream->event_contexts) {
568 g_ptr_array_set_size(stream->event_contexts, 0);
569 }
570 stream->flushed_packet_count++;
571 end:
572 bt_ctf_field_put(integer);
573 return ret;
574 }
575
576 void bt_ctf_stream_get(struct bt_ctf_stream *stream)
577 {
578 if (!stream) {
579 return;
580 }
581
582 bt_ctf_ref_get(&stream->ref_count);
583 }
584
585 void bt_ctf_stream_put(struct bt_ctf_stream *stream)
586 {
587 if (!stream) {
588 return;
589 }
590
591 bt_ctf_ref_put(&stream->ref_count, bt_ctf_stream_destroy);
592 }
593
594 static
595 void bt_ctf_stream_destroy(struct bt_ctf_ref *ref)
596 {
597 struct bt_ctf_stream *stream;
598
599 if (!ref) {
600 return;
601 }
602
603 stream = container_of(ref, struct bt_ctf_stream, ref_count);
604 ctf_fini_pos(&stream->pos);
605 if (close(stream->pos.fd)) {
606 perror("close");
607 }
608
609 if (stream->stream_class) {
610 bt_ctf_stream_class_put(stream->stream_class);
611 }
612 if (stream->events) {
613 g_ptr_array_free(stream->events, TRUE);
614 }
615 if (stream->event_contexts) {
616 g_ptr_array_free(stream->event_contexts, TRUE);
617 }
618 if (stream->packet_context) {
619 bt_ctf_field_put(stream->packet_context);
620 }
621 if (stream->event_context) {
622 bt_ctf_field_put(stream->event_context);
623 }
624 g_free(stream);
625 }
626
627 static
628 int set_structure_field_integer(struct bt_ctf_field *structure, char *name,
629 uint64_t value)
630 {
631 int ret = 0;
632 struct bt_ctf_field_type *field_type = NULL;
633 struct bt_ctf_field *integer =
634 bt_ctf_field_structure_get_field(structure, name);
635
636 if (!structure || !name) {
637 ret = -1;
638 goto end;
639 }
640
641 if (!integer) {
642 /* Field not found, not an error. */
643 goto end;
644 }
645
646 /* Make sure the payload has not already been set. */
647 if (!bt_ctf_field_validate(integer)) {
648 /* Payload already set, not an error */
649 goto end;
650 }
651
652 field_type = bt_ctf_field_get_type(integer);
653 /* Something is serioulsly wrong */
654 assert(field_type);
655 if (bt_ctf_field_type_get_type_id(field_type) != CTF_TYPE_INTEGER) {
656 /*
657 * The user most likely meant for us to populate this field
658 * automatically. However, we can only do this if the field
659 * is an integer. Return an error.
660 */
661 ret = -1;
662 goto end;
663 }
664
665 if (bt_ctf_field_type_integer_get_signed(field_type)) {
666 ret = bt_ctf_field_signed_integer_set_value(integer,
667 (int64_t) value);
668 } else {
669 ret = bt_ctf_field_unsigned_integer_set_value(integer, value);
670 }
671 end:
672 if (integer) {
673 bt_ctf_field_put(integer);
674 }
675 if (field_type) {
676 bt_ctf_field_type_put(field_type);
677 }
678 return ret;
679 }
This page took 0.072788 seconds and 3 git commands to generate.