Implement ctf.lttng-live component
[babeltrace.git] / plugins / ctf / common / notif-iter / notif-iter.c
index 56bf9fff49b04e9bbc829d6f2c791840d9bd5f86..bc20f8814b1de423158fa04961a9134d42f4a599 100644 (file)
 #include <babeltrace/ctf-ir/stream-class.h>
 #include <babeltrace/ctf-ir/packet.h>
 #include <babeltrace/ctf-ir/stream.h>
-#include <babeltrace/ctf-ir/clock.h>
+#include <babeltrace/ctf-ir/clock-class.h>
 #include <babeltrace/ctf-ir/event-class.h>
-#include <babeltrace/plugin/notification/packet.h>
-#include <babeltrace/plugin/notification/event.h>
+#include <babeltrace/graph/notification-packet.h>
+#include <babeltrace/graph/notification-event.h>
+#include <babeltrace/graph/notification-stream.h>
+#include <babeltrace/graph/clock-class-priority-map.h>
 #include <babeltrace/ref.h>
 #include <glib.h>
 
 #define PRINT_ERR_STREAM       notit->err_stream
 #define PRINT_PREFIX           "ctf-notif-iter"
-#include "print.h"
+#include "../print.h"
 
 #include "notif-iter.h"
 #include "../btr/btr.h"
@@ -103,6 +105,39 @@ enum state {
        STATE_SKIP_PACKET_PADDING,
 };
 
