#include <babeltrace/ctf-ir/stream.h>
#include <babeltrace/ctf-ir/clock.h>
#include <babeltrace/ctf-ir/event-class.h>
+#include <babeltrace/plugin/notification/packet.h>
+#include <babeltrace/plugin/notification/event.h>
#include <babeltrace/ref.h>
#include <glib.h>
struct stack {
/* Entries (struct stack_entry *) (top is last element) */
GPtrArray *entries;
-
- /* Link to owner */
- struct bt_ctf_notif_iter *notit;
};
/* State */
/* Current state */
enum state state;
- /* User buffer stuff */
+ /* Current medium buffer data */
struct {
/* Last address provided by medium */
const uint8_t *addr;
/* Binary type reader */
struct bt_ctf_btr *btr;
- /* Medium stuff */
+ /* Current medium data */
struct {
struct bt_ctf_notif_iter_medium_ops medops;
size_t max_request_sz;
} medium;
/* Current packet size (bits) (-1 if unknown) */
- size_t cur_packet_size;
+ int64_t cur_packet_size;
/* Current content size (bits) (-1 if unknown) */
- size_t cur_content_size;
+ int64_t cur_content_size;
};
+static
+int bt_ctf_notif_iter_switch_packet(struct bt_ctf_notif_iter *notit);
+
static
void stack_entry_free_func(gpointer data)
{
goto error;
}
- stack->notit = notit;
-
return stack;
-
error:
g_free(stack);
-
return NULL;
}
{
int ret = 0;
struct stack_entry *entry;
- struct bt_ctf_notif_iter *notit;
assert(stack);
assert(base);
- notit = stack->notit;
entry = g_new0(struct stack_entry, 1);
if (!entry) {
- PERR("Cannot create new stack entry\n");
ret = -1;
goto end;
}
}
static
-enum bt_ctf_notif_iter_status request_medium_bytes(struct bt_ctf_notif_iter *notit)
+enum bt_ctf_notif_iter_status request_medium_bytes(
+ struct bt_ctf_notif_iter *notit)
{
uint8_t *buffer_addr;
size_t buffer_sz;
}
consumed_bits = bt_ctf_btr_continue(notit->btr, notit->buf.addr,
- notit->buf.sz, &btr_status);
+ notit->buf.sz, &btr_status);
switch (btr_status) {
case BT_CTF_BTR_STATUS_OK:
- /* Type was read completely */
+ /* Type was read completely. */
notit->state = done_state;
break;
case BT_CTF_BTR_STATUS_EOF:
- /* Stay in this continue state */
+ /* Stay in this continue state. */
break;
default:
PERR("Binary type reader failed to continue\n");
goto end;
}
- /* Consume bits now since we know we're not in an error state */
+ /* Consume bits now since we know we're not in an error state. */
buf_consume_bits(notit, consumed_bits);
-
end:
return status;
}
enum bt_ctf_notif_iter_status read_packet_header_begin_state(
struct bt_ctf_notif_iter *notit)
{
- enum bt_ctf_notif_iter_status status = BT_CTF_NOTIF_ITER_STATUS_OK;
- struct bt_ctf_field_type *packet_header_type;
+ struct bt_ctf_field_type *packet_header_type = NULL;
+ enum bt_ctf_notif_iter_status ret = BT_CTF_NOTIF_ITER_STATUS_OK;
- /* Reset all dynamic scopes since we're reading a new packet */
- put_all_dscopes(notit);
- BT_PUT(notit->packet);
- BT_PUT(notit->meta.stream_class);
- BT_PUT(notit->meta.event_class);
+ if (bt_ctf_notif_iter_switch_packet(notit)) {
+ ret = BT_CTF_NOTIF_ITER_STATUS_ERROR;
+ goto end;
+ }
- /* Packet header type is common to the whole trace */
+ /* Packet header type is common to the whole trace. */
packet_header_type = bt_ctf_trace_get_packet_header_type(
- notit->meta.trace);
+ notit->meta.trace);
if (!packet_header_type) {
PERR("Failed to retrieve trace's packet header type\n");
- status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
+ ret = BT_CTF_NOTIF_ITER_STATUS_ERROR;
goto end;
}
- status = read_dscope_begin_state(notit, packet_header_type,
- STATE_AFTER_TRACE_PACKET_HEADER,
- STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE,
- ¬it->dscopes.trace_packet_header);
-
+ ret = read_dscope_begin_state(notit, packet_header_type,
+ STATE_AFTER_TRACE_PACKET_HEADER,
+ STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE,
+ ¬it->dscopes.trace_packet_header);
end:
BT_PUT(packet_header_type);
-
- return status;
+ return ret;
}
static
struct bt_ctf_notif_iter *notit)
{
return read_dscope_continue_state(notit,
- STATE_AFTER_TRACE_PACKET_HEADER);
+ STATE_AFTER_TRACE_PACKET_HEADER);
}
static inline
bool is_struct_type(struct bt_ctf_field_type *field_type)
{
- return bt_ctf_field_type_get_type_id(field_type) == BT_CTF_TYPE_ID_STRUCT;
+ return bt_ctf_field_type_get_type_id(field_type) ==
+ BT_CTF_TYPE_ID_STRUCT;
}
static inline
bool is_variant_type(struct bt_ctf_field_type *field_type)
{
- return bt_ctf_field_type_get_type_id(field_type) == BT_CTF_TYPE_ID_VARIANT;
+ return bt_ctf_field_type_get_type_id(field_type) ==
+ BT_CTF_TYPE_ID_VARIANT;
}
static inline
/* Is there any "stream_id" field in the packet header? */
packet_header_type = bt_ctf_trace_get_packet_header_type(
- notit->meta.trace);
+ notit->meta.trace);
if (!packet_header_type) {
PERR("Failed to retrieve trace's packet header type\n");
status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
// TODO: optimalize!
stream_id_field_type =
- bt_ctf_field_type_structure_get_field_type_by_name(
- packet_header_type, "stream_id");
+ bt_ctf_field_type_structure_get_field_type_by_name(
+ packet_header_type, "stream_id");
if (stream_id_field_type) {
/* Find appropriate stream class using current stream ID */
struct bt_ctf_field *stream_id_field = NULL;
// TODO: optimalize!
stream_id_field = bt_ctf_field_structure_get_field(
- notit->dscopes.trace_packet_header, "stream_id");
+ notit->dscopes.trace_packet_header, "stream_id");
assert(stream_id_field);
ret = bt_ctf_field_unsigned_integer_get_value(
- stream_id_field, &stream_id);
+ stream_id_field, &stream_id);
assert(!ret);
BT_PUT(stream_id_field);
} else {
/* Only one stream: pick the first stream class */
assert(bt_ctf_trace_get_stream_class_count(
- notit->meta.trace) == 1);
+ notit->meta.trace) == 1);
stream_id = 0;
}
// TODO: get by ID
notit->meta.stream_class = bt_ctf_trace_get_stream_class(
- notit->meta.trace, stream_id);
+ notit->meta.trace, stream_id);
if (!notit->meta.stream_class) {
PERR("Cannot find stream class with ID %" PRIu64 "\n",
stream_id);
assert(notit->meta.stream_class);
packet_context_type = bt_ctf_stream_class_get_packet_context_type(
- notit->meta.stream_class);
+ notit->meta.stream_class);
if (!packet_context_type) {
PERR("Failed to retrieve stream class's packet context\n");
status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
}
status = read_dscope_begin_state(notit, packet_context_type,
- STATE_AFTER_STREAM_PACKET_CONTEXT,
- STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE,
- ¬it->dscopes.stream_packet_context);
+ STATE_AFTER_STREAM_PACKET_CONTEXT,
+ STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE,
+ ¬it->dscopes.stream_packet_context);
end:
BT_PUT(packet_context_type);
-
return status;
}
struct bt_ctf_notif_iter *notit)
{
return read_dscope_continue_state(notit,
- STATE_AFTER_STREAM_PACKET_CONTEXT);
+ STATE_AFTER_STREAM_PACKET_CONTEXT);
}
-static inline
+static
enum bt_ctf_notif_iter_status set_current_packet_content_sizes(
struct bt_ctf_notif_iter *notit)
{
assert(notit->dscopes.stream_packet_context);
- // TODO: optimalize!
packet_size_field = bt_ctf_field_structure_get_field(
- notit->dscopes.stream_packet_context, "packet_size");
+ notit->dscopes.stream_packet_context, "packet_size");
content_size_field = bt_ctf_field_structure_get_field(
- notit->dscopes.stream_packet_context, "content_size");
+ notit->dscopes.stream_packet_context, "content_size");
if (packet_size_field) {
int ret = bt_ctf_field_unsigned_integer_get_value(
- packet_size_field, &packet_size);
- assert(!ret);
+ packet_size_field, &packet_size);
+ assert(!ret);
if (packet_size == 0) {
PERR("Decoded packet size is 0\n");
status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
goto end;
}
}
+
if (content_size_field) {
int ret = bt_ctf_field_unsigned_integer_get_value(
content_size_field, &content_size);
+
assert(!ret);
} else {
content_size = packet_size;
notit->cur_packet_size = packet_size;
notit->cur_content_size = content_size;
-
end:
BT_PUT(packet_size_field);
BT_PUT(content_size_field);
-
return status;
}
} else if (packet_at(notit) > notit->cur_content_size) {
/* That's not supposed to happen */
PERR("Cursor passed packet's content size:\n");
- PERR(" Decoded content size: %zu\n",
+ PERR("\tDecoded content size: %zu\n",
notit->cur_content_size);
- PERR(" Cursor position: %zu\n", packet_at(notit));
+ PERR("\tCursor position: %zu\n", packet_at(notit));
status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
goto end;
}
STATE_AFTER_STREAM_EVENT_HEADER,
STATE_DSCOPE_STREAM_EVENT_HEADER_CONTINUE,
¬it->dscopes.stream_event_header);
-
end:
BT_PUT(event_header_type);
return status;
}
+
+/**
+ * Resets the internal state of a CTF notification iterator.
+ *
+ * This function can be used when it is desired to seek to the beginning
+ * of another packet. It is expected that the next call to
+ * bt_ctf_notif_iter_medium_ops::request_bytes() made by this
+ * notification iterator will return the \em first bytes of a \em
+ * packet.
+ *
+ * @param notif_iter CTF notification iterator
+ */
+static
void bt_ctf_notif_iter_reset(struct bt_ctf_notif_iter *notit)
{
assert(notit);
notit->cur_packet_size = -1;
}
+static
+int bt_ctf_notif_iter_switch_packet(struct bt_ctf_notif_iter *notit)
+{
+ int ret = 0;
+
+ assert(notit);
+ stack_clear(notit->stack);
+ BT_PUT(notit->meta.stream_class);
+ BT_PUT(notit->meta.event_class);
+ BT_PUT(notit->packet);
+ put_all_dscopes(notit);
+
+ /*
+ * Adjust current buffer so that addr points to the beginning of the new
+ * packet.
+ */
+ if (notit->buf.addr) {
+ size_t consumed_bytes = (size_t) (notit->buf.at / CHAR_BIT);
+
+ /* Packets are assumed to start on a byte frontier. */
+ if (notit->buf.at % CHAR_BIT) {
+ ret = -1;
+ goto end;
+ }
+
+ notit->buf.addr += consumed_bytes;
+ notit->buf.sz -= consumed_bytes;
+ notit->buf.at = 0;
+ notit->buf.packet_offset = 0;
+ }
+
+ notit->cur_content_size = -1;
+ notit->cur_packet_size = -1;
+end:
+ return ret;
+}
+
static
struct bt_ctf_field *get_next_field(struct bt_ctf_notif_iter *notit)
{
break;
}
+ if (!next_field) {
+ next_field = NULL;
+ }
+
end:
BT_PUT(base_type);
int iret;
struct bt_ctf_field_path *field_path;
struct bt_ctf_notif_iter *notit = data;
- struct bt_ctf_field *field = NULL;
+ struct bt_ctf_field *length_field = NULL;
uint64_t length;
field_path = bt_ctf_field_type_sequence_get_length_field_path(type);
goto end;
}
- field = resolve_field(notit, field_path);
- if (!field) {
+ length_field = resolve_field(notit, field_path);
+ if (!length_field) {
goto end;
}
- iret = bt_ctf_field_unsigned_integer_get_value(field, &length);
+ iret = bt_ctf_field_unsigned_integer_get_value(length_field, &length);
if (iret) {
goto end;
}
+ iret = bt_ctf_field_sequence_set_length(stack_top(notit->stack)->base,
+ length_field);
+ if (iret) {
+ goto end;
+ }
ret = (int64_t) length;
end:
- BT_PUT(field);
+ BT_PUT(length_field);
BT_PUT(field_path);
return ret;
return selected_field_type;
}
-static struct bt_ctf_event *create_event(struct bt_ctf_notif_iter *notit)
+static
+struct bt_ctf_event *create_event(struct bt_ctf_notif_iter *notit)
{
- struct bt_ctf_event *event;
int ret;
+ struct bt_ctf_event *event;
- /* Create event object */
+ /* Create event object. */
event = bt_ctf_event_create(notit->meta.event_class);
if (!event) {
goto error;
}
- /* Set header, stream event context, context, and payload fields */
+ /* Set header, stream event context, context, and payload fields. */
ret = bt_ctf_event_set_header(event,
notit->dscopes.stream_event_header);
if (ret) {
goto error;
}
- /* Associate with current packet */
+ /* Associate with current packet. */
assert(notit->packet);
ret = bt_ctf_event_set_packet(event, notit->packet);
if (ret) {
}
goto end;
-
error:
BT_PUT(event);
-
end:
return event;
}
-static void create_packet(struct bt_ctf_notif_iter *notit)
+static
+void create_packet(struct bt_ctf_notif_iter *notit)
{
int ret;
struct bt_ctf_stream *stream = NULL;
/* Ask the user for the stream */
stream = notit->medium.medops.get_stream(notit->meta.stream_class,
- notit->medium.data);
+ notit->medium.data);
if (!stream) {
goto error;
}
/* Set packet's context and header fields */
if (notit->dscopes.trace_packet_header) {
ret = bt_ctf_packet_set_header(packet,
- notit->dscopes.trace_packet_header);
+ notit->dscopes.trace_packet_header);
if (ret) {
goto error;
}
if (notit->dscopes.stream_packet_context) {
ret = bt_ctf_packet_set_context(packet,
- notit->dscopes.stream_packet_context);
+ notit->dscopes.stream_packet_context);
if (ret) {
goto error;
}
}
goto end;
-
error:
BT_PUT(packet);
-
end:
BT_MOVE(notit->packet, packet);
}
-static void notify_new_packet(struct bt_ctf_notif_iter *notit,
- struct bt_ctf_notif_iter_notif **notification)
+static
+void notify_new_packet(struct bt_ctf_notif_iter *notit,
+ struct bt_notification **notification)
{
- struct bt_ctf_notif_iter_notif_new_packet *rnotif;
-
- rnotif = g_new0(struct bt_ctf_notif_iter_notif_new_packet, 1);
- if (!rnotif) {
- goto error;
- }
-
- rnotif->base.type = BT_CTF_NOTIF_ITER_NOTIF_NEW_PACKET;
+ struct bt_notification *ret;
- /* Create packet */
+ /* Initialize the iterator's current packet */
create_packet(notit);
if (!notit->packet) {
- goto error;
+ return;
}
- rnotif->packet = bt_get(notit->packet);
- *notification = (struct bt_ctf_notif_iter_notif *) rnotif;
- return;
-
-error:
- bt_ctf_notif_iter_notif_destroy(rnotif);
+ ret = bt_notification_packet_start_create(notit->packet);
+ if (!ret) {
+ return;
+ }
+ *notification = ret;
}
-static void notify_end_of_packet(struct bt_ctf_notif_iter *notit,
- struct bt_ctf_notif_iter_notif **notification)
+static
+void notify_end_of_packet(struct bt_ctf_notif_iter *notit,
+ struct bt_notification **notification)
{
- struct bt_ctf_notif_iter_notif_end_of_packet *rnotif;
+ struct bt_notification *ret;
- rnotif = g_new0(struct bt_ctf_notif_iter_notif_end_of_packet, 1);
- if (!rnotif) {
- goto error;
- }
-
- rnotif->base.type = BT_CTF_NOTIF_ITER_NOTIF_END_OF_PACKET;
-
- /* Create packet */
- create_packet(notit);
if (!notit->packet) {
- goto error;
+ return;
}
- rnotif->packet = bt_get(notit->packet);
- *notification = (struct bt_ctf_notif_iter_notif *) rnotif;
- return;
-
-error:
- bt_ctf_notif_iter_notif_destroy(rnotif);
+ ret = bt_notification_packet_end_create(notit->packet);
+ if (!ret) {
+ return;
+ }
+ BT_PUT(notit->packet);
+ *notification = ret;
}
-static void notify_event(struct bt_ctf_notif_iter *notit,
- struct bt_ctf_notif_iter_notif **notification)
+static
+void notify_event(struct bt_ctf_notif_iter *notit,
+ struct bt_notification **notification)
{
- struct bt_ctf_notif_iter_notif_event *rnotif;
- struct bt_ctf_event *event = NULL;
-
- rnotif = g_new0(struct bt_ctf_notif_iter_notif_event, 1);
- if (!rnotif) {
- goto error;
- }
-
- rnotif->base.type = BT_CTF_NOTIF_ITER_NOTIF_EVENT;
+ struct bt_ctf_event *event;
+ struct bt_notification *ret = NULL;
/* Create event */
event = create_event(notit);
if (!event) {
- goto error;
+ goto end;
}
- BT_MOVE(rnotif->event, event);
- *notification = (struct bt_ctf_notif_iter_notif *) rnotif;
- return;
-
-error:
+ ret = bt_notification_event_create(event);
+ if (!ret) {
+ goto end;
+ }
+ *notification = ret;
+end:
BT_PUT(event);
- bt_ctf_notif_iter_notif_destroy(rnotif);
}
-void bt_ctf_notif_iter_notif_destroy(void *vnotif)
+static
+void notify_eos(struct bt_ctf_notif_iter *notit,
+ struct bt_notification **notification)
{
- struct bt_ctf_notif_iter_notif *notif = vnotif;
-
- switch (notif->type) {
- case BT_CTF_NOTIF_ITER_NOTIF_NEW_PACKET:
- {
- struct bt_ctf_notif_iter_notif_new_packet *rnotif =
- (struct bt_ctf_notif_iter_notif_new_packet *) notif;
-
- BT_PUT(rnotif->packet);
- break;
- }
- case BT_CTF_NOTIF_ITER_NOTIF_END_OF_PACKET:
- {
- struct bt_ctf_notif_iter_notif_end_of_packet *rnotif =
- (struct bt_ctf_notif_iter_notif_end_of_packet *) notif;
+ struct bt_ctf_event *event;
+ struct bt_notification *ret = NULL;
- BT_PUT(rnotif->packet);
- break;
+ /* Create event */
+ event = create_event(notit);
+ if (!event) {
+ goto end;
}
- case BT_CTF_NOTIF_ITER_NOTIF_EVENT:
- {
- struct bt_ctf_notif_iter_notif_event *rnotif =
- (struct bt_ctf_notif_iter_notif_event *) notif;
- BT_PUT(rnotif->event);
- break;
- }
- default:
- assert(false);
+ ret = bt_notification_stream_end_create(event);
+ if (!ret) {
+ goto end;
}
-
- g_free(notif);
+ *notification = ret;
+end:
+ BT_PUT(event);
}
struct bt_ctf_notif_iter *bt_ctf_notif_iter_create(struct bt_ctf_trace *trace,
enum bt_ctf_notif_iter_status bt_ctf_notif_iter_get_next_notification(
struct bt_ctf_notif_iter *notit,
- struct bt_ctf_notif_iter_notif **notification)
+ struct bt_notification **notification)
{
enum bt_ctf_notif_iter_status status = BT_CTF_NOTIF_ITER_STATUS_OK;
status = handle_state(notit);
if (status != BT_CTF_NOTIF_ITER_STATUS_OK) {
if (status == BT_CTF_NOTIF_ITER_STATUS_EOF) {
- PDBG("Medium operation reported end of file\n");
+ PDBG("Medium operation reported end of stream\n");
} else {
PERR("Failed to handle state:\n");
- PERR(" State: %d\n", notit->state);
+ PERR("\tState: %d\n", notit->state);
}
goto end;
}