STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN,
STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE,
STATE_AFTER_STREAM_PACKET_CONTEXT,
- STATE_EMIT_MSG_NEW_STREAM,
- STATE_EMIT_MSG_NEW_PACKET,
+ STATE_EMIT_MSG_STREAM_BEGINNING,
+ STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING,
+ STATE_EMIT_MSG_PACKET_BEGINNING,
STATE_DSCOPE_EVENT_HEADER_BEGIN,
STATE_DSCOPE_EVENT_HEADER_CONTINUE,
STATE_AFTER_EVENT_HEADER,
STATE_DSCOPE_EVENT_PAYLOAD_BEGIN,
STATE_DSCOPE_EVENT_PAYLOAD_CONTINUE,
STATE_EMIT_MSG_EVENT,
- STATE_EMIT_MSG_END_OF_PACKET,
- STATE_DONE,
STATE_SKIP_PACKET_PADDING,
+ STATE_EMIT_MSG_PACKET_END_MULTI,
+ STATE_EMIT_MSG_PACKET_END_SINGLE,
+ STATE_EMIT_MSG_STREAM_ACTIVITY_END,
+ STATE_EMIT_MSG_STREAM_END,
+ STATE_DONE,
};
/* CTF message iterator */
/* Current message iterator to create messages (weak) */
bt_self_message_iterator *msg_iter;
+ /*
+ * True to emit stream beginning and stream activity beginning
+ * messages.
+ */
+ bool emit_stream_begin_msg;
+
+ /*
+ * True to emit stream end and stream activity end messages.
+ */
+ bool emit_stream_end_msg;
+
/*
* Current dynamic scope field pointer.
*
void *data;
} medium;
- /* Stream beginning was emitted */
- bool stream_begin_emitted;
-
/* Current packet size (bits) (-1 if unknown) */
int64_t cur_exp_packet_total_size;
return "STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE";
case STATE_AFTER_STREAM_PACKET_CONTEXT:
return "STATE_AFTER_STREAM_PACKET_CONTEXT";
- case STATE_EMIT_MSG_NEW_PACKET:
- return "STATE_EMIT_MSG_NEW_PACKET";
- case STATE_EMIT_MSG_NEW_STREAM:
- return "STATE_EMIT_MSG_NEW_STREAM";
+ case STATE_EMIT_MSG_STREAM_BEGINNING:
+ return "STATE_EMIT_MSG_STREAM_BEGINNING";
+ case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING:
+ return "STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING";
+ case STATE_EMIT_MSG_PACKET_BEGINNING:
+ return "STATE_EMIT_MSG_PACKET_BEGINNING";
case STATE_DSCOPE_EVENT_HEADER_BEGIN:
return "STATE_DSCOPE_EVENT_HEADER_BEGIN";
case STATE_DSCOPE_EVENT_HEADER_CONTINUE:
return "STATE_DSCOPE_EVENT_PAYLOAD_CONTINUE";
case STATE_EMIT_MSG_EVENT:
return "STATE_EMIT_MSG_EVENT";
- case STATE_EMIT_MSG_END_OF_PACKET:
- return "STATE_EMIT_MSG_END_OF_PACKET";
- case STATE_DONE:
- return "STATE_DONE";
case STATE_SKIP_PACKET_PADDING:
return "STATE_SKIP_PACKET_PADDING";
+ case STATE_EMIT_MSG_PACKET_END_MULTI:
+ return "STATE_EMIT_MSG_PACKET_END_MULTI";
+ case STATE_EMIT_MSG_PACKET_END_SINGLE:
+ return "STATE_EMIT_MSG_PACKET_END_SINGLE";
+ case STATE_EMIT_MSG_STREAM_ACTIVITY_END:
+ return "STATE_EMIT_MSG_STREAM_ACTIVITY_END";
+ case STATE_EMIT_MSG_STREAM_END:
+ return "STATE_EMIT_MSG_STREAM_END";
+ case STATE_DONE:
+ return "STATE_DONE";
default:
return "(unknown)";
}
goto end;
}
+ /*
+ * Make sure at least one bit is available for this packet. An
+ * empty packet is impossible. If we reach the end of the medium
+ * at this point, then it's considered the end of the stream.
+ */
+ ret = buf_ensure_available_bits(notit);
+ switch (ret) {
+ case BT_MSG_ITER_STATUS_OK:
+ break;
+ case BT_MSG_ITER_STATUS_EOF:
+ ret = BT_MSG_ITER_STATUS_OK;
+ notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_END;
+ goto end;
+ default:
+ goto end;
+ }
+
/* Packet header class is common to the whole trace class. */
packet_header_fc = notit->meta.tc->packet_header_fc;
if (!packet_header_fc) {
"notit-addr=%p, packet-size=%" PRIu64 ", content-size=%" PRIu64,
notit, notit->cur_exp_packet_total_size,
notit->cur_exp_packet_content_size);
+
end:
return status;
}
static
-enum bt_msg_iter_status after_packet_context_state(
- struct bt_msg_iter *notit)
+enum bt_msg_iter_status after_packet_context_state(struct bt_msg_iter *notit)
{
enum bt_msg_iter_status status;
goto end;
}
- if (notit->stream_begin_emitted) {
- notit->state = STATE_EMIT_MSG_NEW_PACKET;
+ if (notit->stream) {
+ /*
+ * Stream exists, which means we already emitted at
+ * least one packet beginning message, so the initial
+ * stream beginning message was also emitted.
+ */
+ notit->state = STATE_EMIT_MSG_PACKET_BEGINNING;
} else {
- notit->state = STATE_EMIT_MSG_NEW_STREAM;
+ notit->state = STATE_EMIT_MSG_STREAM_BEGINNING;
}
end:
}
static
-enum bt_msg_iter_status read_event_header_begin_state(
- struct bt_msg_iter *notit)
+enum bt_msg_iter_status read_event_header_begin_state(struct bt_msg_iter *notit)
{
enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK;
struct ctf_field_class *event_header_fc = NULL;
/* No more events! */
BT_LOGV("Reached end of packet: notit-addr=%p, "
"cur=%zu", notit, packet_at(notit));
- notit->state = STATE_EMIT_MSG_END_OF_PACKET;
+ notit->state = STATE_EMIT_MSG_PACKET_END_MULTI;
goto end;
} else if (unlikely(packet_at(notit) >
notit->cur_exp_packet_content_size)) {
* nothing else for us.
*/
status = buf_ensure_available_bits(notit);
- if (status != BT_MSG_ITER_STATUS_OK) {
- /*
- * If this function returns
- * `BT_MSG_ITER_STATUS_EOF`:
- *
- * 1. bt_msg_iter_get_next_message()
- * emits a "packet end" message. This
- * resets the current packet. The state
- * remains unchanged otherwise.
- * 2. This function is called again. It returns
- * `BT_MSG_ITER_STATUS_EOF` again.
- * 3. bt_msg_iter_get_next_message()
- * emits a "stream end" message because
- * there's no current packet. It sets the
- * current state to `STATE_DONE`.
- */
+ switch (status) {
+ case BT_MSG_ITER_STATUS_OK:
+ break;
+ case BT_MSG_ITER_STATUS_EOF:
+ status = BT_MSG_ITER_STATUS_OK;
+ notit->state = STATE_EMIT_MSG_PACKET_END_SINGLE;
+ goto end;
+ default:
goto end;
}
}
}
static
-enum bt_msg_iter_status skip_packet_padding_state(
- struct bt_msg_iter *notit)
+enum bt_msg_iter_status skip_packet_padding_state(struct bt_msg_iter *notit)
{
enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK;
size_t bits_to_skip;
case STATE_AFTER_STREAM_PACKET_CONTEXT:
status = after_packet_context_state(notit);
break;
- case STATE_EMIT_MSG_NEW_STREAM:
- notit->state = STATE_EMIT_MSG_NEW_PACKET;
+ case STATE_EMIT_MSG_STREAM_BEGINNING:
+ notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING;
+ break;
+ case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING:
+ notit->state = STATE_EMIT_MSG_PACKET_BEGINNING;
break;
- case STATE_EMIT_MSG_NEW_PACKET:
+ case STATE_EMIT_MSG_PACKET_BEGINNING:
notit->state = STATE_DSCOPE_EVENT_HEADER_BEGIN;
break;
case STATE_DSCOPE_EVENT_HEADER_BEGIN:
case STATE_SKIP_PACKET_PADDING:
status = skip_packet_padding_state(notit);
break;
- case STATE_EMIT_MSG_END_OF_PACKET:
+ case STATE_EMIT_MSG_PACKET_END_MULTI:
notit->state = STATE_SKIP_PACKET_PADDING;
break;
+ case STATE_EMIT_MSG_PACKET_END_SINGLE:
+ notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_END;
+ break;
+ case STATE_EMIT_MSG_STREAM_ACTIVITY_END:
+ notit->state = STATE_EMIT_MSG_STREAM_END;
+ break;
+ case STATE_EMIT_MSG_STREAM_END:
+ notit->state = STATE_DONE;
+ break;
+ case STATE_DONE:
+ break;
default:
BT_LOGD("Unknown CTF plugin message iterator state: "
"notit-addr=%p, state=%d", notit, notit->state);
notit->cur_stream_class_id = -1;
notit->cur_event_class_id = -1;
notit->cur_data_stream_id = -1;
- notit->stream_begin_emitted = false;
+ notit->emit_stream_begin_msg = true;
+ notit->emit_stream_end_msg = true;
}
static
static
void set_event_default_clock_snapshot(struct bt_msg_iter *notit)
{
- bt_event *event =
- bt_message_event_borrow_event(
- notit->event_msg);
+ bt_event *event = bt_message_event_borrow_event(notit->event_msg);
bt_stream_class *sc = notit->meta.sc->ir_sc;
BT_ASSERT(event);
}
static
-void notify_new_stream(struct bt_msg_iter *notit,
+void create_msg_stream_beginning(struct bt_msg_iter *notit,
bt_message **message)
{
- enum bt_msg_iter_status status;
bt_message *ret = NULL;
- status = set_current_stream(notit);
- if (status != BT_MSG_ITER_STATUS_OK) {
- BT_MESSAGE_PUT_REF_AND_RESET(ret);
- goto end;
- }
-
BT_ASSERT(notit->stream);
BT_ASSERT(notit->msg_iter);
ret = bt_message_stream_beginning_create(notit->msg_iter,
return;
}
-end:
*message = ret;
}
static
-void notify_end_of_stream(struct bt_msg_iter *notit,
+void create_msg_stream_activity_beginning(struct bt_msg_iter *notit,
+ bt_message **message)
+{
+ bt_message *ret = NULL;
+
+ BT_ASSERT(notit->stream);
+ BT_ASSERT(notit->msg_iter);
+ ret = bt_message_stream_activity_beginning_create(notit->msg_iter,
+ notit->stream);
+ if (!ret) {
+ BT_LOGE("Cannot create stream activity beginning message: "
+ "notit-addr=%p, stream-addr=%p",
+ notit, notit->stream);
+ return;
+ }
+
+ *message = ret;
+}
+
+static
+void create_msg_stream_activity_end(struct bt_msg_iter *notit,
bt_message **message)
+{
+ bt_message *ret = NULL;
+
+ if (!notit->stream) {
+ BT_LOGE("Cannot create stream for stream message: "
+ "notit-addr=%p", notit);
+ return;
+ }
+
+ BT_ASSERT(notit->stream);
+ BT_ASSERT(notit->msg_iter);
+ ret = bt_message_stream_activity_end_create(notit->msg_iter,
+ notit->stream);
+ if (!ret) {
+ BT_LOGE("Cannot create stream activity end message: "
+ "notit-addr=%p, stream-addr=%p",
+ notit, notit->stream);
+ return;
+ }
+
+ *message = ret;
+}
+
+static
+void create_msg_stream_end(struct bt_msg_iter *notit, bt_message **message)
{
bt_message *ret;
ret = bt_message_stream_end_create(notit->msg_iter,
notit->stream);
if (!ret) {
- BT_LOGE("Cannot create stream beginning message: "
+ BT_LOGE("Cannot create stream end message: "
"notit-addr=%p, stream-addr=%p",
notit, notit->stream);
return;
}
+
*message = ret;
}
static
-void notify_new_packet(struct bt_msg_iter *notit,
+void create_msg_packet_beginning(struct bt_msg_iter *notit,
bt_message **message)
{
int ret;
}
static
-void notify_end_of_packet(struct bt_msg_iter *notit,
- bt_message **message)
+void create_msg_packet_end(struct bt_msg_iter *notit, bt_message **message)
{
bt_message *msg;
enum bt_msg_iter_status bt_msg_iter_get_next_message(
struct bt_msg_iter *notit,
- bt_self_message_iterator *msg_iter,
- bt_message **message)
+ bt_self_message_iterator *msg_iter, bt_message **message)
{
enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK;
BT_ASSERT(notit);
BT_ASSERT(message);
-
- if (notit->state == STATE_DONE) {
- status = BT_MSG_ITER_STATUS_EOF;
- goto end;
- }
-
notit->msg_iter = msg_iter;
-
BT_LOGV("Getting next message: notit-addr=%p", notit);
while (true) {
status = handle_state(notit);
- if (status == BT_MSG_ITER_STATUS_AGAIN) {
+ if (unlikely(status == BT_MSG_ITER_STATUS_AGAIN)) {
BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_AGAIN.");
goto end;
- }
-
- if (status != BT_MSG_ITER_STATUS_OK) {
- if (status == BT_MSG_ITER_STATUS_EOF) {
- enum state next_state = notit->state;
-
- BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_EOF.");
-
- if (notit->packet) {
- notify_end_of_packet(notit,
- message);
- } else {
- notify_end_of_stream(notit,
- message);
- next_state = STATE_DONE;
- }
-
- if (!*message) {
- status = BT_MSG_ITER_STATUS_ERROR;
- goto end;
- }
-
- status = BT_MSG_ITER_STATUS_OK;
- notit->state = next_state;
- } else {
- BT_LOGW("Cannot handle state: "
- "notit-addr=%p, state=%s",
- notit, state_string(notit->state));
- }
-
+ } else if (unlikely(status != BT_MSG_ITER_STATUS_OK)) {
+ BT_LOGW("Cannot handle state: notit-addr=%p, state=%s",
+ notit, state_string(notit->state));
goto end;
}
switch (notit->state) {
- case STATE_EMIT_MSG_NEW_STREAM:
- /* notify_new_stream() logs errors */
- notify_new_stream(notit, message);
+ case STATE_EMIT_MSG_EVENT:
+ BT_ASSERT(notit->event_msg);
+ set_event_default_clock_snapshot(notit);
+ *message = notit->event_msg;
+ notit->event_msg = NULL;
+ goto end;
+ case STATE_EMIT_MSG_PACKET_BEGINNING:
+ /* create_msg_packet_beginning() logs errors */
+ create_msg_packet_beginning(notit, message);
if (!*message) {
status = BT_MSG_ITER_STATUS_ERROR;
}
- notit->stream_begin_emitted = true;
goto end;
- case STATE_EMIT_MSG_NEW_PACKET:
- /* notify_new_packet() logs errors */
- notify_new_packet(notit, message);
+ case STATE_EMIT_MSG_PACKET_END_SINGLE:
+ case STATE_EMIT_MSG_PACKET_END_MULTI:
+ /* create_msg_packet_end() logs errors */
+ create_msg_packet_end(notit, message);
if (!*message) {
status = BT_MSG_ITER_STATUS_ERROR;
}
goto end;
- case STATE_EMIT_MSG_EVENT:
- BT_ASSERT(notit->event_msg);
- set_event_default_clock_snapshot(notit);
- *message = notit->event_msg;
- notit->event_msg = NULL;
- goto end;
- case STATE_EMIT_MSG_END_OF_PACKET:
- /* notify_end_of_packet() logs errors */
- notify_end_of_packet(notit, message);
+ case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING:
+ if (notit->emit_stream_begin_msg) {
+ /* create_msg_stream_activity_beginning() logs errors */
+ create_msg_stream_activity_beginning(notit, message);
- if (!*message) {
- status = BT_MSG_ITER_STATUS_ERROR;
+ if (!*message) {
+ status = BT_MSG_ITER_STATUS_ERROR;
+ }
+
+ goto end;
}
+ break;
+ case STATE_EMIT_MSG_STREAM_ACTIVITY_END:
+ if (notit->emit_stream_end_msg) {
+ /* create_msg_stream_activity_end() logs errors */
+ create_msg_stream_activity_end(notit, message);
+
+ if (!*message) {
+ status = BT_MSG_ITER_STATUS_ERROR;
+ }
+
+ goto end;
+ }
+
+ break;
+ case STATE_EMIT_MSG_STREAM_BEGINNING:
+ status = set_current_stream(notit);
+ if (status != BT_MSG_ITER_STATUS_OK) {
+ goto end;
+ }
+
+ if (notit->emit_stream_begin_msg) {
+ /* create_msg_stream_beginning() logs errors */
+ create_msg_stream_beginning(notit, message);
+
+ if (!*message) {
+ status = BT_MSG_ITER_STATUS_ERROR;
+ }
+
+ goto end;
+ }
+
+ break;
+ case STATE_EMIT_MSG_STREAM_END:
+ if (notit->emit_stream_end_msg) {
+ /* create_msg_stream_end() logs errors */
+ create_msg_stream_end(notit, message);
+
+ if (!*message) {
+ status = BT_MSG_ITER_STATUS_ERROR;
+ }
+
+ goto end;
+ }
+
+ break;
+ case STATE_DONE:
+ status = BT_MSG_ITER_STATUS_EOF;
goto end;
default:
/* Non-emitting state: continue */
BT_ASSERT(notit);
- if (notit->state == STATE_EMIT_MSG_NEW_PACKET) {
+ if (notit->state == STATE_EMIT_MSG_PACKET_BEGINNING) {
/* We're already there */
goto end;
}
while (true) {
status = handle_state(notit);
- if (status == BT_MSG_ITER_STATUS_AGAIN) {
+ if (unlikely(status == BT_MSG_ITER_STATUS_AGAIN)) {
BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_AGAIN.");
goto end;
- }
- if (status != BT_MSG_ITER_STATUS_OK) {
- if (status == BT_MSG_ITER_STATUS_EOF) {
- BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_EOF.");
- } else {
- BT_LOGW("Cannot handle state: "
- "notit-addr=%p, state=%s",
- notit, state_string(notit->state));
- }
+ } else if (unlikely(status != BT_MSG_ITER_STATUS_OK)) {
+ BT_LOGW("Cannot handle state: notit-addr=%p, state=%s",
+ notit, state_string(notit->state));
goto end;
}
switch (notit->state) {
- case STATE_EMIT_MSG_NEW_PACKET:
+ case STATE_EMIT_MSG_PACKET_BEGINNING:
/*
* Packet header and context fields are
* potentially decoded (or they don't exist).
*/
goto end;
case STATE_INIT:
- case STATE_EMIT_MSG_NEW_STREAM:
case STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN:
case STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE:
case STATE_AFTER_TRACE_PACKET_HEADER:
case STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN:
case STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE:
case STATE_AFTER_STREAM_PACKET_CONTEXT:
+ case STATE_EMIT_MSG_STREAM_BEGINNING:
+ case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING:
/* Non-emitting state: continue */
break;
default:
/*
* We should never get past the
- * STATE_EMIT_MSG_NEW_PACKET state.
+ * STATE_EMIT_MSG_PACKET_BEGINNING state.
*/
BT_LOGF("Unexpected state: notit-addr=%p, state=%s",
notit, state_string(notit->state));
}
BT_HIDDEN
-enum bt_msg_iter_status bt_msg_iter_seek(
- struct bt_msg_iter *notit, off_t offset)
+enum bt_msg_iter_status bt_msg_iter_seek(struct bt_msg_iter *notit,
+ off_t offset)
{
enum bt_msg_iter_status ret = BT_MSG_ITER_STATUS_OK;
enum bt_msg_iter_medium_status medium_status;
return ret;
}
-BT_HIDDEN
-off_t bt_msg_iter_get_current_packet_offset(struct bt_msg_iter *notit)
-{
- BT_ASSERT(notit);
- return notit->cur_packet_offset;
-}
-
-BT_HIDDEN
-off_t bt_msg_iter_get_current_packet_size(
- struct bt_msg_iter *notit)
-{
- BT_ASSERT(notit);
- return notit->cur_exp_packet_total_size;
-}
-
-BT_HIDDEN
-void bt_msg_trace_class_changed(struct bt_msg_iter *notit)
-{
- if (notit->meta.tc->stored_value_count > notit->stored_values->len) {
- g_array_set_size(notit->stored_values,
- notit->meta.tc->stored_value_count);
- }
-}
-
BT_HIDDEN
enum bt_msg_iter_status bt_msg_iter_get_packet_properties(
struct bt_msg_iter *notit,
goto end;
}
- props->exp_packet_total_size =
- (uint64_t) notit->cur_exp_packet_total_size;
- props->exp_packet_content_size =
- (uint64_t) notit->cur_exp_packet_content_size;
+ props->exp_packet_total_size = notit->cur_exp_packet_total_size;
+ props->exp_packet_content_size = notit->cur_exp_packet_content_size;
BT_ASSERT(props->stream_class_id >= 0);
props->stream_class_id = (uint64_t) notit->cur_stream_class_id;
props->data_stream_id = notit->cur_data_stream_id;
end:
return status;
}
+
+BT_HIDDEN
+void bt_msg_iter_set_emit_stream_beginning_message(struct bt_msg_iter *notit,
+ bool val)
+{
+ notit->emit_stream_begin_msg = val;
+}
+
+BT_HIDDEN
+void bt_msg_iter_set_emit_stream_end_message(struct bt_msg_iter *notit,
+ bool val)
+{
+ notit->emit_stream_end_msg = val;
+}