+struct trace_field_path_cache {
+       /*
+        * Indexes of the stream_id and stream_instance_id field in the packet
+        * header structure, -1 if unset.
+        */
+       int stream_id;
+       int stream_instance_id;
+};
+
+struct stream_class_field_path_cache {
+       /*
+        * Indexes of the v and id fields in the stream event header structure,
+        * -1 if unset.
+        */
+       int v;
+       int id;
+
+       /*
+        * index of the timestamp_end, packet_size and content_size fields in
+        * the stream packet context structure. Set to -1 if the fields were
+        * not found.
+        */
+       int timestamp_end;
+       int packet_size;
+       int content_size;
+};
+
+struct field_cb_override {
+       enum bt_ctf_btr_status (* func)(void *value,
+                       struct bt_ctf_field_type *type, void *data);
+       void *data;
+};
+
 /* CTF notification iterator */
 struct bt_ctf_notif_iter {
        /* Visit stack */
@@ -130,6 +165,11 @@ struct bt_ctf_notif_iter {
        /* Current packet (NULL if not created yet) */
        struct bt_ctf_packet *packet;
 
+       /*
+        * Current timestamp_end field (to consider before switching packets).
+        */
+       struct bt_ctf_field *cur_timestamp_end;
+
        /* Database of current dynamic scopes (owned by this) */
        struct {
                struct bt_ctf_field *trace_packet_header;
@@ -140,6 +180,21 @@ struct bt_ctf_notif_iter {
                struct bt_ctf_field *event_payload;
        } dscopes;
 
+       /*
+        * Special field overrides.
+        *
+        * Overrides are used to implement the behaviours of special fields such
+        * as "timestamp_end" (which must be ignored until the end of the
+        * packet), "id" (event id) which can be present multiple times and must
+        * be updated multiple time.
+        *
+        * This should be used to implement the behaviour of integer fields
+        * mapped to clocks and other "tagged" fields (in CTF 2).
+        *
+        * bt_ctf_field_type to struct field_cb_override
+        */
+       GHashTable *field_overrides;
+
        /* Current state */
        enum state state;
 
@@ -173,11 +228,33 @@ struct bt_ctf_notif_iter {
 
        /* Current content size (bits) (-1 if unknown) */
        int64_t cur_content_size;
+
+       /* bt_ctf_clock_class to uint64_t. */
+       GHashTable *clock_states;
+
+       /*
+        * Cache of the trace-constant field paths (event header type)
+        * associated to the current trace.
+        */
+       struct trace_field_path_cache trace_field_path_cache;
+
+       /*
+        * Field path cache associated with the current stream class.
+        * Ownership of this structure belongs to the field_path_caches HT.
+        */
+       struct stream_class_field_path_cache *cur_sc_field_path_cache;
+
+       /* bt_ctf_stream_class to struct stream_class_field_path_cache. */
+       GHashTable *sc_field_path_caches;
 };
 
 static
 int bt_ctf_notif_iter_switch_packet(struct bt_ctf_notif_iter *notit);
 
+static
+enum bt_ctf_btr_status btr_timestamp_end_cb(void *value,
+               struct bt_ctf_field_type *type, void *data);
+
 static
 void stack_entry_free_func(gpointer data)
 {
@@ -284,7 +361,7 @@ static inline
 enum bt_ctf_notif_iter_status notif_iter_status_from_m_status(
                enum bt_ctf_notif_iter_medium_status m_status)
 {
-       return m_status;
+       return (int) m_status;
 }
 
 static inline
@@ -305,38 +382,12 @@ size_t packet_at(struct bt_ctf_notif_iter *notit)
        return notit->buf.packet_offset + notit->buf.at;
 }
 
-static inline
-size_t remaining_content_bits(struct bt_ctf_notif_iter *notit)
-{
-       if (notit->cur_content_size == -1) {
-               return -1;
-       }
-
-       return notit->cur_content_size - packet_at(notit);
-}
-
-static inline
-size_t remaining_packet_bits(struct bt_ctf_notif_iter *notit)
-{
-       if (notit->cur_packet_size == -1) {
-               return -1;
-       }
-
-       return notit->cur_packet_size - packet_at(notit);
-}
-
 static inline
 void buf_consume_bits(struct bt_ctf_notif_iter *notit, size_t incr)
 {
        notit->buf.at += incr;
 }
 
-static inline
-bool buf_has_enough_bits(struct bt_ctf_notif_iter *notit, size_t sz)
-{
-       return buf_available_bits(notit) >= sz;
-}
-
 static
 enum bt_ctf_notif_iter_status request_medium_bytes(
                struct bt_ctf_notif_iter *notit)
@@ -496,8 +547,7 @@ enum bt_ctf_notif_iter_status read_packet_header_begin_state(
        packet_header_type = bt_ctf_trace_get_packet_header_type(
                        notit->meta.trace);
        if (!packet_header_type) {
-               PERR("Failed to retrieve trace's packet header type\n");
-               ret = BT_CTF_NOTIF_ITER_STATUS_ERROR;
+               notit->state = STATE_AFTER_TRACE_PACKET_HEADER;
                goto end;
        }
 
@@ -522,24 +572,167 @@ static inline
 bool is_struct_type(struct bt_ctf_field_type *field_type)
 {
        return bt_ctf_field_type_get_type_id(field_type) ==
-                       BT_CTF_TYPE_ID_STRUCT;
+                       BT_CTF_FIELD_TYPE_ID_STRUCT;
 }
 
 static inline
 bool is_variant_type(struct bt_ctf_field_type *field_type)
 {
        return bt_ctf_field_type_get_type_id(field_type) ==
-                       BT_CTF_TYPE_ID_VARIANT;
+                       BT_CTF_FIELD_TYPE_ID_VARIANT;
+}
+
+static
+struct stream_class_field_path_cache *
+create_stream_class_field_path_cache_entry(
+               struct bt_ctf_notif_iter *notit,
+               struct bt_ctf_stream_class *stream_class)
+{
+       int v = -1;
+       int id = -1;
+       int timestamp_end = -1;
+       int packet_size = -1;
+       int content_size = -1;
+       struct stream_class_field_path_cache *cache_entry = g_new0(
+                       struct stream_class_field_path_cache, 1);
+       struct bt_ctf_field_type *event_header = NULL, *packet_context = NULL;
+
+       if (!cache_entry) {
+               goto end;
+       }
+
+       event_header = bt_ctf_stream_class_get_event_header_type(stream_class);
+       if (event_header && bt_ctf_field_type_is_structure(event_header)) {
+               int i, count;
+
+               count = bt_ctf_field_type_structure_get_field_count(
+                               event_header);
+               if (count < 0) {
+                       goto error;
+               }
+               for (i = 0; i < count; i++) {
+                       int ret;
+                       const char *name;
+
+                       ret = bt_ctf_field_type_structure_get_field(
+                                       event_header, &name, NULL, i);
+                       if (ret) {
+                               goto error;
+                       }
+
+                       if (v != -1 && id != -1) {
+                               break;
+                       }
+                       if (v == -1 && !strcmp(name, "v")) {
+                               v = i;
+                       } else if (id == -1 && !strcmp(name, "id")) {
+                               id = i;
+                       }
+               }
+       }
+
+       packet_context = bt_ctf_stream_class_get_packet_context_type(
+                       stream_class);
+       if (packet_context && bt_ctf_field_type_is_structure(packet_context)) {
+               int i, count;
+
+               count = bt_ctf_field_type_structure_get_field_count(
+                               packet_context);
+               if (count < 0) {
+                       goto error;
+               }
+               for (i = 0; i < count; i++) {
+                       int ret;
+                       const char *name;
+                       struct bt_ctf_field_type *field_type;
+
+                       if (timestamp_end != -1 && packet_size != -1 &&
+                                       content_size != -1) {
+                               break;
+                       }
+
+                       ret = bt_ctf_field_type_structure_get_field(
+                                       packet_context, &name, &field_type, i);
+                       if (ret) {
+                               goto error;
+                       }
+
+                       if (timestamp_end == -1 &&
+                                       !strcmp(name, "timestamp_end")) {
+                               struct field_cb_override *override = g_new0(
+                                               struct field_cb_override, 1);
+
+                               if (!override) {
+                                       BT_PUT(field_type);
+                                       goto error;
+                               }
+
+                               override->func = btr_timestamp_end_cb;
+                               override->data = notit;
+
+                               g_hash_table_insert(notit->field_overrides,
+                                               bt_get(field_type), override);
+
+                               timestamp_end = i;
+                       } else if (packet_size == -1 &&
+                                       !strcmp(name, "packet_size")) {
+                               packet_size = i;
+                       } else if (content_size == -1 &&
+                                       !strcmp(name, "content_size")) {
+                               content_size = i;
+                       }
+                       BT_PUT(field_type);
+               }
+       }
+
+       cache_entry->v = v;
+       cache_entry->id = id;
+       cache_entry->timestamp_end = timestamp_end;
+       cache_entry->packet_size = packet_size;
+       cache_entry->content_size = content_size;
+end:
+       BT_PUT(event_header);
+       BT_PUT(packet_context);
+       return cache_entry;
+error:
+       g_free(cache_entry);
+       cache_entry = NULL;
+       goto end;
+}
+
+static
+struct stream_class_field_path_cache *get_stream_class_field_path_cache(
+               struct bt_ctf_notif_iter *notit,
+               struct bt_ctf_stream_class *stream_class)
+{
+       bool cache_entry_found;
+       struct stream_class_field_path_cache *cache_entry;
+
+       cache_entry_found = g_hash_table_lookup_extended(
+                       notit->sc_field_path_caches,
+                       stream_class, NULL, (gpointer) &cache_entry);
+       if (unlikely(!cache_entry_found)) {
+               cache_entry = create_stream_class_field_path_cache_entry(notit,
+                               stream_class);
+               g_hash_table_insert(notit->sc_field_path_caches,
+                               bt_get(stream_class), (gpointer) cache_entry);
+       }
+
+       return cache_entry;
 }
 
 static inline
-enum bt_ctf_notif_iter_status set_current_stream_class(struct bt_ctf_notif_iter *notit)
+enum bt_ctf_notif_iter_status set_current_stream_class(
+               struct bt_ctf_notif_iter *notit)
 {
        enum bt_ctf_notif_iter_status status = BT_CTF_NOTIF_ITER_STATUS_OK;
-       struct bt_ctf_field_type *packet_header_type;
+       struct bt_ctf_field_type *packet_header_type = NULL;
        struct bt_ctf_field_type *stream_id_field_type = NULL;
        uint64_t stream_id;
 
+       /* Clear the current stream class field path cache. */
+       notit->cur_sc_field_path_cache = NULL;
+
        /* Is there any "stream_id" field in the packet header? */
        packet_header_type = bt_ctf_trace_get_packet_header_type(
                        notit->meta.trace);
@@ -557,8 +750,8 @@ enum bt_ctf_notif_iter_status set_current_stream_class(struct bt_ctf_notif_iter
                                packet_header_type, "stream_id");
        if (stream_id_field_type) {
                /* Find appropriate stream class using current stream ID */
-               struct bt_ctf_field *stream_id_field = NULL;
                int ret;
+               struct bt_ctf_field *stream_id_field = NULL;
 
                assert(notit->dscopes.trace_packet_header);
 
@@ -578,7 +771,6 @@ enum bt_ctf_notif_iter_status set_current_stream_class(struct bt_ctf_notif_iter
        }
 
        BT_PUT(notit->meta.stream_class);
-
        notit->meta.stream_class = bt_ctf_trace_get_stream_class_by_id(
                        notit->meta.trace, stream_id);
        if (!notit->meta.stream_class) {
@@ -588,6 +780,17 @@ enum bt_ctf_notif_iter_status set_current_stream_class(struct bt_ctf_notif_iter
                goto end;
        }
 
+       /*
+        * Retrieve (or lazily create) the current stream class field path
+        * cache.
+        */
+       notit->cur_sc_field_path_cache = get_stream_class_field_path_cache(
+                       notit, notit->meta.stream_class);
+       if (!notit->cur_sc_field_path_cache) {
+               PERR("Failed to retrieve stream class field path cache\n");
+               status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
+               goto end;
+       }
 end:
        BT_PUT(packet_header_type);
        BT_PUT(stream_id_field_type);
@@ -620,8 +823,7 @@ enum bt_ctf_notif_iter_status read_packet_context_begin_state(
        packet_context_type = bt_ctf_stream_class_get_packet_context_type(
                        notit->meta.stream_class);
        if (!packet_context_type) {
-               PERR("Failed to retrieve stream class's packet context\n");
-               status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
+               notit->state = STATE_AFTER_STREAM_PACKET_CONTEXT;
                goto end;
        }
 
@@ -732,8 +934,7 @@ enum bt_ctf_notif_iter_status read_event_header_begin_state(
        event_header_type = bt_ctf_stream_class_get_event_header_type(
                notit->meta.stream_class);
        if (!event_header_type) {
-               PERR("Failed to retrieve stream class's event header type\n");
-               status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
+               notit->state = STATE_AFTER_STREAM_EVENT_HEADER;
                goto end;
        }
 
@@ -889,12 +1090,6 @@ enum bt_ctf_notif_iter_status after_event_header_state(
 {
        enum bt_ctf_notif_iter_status status;
 
-       status = set_current_packet_content_sizes(notit);
-       if (status != BT_CTF_NOTIF_ITER_STATUS_OK) {
-               PERR("Failed to set current packet and content sizes\n");
-               goto end;
-       }
-
        status = set_current_event_class(notit);
        if (status != BT_CTF_NOTIF_ITER_STATUS_OK) {
                PERR("Failed to set current event class\n");
@@ -917,8 +1112,7 @@ enum bt_ctf_notif_iter_status read_stream_event_context_begin_state(
        stream_event_context_type = bt_ctf_stream_class_get_event_context_type(
                notit->meta.stream_class);
        if (!stream_event_context_type) {
-               PERR("Failed to retrieve stream class's event context type\n");
-               status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
+               notit->state = STATE_DSCOPE_EVENT_CONTEXT_BEGIN;
                goto end;
        }
 
@@ -951,11 +1145,9 @@ enum bt_ctf_notif_iter_status read_event_context_begin_state(
        event_context_type = bt_ctf_event_class_get_context_type(
                notit->meta.event_class);
        if (!event_context_type) {
-               PERR("Failed to retrieve event class's context type\n");
-               status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
+               notit->state = STATE_DSCOPE_EVENT_PAYLOAD_BEGIN;
                goto end;
        }
-
        status = read_dscope_begin_state(notit, event_context_type,
                STATE_DSCOPE_EVENT_PAYLOAD_BEGIN,
                STATE_DSCOPE_EVENT_CONTEXT_CONTINUE,
@@ -985,8 +1177,7 @@ enum bt_ctf_notif_iter_status read_event_payload_begin_state(
        event_payload_type = bt_ctf_event_class_get_payload_type(
                notit->meta.event_class);
        if (!event_payload_type) {
-               PERR("Failed to retrieve event class's payload type\n");
-               status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
+               notit->state = STATE_EMIT_NOTIF_EVENT;
                goto end;
        }
 
@@ -1114,17 +1305,8 @@ enum bt_ctf_notif_iter_status handle_state(struct bt_ctf_notif_iter *notit)
        return status;
 }
 
-
 /**
  * Resets the internal state of a CTF notification iterator.
- *
- * This function can be used when it is desired to seek to the beginning
- * of another packet. It is expected that the next call to
- * bt_ctf_notif_iter_medium_ops::request_bytes() made by this
- * notification iterator will return the \em first bytes of a \em
- * packet.
- *
- * @param notif_iter           CTF notification iterator
  */
 static
 void bt_ctf_notif_iter_reset(struct bt_ctf_notif_iter *notit)
@@ -1154,6 +1336,7 @@ int bt_ctf_notif_iter_switch_packet(struct bt_ctf_notif_iter *notit)
        BT_PUT(notit->meta.stream_class);
        BT_PUT(notit->meta.event_class);
        BT_PUT(notit->packet);
+       BT_PUT(notit->cur_timestamp_end);
        put_all_dscopes(notit);
 
        /*
@@ -1177,6 +1360,7 @@ int bt_ctf_notif_iter_switch_packet(struct bt_ctf_notif_iter *notit)
 
        notit->cur_content_size = -1;
        notit->cur_packet_size = -1;
+       notit->cur_sc_field_path_cache = NULL;
 end:
        return ret;
 }
@@ -1199,17 +1383,17 @@ struct bt_ctf_field *get_next_field(struct bt_ctf_notif_iter *notit)
        }
 
        switch (bt_ctf_field_type_get_type_id(base_type)) {
-       case BT_CTF_TYPE_ID_STRUCT:
+       case BT_CTF_FIELD_TYPE_ID_STRUCT:
                next_field = bt_ctf_field_structure_get_field_by_index(
                        base_field, index);
                break;
-       case BT_CTF_TYPE_ID_ARRAY:
+       case BT_CTF_FIELD_TYPE_ID_ARRAY:
                next_field = bt_ctf_field_array_get_field(base_field, index);
                break;
-       case BT_CTF_TYPE_ID_SEQUENCE:
+       case BT_CTF_FIELD_TYPE_ID_SEQUENCE:
                next_field = bt_ctf_field_sequence_get_field(base_field, index);
                break;
-       case BT_CTF_TYPE_ID_VARIANT:
+       case BT_CTF_FIELD_TYPE_ID_VARIANT:
                next_field = bt_ctf_field_variant_get_current_field(base_field);
                break;
        default:
@@ -1217,10 +1401,6 @@ struct bt_ctf_field *get_next_field(struct bt_ctf_notif_iter *notit)
                break;
        }
 
-       if (!next_field) {
-               next_field = NULL;
-       }
-
 end:
        BT_PUT(base_type);
 
@@ -1228,8 +1408,113 @@ end:
 }
 
 static
-enum bt_ctf_btr_status btr_signed_int_cb(int64_t value,
-               struct bt_ctf_field_type *type, void *data)
+void update_clock_state(uint64_t *state,
+               struct bt_ctf_field *value_field)
+{
+       struct bt_ctf_field_type *value_type = NULL;
+       uint64_t requested_new_value;
+       uint64_t requested_new_value_mask;
+       uint64_t cur_value_masked;
+       int requested_new_value_size;
+       int ret;
+
+       value_type = bt_ctf_field_get_type(value_field);
+       assert(value_type);
+
+       requested_new_value_size =
+                       bt_ctf_field_type_integer_get_size(value_type);
+       assert(requested_new_value_size > 0);
+
+       ret = bt_ctf_field_unsigned_integer_get_value(value_field,
+                       &requested_new_value);
+       assert(!ret);
+
+       /*
+        * Special case for a 64-bit new value, which is the limit
+        * of a clock value as of this version: overwrite the
+        * current value directly.
+        */
+       if (requested_new_value_size == 64) {
+               *state = requested_new_value;
+               goto end;
+       }
+
+       requested_new_value_mask = (1ULL << requested_new_value_size) - 1;
+       cur_value_masked = *state & requested_new_value_mask;
+
+       if (requested_new_value < cur_value_masked) {
+               /*
+                * It looks like a wrap happened on the number of bits
+                * of the requested new value. Assume that the clock
+                * value wrapped only one time.
+                */
+               *state += requested_new_value_mask + 1;
+       }
+
+       /* Clear the low bits of the current clock value. */
+       *state &= ~requested_new_value_mask;
+
+       /* Set the low bits of the current clock value. */
+       *state |= requested_new_value;
+end:
+       bt_put(value_type);
+}
+
+static
+enum bt_ctf_btr_status update_clock(struct bt_ctf_notif_iter *notit,
+               struct bt_ctf_field *int_field)
+{
+       gboolean clock_class_found;
+       uint64_t *clock_state;
+       struct bt_ctf_field_type *int_field_type = NULL;
+       enum bt_ctf_btr_status ret = BT_CTF_BTR_STATUS_OK;
+       struct bt_ctf_clock_class *clock_class = NULL;
+
+       int_field_type = bt_ctf_field_get_type(int_field);
+       if (unlikely(!int_field_type)) {
+               goto end;
+       }
+
+       clock_class = bt_ctf_field_type_integer_get_mapped_clock_class(
+               int_field_type);
+       if (likely(!clock_class)) {
+               goto end;
+       }
+
+       clock_class_found = g_hash_table_lookup_extended(notit->clock_states,
+                       clock_class, NULL, (gpointer) &clock_state);
+       if (unlikely(!clock_class_found)) {
+               const char *clock_class_name =
+                       bt_ctf_clock_class_get_name(clock_class);
+
+               PERR("Unknown clock class %s mapped to integer encountered in stream\n",
+                               clock_class_name ? : "NULL");
+               ret = BT_CTF_BTR_STATUS_ERROR;
+               goto end;
+       }
+
+       if (unlikely(!clock_state)) {
+               clock_state = g_new0(uint64_t, 1);
+               if (!clock_state) {
+                       ret = BT_CTF_BTR_STATUS_ENOMEM;
+                       goto end;
+               }
+               g_hash_table_insert(notit->clock_states, bt_get(clock_class),
+                               clock_state);
+       }
+
+       /* Update the clock's state. */
+       update_clock_state(clock_state, int_field);
+end:
+       bt_put(int_field_type);
+       bt_put(clock_class);
+       return ret;
+}
+
+static
+enum bt_ctf_btr_status btr_unsigned_int_common(uint64_t value,
+               struct bt_ctf_field_type *type, void *data,
+               struct bt_ctf_field **out_int_field)
 {
        enum bt_ctf_btr_status status = BT_CTF_BTR_STATUS_OK;
        struct bt_ctf_field *field = NULL;
@@ -1237,23 +1522,27 @@ enum bt_ctf_btr_status btr_signed_int_cb(int64_t value,
        struct bt_ctf_notif_iter *notit = data;
        int ret;
 
-       /* create next field */
+       /* Create next field */
        field = get_next_field(notit);
        if (!field) {
-               PERR("Failed to get next field (signed int)\n");
+               PERR("Failed to get next field (unsigned int)\n");
                status = BT_CTF_BTR_STATUS_ERROR;
-               goto end;
+               goto end_no_put;
        }
 
        switch(bt_ctf_field_type_get_type_id(type)) {
-       case BT_CTF_TYPE_ID_INTEGER:
+       case BT_CTF_FIELD_TYPE_ID_INTEGER:
                /* Integer field is created field */
                BT_MOVE(int_field, field);
+               bt_get(type);
                break;
-       case BT_CTF_TYPE_ID_ENUM:
+       case BT_CTF_FIELD_TYPE_ID_ENUM:
                int_field = bt_ctf_field_enumeration_get_container(field);
+               type = bt_ctf_field_get_type(int_field);
                break;
        default:
+               assert(0);
+               type = NULL;
                break;
        }
 
@@ -1263,20 +1552,64 @@ enum bt_ctf_btr_status btr_signed_int_cb(int64_t value,
                goto end;
        }
 
-       ret = bt_ctf_field_signed_integer_set_value(int_field, value);
+       ret = bt_ctf_field_unsigned_integer_set_value(int_field, value);
        assert(!ret);
        stack_top(notit->stack)->index++;
+       *out_int_field = int_field;
 
 end:
        BT_PUT(field);
-       BT_PUT(int_field);
+       BT_PUT(type);
+end_no_put:
+       return status;
+}
+
+static
+enum bt_ctf_btr_status btr_timestamp_end_cb(void *value,
+               struct bt_ctf_field_type *type, void *data)
+{
+       enum bt_ctf_btr_status status;
+       struct bt_ctf_field *field = NULL;
+       struct bt_ctf_notif_iter *notit = data;
+
+       status = btr_unsigned_int_common(*((uint64_t *) value), type, data,
+                       &field);
 
+       /* Set as the current packet's timestamp_end field. */
+       BT_MOVE(notit->cur_timestamp_end, field);
        return status;
 }
 
 static
 enum bt_ctf_btr_status btr_unsigned_int_cb(uint64_t value,
                struct bt_ctf_field_type *type, void *data)
+{
+       struct bt_ctf_notif_iter *notit = data;
+       enum bt_ctf_btr_status status = BT_CTF_BTR_STATUS_OK;
+       struct bt_ctf_field *field = NULL;
+       struct field_cb_override *override;
+
+       override = g_hash_table_lookup(notit->field_overrides,
+                       type);
+       if (unlikely(override)) {
+               status = override->func(&value, type, override->data);
+               goto end;
+       }
+
+       status = btr_unsigned_int_common(value, type, data, &field);
+       if (status != BT_CTF_BTR_STATUS_OK) {
+               goto end;
+       }
+
+       status = update_clock(notit, field);
+       BT_PUT(field);
+end:
+       return status;
+}
+
+static
+enum bt_ctf_btr_status btr_signed_int_cb(int64_t value,
+               struct bt_ctf_field_type *type, void *data)
 {
        enum bt_ctf_btr_status status = BT_CTF_BTR_STATUS_OK;
        struct bt_ctf_field *field = NULL;
@@ -1284,23 +1617,27 @@ enum bt_ctf_btr_status btr_unsigned_int_cb(uint64_t value,
        struct bt_ctf_notif_iter *notit = data;
        int ret;
 
-       /* Create next field */
+       /* create next field */
        field = get_next_field(notit);
        if (!field) {
-               PERR("Failed to get next field (unsigned int)\n");
+               PERR("Failed to get next field (signed int)\n");
                status = BT_CTF_BTR_STATUS_ERROR;
-               goto end;
+               goto end_no_put;
        }
 
        switch(bt_ctf_field_type_get_type_id(type)) {
-       case BT_CTF_TYPE_ID_INTEGER:
+       case BT_CTF_FIELD_TYPE_ID_INTEGER:
                /* Integer field is created field */
                BT_MOVE(int_field, field);
+               bt_get(type);
                break;
-       case BT_CTF_TYPE_ID_ENUM:
+       case BT_CTF_FIELD_TYPE_ID_ENUM:
                int_field = bt_ctf_field_enumeration_get_container(field);
+               type = bt_ctf_field_get_type(int_field);
                break;
        default:
+               assert(0);
+               type = NULL;
                break;
        }
 
@@ -1310,14 +1647,15 @@ enum bt_ctf_btr_status btr_unsigned_int_cb(uint64_t value,
                goto end;
        }
 
-       ret = bt_ctf_field_unsigned_integer_set_value(int_field, value);
+       ret = bt_ctf_field_signed_integer_set_value(int_field, value);
        assert(!ret);
        stack_top(notit->stack)->index++;
-
+       status = update_clock(notit, int_field);
 end:
        BT_PUT(field);
        BT_PUT(int_field);
-
+       BT_PUT(type);
+end_no_put:
        return status;
 }
 
@@ -1377,6 +1715,18 @@ enum bt_ctf_btr_status btr_string_begin_cb(
                goto end;
        }
 
+       /*
+        * Initialize string field payload to an empty string since in the
+        * case of a length 0 string the btr_string_cb won't be called and
+        * we will end up with an unset string payload.
+        */
+       ret = bt_ctf_field_string_set_value(field, "");
+       if (ret) {
+               PERR("Failed to initialize string field\n");
+               status = BT_CTF_BTR_STATUS_ERROR;
+               goto end;
+       }
+
 end:
        BT_PUT(field);
 
@@ -1638,10 +1988,43 @@ struct bt_ctf_field_type *btr_get_variant_type_cb(
 end:
        BT_PUT(tag_field);
        BT_PUT(selected_field);
+       BT_PUT(path);
 
        return selected_field_type;
 }
 
+static
+int set_event_clocks(struct bt_ctf_event *event,
+               struct bt_ctf_notif_iter *notit)
+{
+       int ret;
+       GHashTableIter iter;
+       struct bt_ctf_clock_class *clock_class;
+       uint64_t *clock_state;
+
+       g_hash_table_iter_init(&iter, notit->clock_states);
+
+       while (g_hash_table_iter_next(&iter, (gpointer) &clock_class,
+                       (gpointer) &clock_state)) {
+               struct bt_ctf_clock_value *clock_value;
+
+               clock_value = bt_ctf_clock_value_create(clock_class,
+                       *clock_state);
+               if (!clock_value) {
+                       ret = -1;
+                       goto end;
+               }
+               ret = bt_ctf_event_set_clock_value(event, clock_value);
+               bt_put(clock_value);
+               if (ret) {
+                       goto end;
+               }
+       }
+       ret = 0;
+end:
+       return ret;
+}
+
 static
 struct bt_ctf_event *create_event(struct bt_ctf_notif_iter *notit)
 {
@@ -1673,12 +2056,17 @@ struct bt_ctf_event *create_event(struct bt_ctf_notif_iter *notit)
                goto error;
        }
 
-       ret = bt_ctf_event_set_payload_field(event,
+       ret = bt_ctf_event_set_event_payload(event,
                notit->dscopes.event_payload);
        if (ret) {
                goto error;
        }
 
+       ret = set_event_clocks(event, notit);
+       if (ret) {
+               goto error;
+       }
+
        /* Associate with current packet. */
        assert(notit->packet);
        ret = bt_ctf_event_set_packet(event, notit->packet);
@@ -1749,7 +2137,7 @@ void notify_new_packet(struct bt_ctf_notif_iter *notit,
                return;
        }
 
-       ret = bt_notification_packet_start_create(notit->packet);
+       ret = bt_notification_packet_begin_create(notit->packet);
        if (!ret) {
                return;
        }
@@ -1776,6 +2164,7 @@ void notify_end_of_packet(struct bt_ctf_notif_iter *notit,
 
 static
 void notify_event(struct bt_ctf_notif_iter *notit,
+               struct bt_clock_class_priority_map *cc_prio_map,
                struct bt_notification **notification)
 {
        struct bt_ctf_event *event;
@@ -1787,7 +2176,7 @@ void notify_event(struct bt_ctf_notif_iter *notit,
                goto end;
        }
 
-       ret = bt_notification_event_create(event);
+       ret = bt_notification_event_create(event, cc_prio_map);
        if (!ret) {
                goto end;
        }
@@ -1797,32 +2186,85 @@ end:
 }
 
 static
-void notify_eos(struct bt_ctf_notif_iter *notit,
-               struct bt_notification **notification)
+int init_clock_states(GHashTable *clock_states, struct bt_ctf_trace *trace)
 {
-       struct bt_ctf_event *event;
-       struct bt_notification *ret = NULL;
+       int clock_class_count, i, ret = 0;
 
-       /* Create event */
-       event = create_event(notit);
-       if (!event) {
+       clock_class_count = bt_ctf_trace_get_clock_class_count(trace);
+       if (clock_class_count <= 0) {
+               ret = -1;
                goto end;
        }
 
-       ret = bt_notification_stream_end_create(event);
-       if (!ret) {
+       for (i = 0; i < clock_class_count; i++) {
+               struct bt_ctf_clock_class *clock_class;
+
+               clock_class = bt_ctf_trace_get_clock_class_by_index(trace, i);
+               if (!clock_class) {
+                       ret = -1;
+                       goto end;
+               }
+
+               g_hash_table_insert(clock_states, bt_get(clock_class), NULL);
+               bt_put(clock_class);
+       }
+end:
+       return ret;
+}
+
+static
+void init_trace_field_path_cache(struct bt_ctf_trace *trace,
+               struct trace_field_path_cache *trace_field_path_cache)
+{
+       int stream_id = -1;
+       int stream_instance_id = -1;
+       int i, count;
+       struct bt_ctf_field_type *packet_header = NULL;
+
+       packet_header = bt_ctf_trace_get_packet_header_type(trace);
+       if (!packet_header) {
                goto end;
        }
-       *notification = ret;
+
+       if (!bt_ctf_field_type_is_structure(packet_header)) {
+               goto end;
+       }
+
+       count = bt_ctf_field_type_structure_get_field_count(packet_header);
+       if (count < 0) {
+               goto end;
+       }
+
+       for (i = 0; (i < count && (stream_id == -1 || stream_instance_id == -1)); i++) {
+               int ret;
+               const char *field_name;
+
+               ret = bt_ctf_field_type_structure_get_field(packet_header,
+                               &field_name, NULL, i);
+               if (ret) {
+                       goto end;
+               }
+
+               if (stream_id == -1 && !strcmp(field_name, "stream_id")) {
+                       stream_id = i;
+               } else if (stream_instance_id == -1 &&
+                               !strcmp(field_name, "stream_instance_id")) {
+                       stream_instance_id = i;
+               }
+       }
 end:
-       BT_PUT(event);
+       trace_field_path_cache->stream_id = stream_id;
+       trace_field_path_cache->stream_instance_id = stream_instance_id;
+       BT_PUT(packet_header);
 }
 
+BT_HIDDEN
 struct bt_ctf_notif_iter *bt_ctf_notif_iter_create(struct bt_ctf_trace *trace,
                size_t max_request_sz,
                struct bt_ctf_notif_iter_medium_ops medops,
                void *data, FILE *err_stream)
 {
+       int ret;
        struct bt_ctf_notif_iter *notit = NULL;
        struct bt_ctf_btr_cbs cbs = {
                .types = {
@@ -1843,14 +2285,24 @@ struct bt_ctf_notif_iter *bt_ctf_notif_iter_create(struct bt_ctf_trace *trace,
 
        assert(trace);
        assert(medops.request_bytes);
+       assert(medops.get_stream);
        notit = g_new0(struct bt_ctf_notif_iter, 1);
        if (!notit) {
                PERR("Failed to allocate memory for CTF notification iterator\n");
                goto end;
        }
-
-       notit->meta.trace = trace;
-       bt_get(notit->meta.trace);
+       notit->clock_states = g_hash_table_new_full(g_direct_hash,
+                       g_direct_equal, bt_put, g_free);
+       if (!notit->clock_states) {
+               PERR("Failed to create hash table\n");
+               goto error;
+       }
+       ret = init_clock_states(notit->clock_states, trace);
+       if (ret) {
+               PERR("Failed to initialize stream clock states\n");
+               goto error;
+       }
+       notit->meta.trace = bt_get(trace);
        notit->medium.medops = medops;
        notit->medium.max_request_sz = max_request_sz;
        notit->medium.data = data;
@@ -1858,23 +2310,37 @@ struct bt_ctf_notif_iter *bt_ctf_notif_iter_create(struct bt_ctf_trace *trace,
        notit->stack = stack_new(notit);
        if (!notit->stack) {
                PERR("Failed to create stack\n");
-               bt_ctf_notif_iter_destroy(notit);
-               notit = NULL;
-               goto end;
+               goto error;
        }
 
        notit->btr = bt_ctf_btr_create(cbs, notit, err_stream);
        if (!notit->btr) {
                PERR("Failed to create binary type reader\n");
-               bt_ctf_notif_iter_destroy(notit);
-               notit = NULL;
-               goto end;
+               goto error;
        }
 
        bt_ctf_notif_iter_reset(notit);
 
+       init_trace_field_path_cache(trace, &notit->trace_field_path_cache);
+       notit->sc_field_path_caches = g_hash_table_new_full(g_direct_hash,
+                       g_direct_equal, bt_put, g_free);
+       if (!notit->sc_field_path_caches) {
+               PERR("Failed to create stream class field path caches\n");
+               goto error;
+       }
+
+       notit->field_overrides = g_hash_table_new_full(g_direct_hash,
+                       g_direct_equal, bt_put, g_free);
+       if (!notit->field_overrides) {
+               goto error;
+       }
+
 end:
        return notit;
+error:
+       bt_ctf_notif_iter_destroy(notit);
+       notit = NULL;
+       goto end;
 }
 
 void bt_ctf_notif_iter_destroy(struct bt_ctf_notif_iter *notit)
@@ -1883,6 +2349,7 @@ void bt_ctf_notif_iter_destroy(struct bt_ctf_notif_iter *notit)
        BT_PUT(notit->meta.stream_class);
        BT_PUT(notit->meta.event_class);
        BT_PUT(notit->packet);
+       BT_PUT(notit->cur_timestamp_end);
        put_all_dscopes(notit);
 
        if (notit->stack) {
@@ -1893,11 +2360,23 @@ void bt_ctf_notif_iter_destroy(struct bt_ctf_notif_iter *notit)
                bt_ctf_btr_destroy(notit->btr);
        }
 
+       if (notit->clock_states) {
+               g_hash_table_destroy(notit->clock_states);
+       }
+
+       if (notit->sc_field_path_caches) {
+               g_hash_table_destroy(notit->sc_field_path_caches);
+       }
+
+       if (notit->field_overrides) {
+               g_hash_table_destroy(notit->field_overrides);
+       }
        g_free(notit);
 }
 
 enum bt_ctf_notif_iter_status bt_ctf_notif_iter_get_next_notification(
                struct bt_ctf_notif_iter *notit,
+               struct bt_clock_class_priority_map *cc_prio_map,
                struct bt_notification **notification)
 {
        enum bt_ctf_notif_iter_status status = BT_CTF_NOTIF_ITER_STATUS_OK;
@@ -1907,6 +2386,10 @@ enum bt_ctf_notif_iter_status bt_ctf_notif_iter_get_next_notification(
 
        while (true) {
                status = handle_state(notit);
+               if (status == BT_CTF_NOTIF_ITER_STATUS_AGAIN) {
+                       PDBG("Medium operation reported \"try again later\"");
+                       goto end;
+               }
                if (status != BT_CTF_NOTIF_ITER_STATUS_OK) {
                        if (status == BT_CTF_NOTIF_ITER_STATUS_EOF) {
                                PDBG("Medium operation reported end of stream\n");
@@ -1927,12 +2410,28 @@ enum bt_ctf_notif_iter_status bt_ctf_notif_iter_get_next_notification(
                        goto end;
                case STATE_EMIT_NOTIF_EVENT:
                        PDBG("Emitting event notification\n");
-                       notify_event(notit, notification);
+                       notify_event(notit, cc_prio_map, notification);
                        if (!*notification) {
                                status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
                        }
                        goto end;
                case STATE_EMIT_NOTIF_END_OF_PACKET:
+                       /* Update clock with timestamp_end field. */
+                       if (notit->cur_timestamp_end) {
+                               enum bt_ctf_btr_status btr_status;
+                               struct bt_ctf_field_type *field_type =
+                                               bt_ctf_field_get_type(
+                                                       notit->cur_timestamp_end);
+
+                               btr_status = update_clock(notit,
+                                       notit->cur_timestamp_end);
+                               BT_PUT(field_type);
+                               if (btr_status != BT_CTF_BTR_STATUS_OK) {
+                                       status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
+                                       goto end;
+                               }
+                       }
+
                        PDBG("Emitting end of packet notification\n");
                        notify_end_of_packet(notit, notification);
                        if (!*notification) {
This page took 0.035586 seconds and 4 git commands to generate.