X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=lib%2Fgraph%2Fiterator.c;h=849543ce7ea26f7bb695fc7ac99486a813625a5d;hb=126215b484c9df27d192690e0dba2fb9f630e10a;hp=87fe6376309de579986cc9422e12cf66a0c3f8fc;hpb=1a6a376a7c1fff0d5da2559f4f9a515950fb15ba;p=babeltrace.git diff --git a/lib/graph/iterator.c b/lib/graph/iterator.c index 87fe6376..849543ce 100644 --- a/lib/graph/iterator.c +++ b/lib/graph/iterator.c @@ -30,6 +30,9 @@ #include #include +#include +#include +#include #include #include #include @@ -48,14 +51,23 @@ #include #include #include +#include #include #include #include +#include #include +struct discarded_elements_state { + struct bt_ctf_clock_value *cur_begin; + uint64_t cur_count; +}; + struct stream_state { struct bt_ctf_stream *stream; /* owned by this */ struct bt_ctf_packet *cur_packet; /* owned by this */ + struct discarded_elements_state discarded_packets_state; + struct discarded_elements_state discarded_events_state; bt_bool is_ended; }; @@ -65,6 +77,8 @@ enum action_type { ACTION_TYPE_ADD_STREAM_STATE, ACTION_TYPE_SET_STREAM_STATE_IS_ENDED, ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET, + ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS, + ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS, }; struct action { @@ -98,6 +112,14 @@ struct action { struct stream_state *stream_state; /* weak */ struct bt_ctf_packet *packet; /* owned by this */ } set_stream_state_cur_packet; + + /* ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS */ + /* ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS */ + struct { + struct stream_state *stream_state; /* weak */ + struct bt_ctf_clock_value *cur_begin; /* owned by this */ + uint64_t cur_count; + } update_stream_state_discarded_elements; } payload; }; @@ -122,6 +144,8 @@ void destroy_stream_state(struct stream_state *stream_state) bt_put(stream_state->cur_packet); BT_LOGV_STR("Putting stream state's stream."); bt_put(stream_state->stream); + bt_put(stream_state->discarded_packets_state.cur_begin); + bt_put(stream_state->discarded_events_state.cur_begin); g_free(stream_state); } @@ -150,7 +174,12 @@ void destroy_action(struct action *action) break; case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED: break; + case ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS: + case ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS: + BT_PUT(action->payload.update_stream_state_discarded_elements.cur_begin); + break; default: + BT_LOGF("Unexpected action's type: type=%d", action->type); abort(); } } @@ -191,6 +220,10 @@ const char *action_type_string(enum action_type type) return "ACTION_TYPE_SET_STREAM_STATE_IS_ENDED"; case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET: return "ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET"; + case ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS: + return "ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS"; + case ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS: + return "ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS"; default: return "(unknown)"; } @@ -262,7 +295,25 @@ void apply_actions(struct bt_notification_iterator *iterator) BT_MOVE(action->payload.set_stream_state_cur_packet.stream_state->cur_packet, action->payload.set_stream_state_cur_packet.packet); break; + case ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS: + case ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS: + { + struct discarded_elements_state *state; + + if (action->type == ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS) { + state = &action->payload.update_stream_state_discarded_elements.stream_state->discarded_packets_state; + } else { + state = &action->payload.update_stream_state_discarded_elements.stream_state->discarded_events_state; + } + + BT_MOVE(state->cur_begin, + action->payload.update_stream_state_discarded_elements.cur_begin); + state->cur_count = action->payload.update_stream_state_discarded_elements.cur_count; + break; + } default: + BT_LOGF("Unexpected action's type: type=%d", + action->type); abort(); } } @@ -280,6 +331,15 @@ struct stream_state *create_stream_state(struct bt_ctf_stream *stream) goto end; } + /* + * The packet index is a monotonic counter which may not start + * at 0 at the beginning of the stream. We therefore need to + * have an internal object initial state of -1ULL to distinguish + * between initial state and having seen a packet with + * seqnum = 0. + */ + stream_state->discarded_packets_state.cur_count = -1ULL; + /* * We keep a reference to the stream until we know it's ended * because we need to be able to create an automatic "stream @@ -478,7 +538,9 @@ int create_subscription_mask_from_notification_types( BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_BEGIN | BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_STREAM_END | BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_BEGIN | - BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END; + BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END | + BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_DISCARDED_EVENTS | + BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_DISCARDED_PACKETS; break; case BT_NOTIFICATION_TYPE_EVENT: iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_EVENT; @@ -498,6 +560,12 @@ int create_subscription_mask_from_notification_types( case BT_NOTIFICATION_TYPE_PACKET_END: iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END; break; + case BT_NOTIFICATION_TYPE_DISCARDED_EVENTS: + iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_DISCARDED_EVENTS; + break; + case BT_NOTIFICATION_TYPE_DISCARDED_PACKETS: + iterator->subscription_mask |= BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_DISCARDED_PACKETS; + break; default: ret = -1; goto end; @@ -674,6 +742,12 @@ bt_notification_iterator_notif_type_from_notif_type( case BT_NOTIFICATION_TYPE_PACKET_END: iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_PACKET_END; break; + case BT_NOTIFICATION_TYPE_DISCARDED_EVENTS: + iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_DISCARDED_EVENTS; + break; + case BT_NOTIFICATION_TYPE_DISCARDED_PACKETS: + iter_notif_type = BT_NOTIFICATION_ITERATOR_NOTIF_TYPE_DISCARDED_PACKETS; + break; default: abort(); } @@ -1015,6 +1089,38 @@ void add_action_set_stream_state_cur_packet( stream_state, packet); } +static +void add_action_update_stream_state_discarded_elements( + struct bt_notification_iterator *iterator, + enum action_type type, + struct stream_state *stream_state, + struct bt_ctf_clock_value *cur_begin, + uint64_t cur_count) +{ + struct action action = { + .type = type, + .payload.update_stream_state_discarded_elements = { + .stream_state = stream_state, + .cur_begin = bt_get(cur_begin), + .cur_count = cur_count, + }, + }; + + assert(stream_state); + assert(type == ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS || + type == ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS); + add_action(iterator, &action); + if (type == ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS) { + BT_LOGV("Added \"update stream state's discarded packets\" action: " + "stream-state-addr=%p, cur-begin-addr=%p, cur-count=%" PRIu64, + stream_state, cur_begin, cur_count); + } else if (type == ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS) { + BT_LOGV("Added \"update stream state's discarded events\" action: " + "stream-state-addr=%p, cur-begin-addr=%p, cur-count=%" PRIu64, + stream_state, cur_begin, cur_count); + } +} + static int ensure_stream_state_exists(struct bt_notification_iterator *iterator, struct bt_notification *stream_begin_notif, @@ -1079,6 +1185,392 @@ end: return ret; } +static +struct bt_ctf_field *get_struct_field_uint(struct bt_ctf_field *struct_field, + const char *field_name) +{ + struct bt_ctf_field *field = NULL; + struct bt_ctf_field_type *ft = NULL; + + field = bt_ctf_field_structure_get_field_by_name(struct_field, + field_name); + if (!field) { + BT_LOGV_STR("`%s` field does not exist."); + goto end; + } + + if (!bt_ctf_field_is_integer(field)) { + BT_LOGV("Skipping `%s` field because its type is not an integer field type: " + "field-addr=%p, ft-addr=%p, ft-id=%s", field_name, + field, ft, bt_ctf_field_type_id_string( + bt_ctf_field_type_get_type_id(ft))); + BT_PUT(field); + goto end; + } + + ft = bt_ctf_field_get_type(field); + assert(ft); + + if (bt_ctf_field_type_integer_is_signed(ft)) { + BT_LOGV("Skipping `%s` integer field because its type is signed: " + "field-addr=%p, ft-addr=%p", field_name, field, ft); + BT_PUT(field); + goto end; + } + +end: + bt_put(ft); + return field; +} + +static +uint64_t get_packet_context_events_discarded(struct bt_ctf_packet *packet) +{ + struct bt_ctf_field *packet_context = NULL; + struct bt_ctf_field *field = NULL; + uint64_t retval = -1ULL; + int ret; + + packet_context = bt_ctf_packet_get_context(packet); + if (!packet_context) { + goto end; + } + + field = get_struct_field_uint(packet_context, "events_discarded"); + if (!field) { + BT_LOGV("`events_discarded` field does not exist in packet's context field: " + "packet-addr=%p, packet-context-field-addr=%p", + packet, packet_context); + goto end; + } + + assert(bt_ctf_field_is_integer(field)); + ret = bt_ctf_field_unsigned_integer_get_value(field, &retval); + if (ret) { + BT_LOGV("Cannot get raw value of packet's context field's `events_discarded` integer field: " + "packet-addr=%p, field-addr=%p", + packet, field); + retval = -1ULL; + goto end; + } + +end: + bt_put(packet_context); + bt_put(field); + return retval; +} + +static +uint64_t get_packet_context_packet_seq_num(struct bt_ctf_packet *packet) +{ + struct bt_ctf_field *packet_context = NULL; + struct bt_ctf_field *field = NULL; + uint64_t retval = -1ULL; + int ret; + + packet_context = bt_ctf_packet_get_context(packet); + if (!packet_context) { + goto end; + } + + field = get_struct_field_uint(packet_context, "packet_seq_num"); + if (!field) { + BT_LOGV("`packet_seq_num` field does not exist in packet's context field: " + "packet-addr=%p, packet-context-field-addr=%p", + packet, packet_context); + goto end; + } + + assert(bt_ctf_field_is_integer(field)); + ret = bt_ctf_field_unsigned_integer_get_value(field, &retval); + if (ret) { + BT_LOGV("Cannot get raw value of packet's context field's `packet_seq_num` integer field: " + "packet-addr=%p, field-addr=%p", + packet, field); + retval = -1ULL; + goto end; + } + +end: + bt_put(packet_context); + bt_put(field); + return retval; +} + +static +int handle_discarded_packets(struct bt_notification_iterator *iterator, + struct bt_ctf_packet *packet, + struct bt_ctf_clock_value *ts_begin, + struct bt_ctf_clock_value *ts_end, + struct stream_state *stream_state) +{ + struct bt_notification *notif = NULL; + uint64_t diff; + uint64_t prev_count, next_count; + int ret = 0; + + next_count = get_packet_context_packet_seq_num(packet); + if (next_count == -1ULL) { + /* + * Stream does not have seqnum field, skip discarded + * packets feature. + */ + goto end; + } + prev_count = stream_state->discarded_packets_state.cur_count; + + if (prev_count != -1ULL) { + if (next_count < prev_count) { + BT_LOGW("Current value of packet's context field's `packet_seq_num` field is lesser than the previous value for the same stream: " + "not updating the stream state's current value: " + "packet-addr=%p, prev-count=%" PRIu64 ", " + "cur-count=%" PRIu64, + packet, prev_count, next_count); + goto end; + } + if (next_count == prev_count) { + BT_LOGW("Current value of packet's context field's `packet_seq_num` field is equal to the previous value for the same stream: " + "not updating the stream state's current value: " + "packet-addr=%p, prev-count=%" PRIu64 ", " + "cur-count=%" PRIu64, + packet, prev_count, next_count); + goto end; + } + + diff = next_count - prev_count; + if (diff > 1) { + /* + * Add a discarded packets notification. The packets + * are considered to be lost between the state's last time + * and the current packet's beginning time. + * The counter is expected to monotonically increase of + * 1 for each packet. Therefore, the number of missing + * packets is 'diff - 1'. + */ + notif = bt_notification_discarded_elements_create( + BT_NOTIFICATION_TYPE_DISCARDED_PACKETS, + stream_state->stream, + stream_state->discarded_packets_state.cur_begin, + ts_begin, diff - 1); + if (!notif) { + BT_LOGE_STR("Cannot create discarded packets notification."); + ret = -1; + goto end; + } + + add_action_push_notif(iterator, notif); + } + } + + add_action_update_stream_state_discarded_elements(iterator, + ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_PACKETS, + stream_state, ts_end, next_count); + +end: + bt_put(notif); + return ret; +} + +static +int handle_discarded_events(struct bt_notification_iterator *iterator, + struct bt_ctf_packet *packet, + struct bt_ctf_clock_value *ts_begin, + struct bt_ctf_clock_value *ts_end, + struct stream_state *stream_state) +{ + struct bt_notification *notif = NULL; + uint64_t diff; + uint64_t next_count; + int ret = 0; + + next_count = get_packet_context_events_discarded(packet); + if (next_count == -1ULL) { + next_count = stream_state->discarded_events_state.cur_count; + goto update_state; + } + + if (next_count < stream_state->discarded_events_state.cur_count) { + BT_LOGW("Current value of packet's context field's `events_discarded` field is lesser than the previous value for the same stream: " + "not updating the stream state's current value: " + "packet-addr=%p, prev-count=%" PRIu64 ", " + "cur-count=%" PRIu64, + packet, stream_state->discarded_events_state.cur_count, + next_count); + goto end; + } + + diff = next_count - stream_state->discarded_events_state.cur_count; + if (diff > 0) { + /* + * Add a discarded events notification. The events are + * considered to be lost betweem the state's last time + * and the current packet's end time. + */ + notif = bt_notification_discarded_elements_create( + BT_NOTIFICATION_TYPE_DISCARDED_EVENTS, + stream_state->stream, + stream_state->discarded_events_state.cur_begin, + ts_end, diff); + if (!notif) { + BT_LOGE_STR("Cannot create discarded events notification."); + ret = -1; + goto end; + } + + add_action_push_notif(iterator, notif); + } + +update_state: + add_action_update_stream_state_discarded_elements(iterator, + ACTION_TYPE_UPDATE_STREAM_STATE_DISCARDED_EVENTS, + stream_state, ts_end, next_count); + +end: + bt_put(notif); + return ret; +} + +static +int get_field_clock_value(struct bt_ctf_field *root_field, + const char *field_name, + struct bt_ctf_clock_value **user_clock_val) +{ + struct bt_ctf_field *field; + struct bt_ctf_field_type *ft = NULL; + struct bt_ctf_clock_class *clock_class = NULL; + struct bt_ctf_clock_value *clock_value = NULL; + uint64_t val; + int ret = 0; + + field = get_struct_field_uint(root_field, field_name); + if (!field) { + /* Not an error: skip this */ + goto end; + } + + ft = bt_ctf_field_get_type(field); + assert(ft); + clock_class = bt_ctf_field_type_integer_get_mapped_clock_class(ft); + if (!clock_class) { + BT_LOGW("Integer field type has no mapped clock class but it's expected to have one: " + "ft-addr=%p", ft); + ret = -1; + goto end; + } + + ret = bt_ctf_field_unsigned_integer_get_value(field, &val); + if (ret) { + BT_LOGW("Cannot get integer field's raw value: " + "field-addr=%p", field); + ret = -1; + goto end; + } + + clock_value = bt_ctf_clock_value_create(clock_class, val); + if (!clock_value) { + BT_LOGE_STR("Cannot create clock value from clock class."); + ret = -1; + goto end; + } + + /* Move clock value to user */ + *user_clock_val = clock_value; + clock_value = NULL; + +end: + bt_put(field); + bt_put(ft); + bt_put(clock_class); + bt_put(clock_value); + return ret; +} + +static +int get_ts_begin_ts_end_from_packet(struct bt_ctf_packet *packet, + struct bt_ctf_clock_value **user_ts_begin, + struct bt_ctf_clock_value **user_ts_end) +{ + struct bt_ctf_field *packet_context = NULL; + struct bt_ctf_clock_value *ts_begin = NULL; + struct bt_ctf_clock_value *ts_end = NULL; + int ret = 0; + + packet_context = bt_ctf_packet_get_context(packet); + if (!packet_context) { + goto end; + } + + ret = get_field_clock_value(packet_context, "timestamp_begin", + &ts_begin); + if (ret) { + BT_LOGW("Cannot create clock value for packet context's `timestamp_begin` field: " + "packet-addr=%p, packet-context-field-addr=%p", + packet, packet_context); + goto end; + } + + ret = get_field_clock_value(packet_context, "timestamp_end", + &ts_end); + if (ret) { + BT_LOGW("Cannot create clock value for packet context's `timestamp_begin` field: " + "packet-addr=%p, packet-context-field-addr=%p", + packet, packet_context); + goto end; + } + + /* Move clock values to user */ + *user_ts_begin = ts_begin; + ts_begin = NULL; + *user_ts_end = ts_end; + ts_end = NULL; + +end: + bt_put(packet_context); + bt_put(ts_begin); + bt_put(ts_end); + return ret; +} + +static +int handle_discarded_elements(struct bt_notification_iterator *iterator, + struct bt_ctf_packet *packet, struct stream_state *stream_state) +{ + struct bt_ctf_clock_value *ts_begin = NULL; + struct bt_ctf_clock_value *ts_end = NULL; + int ret; + + ret = get_ts_begin_ts_end_from_packet(packet, &ts_begin, &ts_end); + if (ret) { + BT_LOGW("Cannot get packet's beginning or end clock values: " + "packet-addr=%p, ret=%d", packet, ret); + ret = -1; + goto end; + } + + ret = handle_discarded_packets(iterator, packet, ts_begin, ts_end, + stream_state); + if (ret) { + BT_LOGW("Cannot handle discarded packets for packet: " + "packet-addr=%p, ret=%d", packet, ret); + ret = -1; + goto end; + } + + ret = handle_discarded_events(iterator, packet, ts_begin, ts_end, + stream_state); + if (ret) { + BT_LOGW("Cannot handle discarded events for packet: " + "packet-addr=%p, ret=%d", packet, ret); + ret = -1; + goto end; + } + +end: + bt_put(ts_begin); + bt_put(ts_end); + return ret; +} + static int handle_packet_switch(struct bt_notification_iterator *iterator, struct bt_notification *packet_begin_notif, @@ -1105,6 +1597,16 @@ int handle_packet_switch(struct bt_notification_iterator *iterator, } } + /* + * Check the new packet's context fields for discarded packets + * and events to emit those automatic notifications. + */ + ret = handle_discarded_elements(iterator, new_packet, stream_state); + if (ret) { + BT_LOGE_STR("Cannot handle discarded elements for new packet."); + goto error; + } + /* Beginning of the new packet */ if (packet_begin_notif) { add_action_push_notif(iterator, packet_begin_notif); @@ -1190,6 +1692,35 @@ end: return ret; } +static +int handle_notif_discarded_elements( + struct bt_notification_iterator *iterator, + struct bt_notification *notif, + struct bt_ctf_stream *notif_stream) +{ + int ret = 0; + struct stream_state *stream_state; + + assert(notif->type == BT_NOTIFICATION_TYPE_DISCARDED_EVENTS || + notif->type == BT_NOTIFICATION_TYPE_DISCARDED_PACKETS); + assert(notif_stream); + ret = ensure_stream_state_exists(iterator, NULL, notif_stream, + &stream_state); + if (ret) { + BT_LOGE_STR("Cannot ensure that stream state exists."); + goto error; + } + + add_action_push_notif(iterator, notif); + goto end; + +error: + ret = -1; + +end: + return ret; +} + static int handle_notif_packet_begin( struct bt_notification_iterator *iterator, @@ -1349,7 +1880,10 @@ int enqueue_notification_and_automatic( * types above are allowed to be returned by a user * component. */ - goto error; + BT_LOGF("Unexpected notification type at this point: " + "notif-addr=%p, notif-type=%s", notif, + bt_notification_type_string(notif->type)); + abort(); } if (notif_packet) { @@ -1392,6 +1926,11 @@ handle_notif: ret = handle_notif_packet_end(iterator, notif, notif_stream, notif_packet); break; + case BT_NOTIFICATION_TYPE_DISCARDED_EVENTS: + case BT_NOTIFICATION_TYPE_DISCARDED_PACKETS: + ret = handle_notif_discarded_elements(iterator, notif, + notif_stream); + break; case BT_NOTIFICATION_TYPE_INACTIVITY: add_action_push_notif(iterator, notif); break; @@ -1616,6 +2155,23 @@ enum bt_notification_iterator_status ensure_queue_has_notifications( goto end; } + /* + * Ignore some notifications which are always + * automatically generated by the notification + * iterator to make sure they have valid values. + */ + switch (next_return.notification->type) { + case BT_NOTIFICATION_TYPE_DISCARDED_PACKETS: + case BT_NOTIFICATION_TYPE_DISCARDED_EVENTS: + BT_LOGV("Ignoring discarded elements notification returned by notification iterator's \"next\" method: " + "notif-type=%s", + bt_notification_type_string(next_return.notification->type)); + BT_PUT(next_return.notification); + continue; + default: + break; + } + /* * We know the notification is valid. Before we * push it to the head of the queue, push the