/*
- * Babeltrace - CTF message iterator
+ * SPDX-License-Identifier: MIT
*
* Copyright (c) 2015-2018 EfficiOS Inc. and Linux Foundation
* Copyright (c) 2015-2018 Philippe Proulx <pproulx@efficios.com>
*
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
+ * Babeltrace - CTF message iterator
*/
#define BT_COMP_LOG_SELF_COMP (msg_it->self_comp)
STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN,
STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE,
STATE_AFTER_STREAM_PACKET_CONTEXT,
- STATE_CHECK_EMIT_MSG_STREAM_BEGINNING,
STATE_EMIT_MSG_STREAM_BEGINNING,
STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS,
STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS,
STATE_SKIP_PACKET_PADDING,
STATE_EMIT_MSG_PACKET_END_MULTI,
STATE_EMIT_MSG_PACKET_END_SINGLE,
- STATE_CHECK_EMIT_MSG_STREAM_END,
STATE_EMIT_QUEUED_MSG_PACKET_END,
+ STATE_CHECK_EMIT_MSG_STREAM_END,
STATE_EMIT_MSG_STREAM_END,
STATE_DONE,
};
/* Current message iterator to create messages (weak) */
bt_self_message_iterator *self_msg_iter;
- /* True to emit a stream beginning message. */
- bool emit_stream_begin_msg;
-
- /* True to emit a stream end message. */
- bool emit_stream_end_msg;
-
/*
* True if library objects are unavailable during the decoding and
* should not be created/used.
*/
bool dry_run;
- /* True to set the stream */
- bool set_stream;
-
/*
* Current dynamic scope field pointer.
*
struct ctf_event_class *ec;
} meta;
- /* Current packet context field wrapper (NULL if not created yet) */
- bt_packet_context_field *packet_context_field;
-
/* Current packet (NULL if not created yet) */
bt_packet *packet;
*/
bool emit_delayed_packet_beginning_msg;
+ /*
+ * True if this is the first packet we are reading, and therefore if we
+ * should emit a stream beginning message.
+ */
+ bool emit_stream_beginning_message;
+
+ /*
+ * True if we need to emit a stream end message at the end of the
+ * current stream. A live stream may never receive any data and thus
+ * never send a stream beginning message which removes the need to emit
+ * a stream end message.
+ */
+ bool emit_stream_end_message;
+
/* Database of current dynamic scopes */
struct {
bt_field *stream_packet_context;
return "AFTER_STREAM_PACKET_CONTEXT";
case STATE_EMIT_MSG_STREAM_BEGINNING:
return "EMIT_MSG_STREAM_BEGINNING";
+ case STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS:
+ return "CHECK_EMIT_MSG_DISCARDED_EVENTS";
+ case STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS:
+ return "CHECK_EMIT_MSG_DISCARDED_PACKETS";
case STATE_EMIT_MSG_PACKET_BEGINNING:
return "EMIT_MSG_PACKET_BEGINNING";
case STATE_EMIT_MSG_DISCARDED_EVENTS:
return "EMIT_MSG_PACKET_END_SINGLE";
case STATE_EMIT_QUEUED_MSG_PACKET_END:
return "EMIT_QUEUED_MSG_PACKET_END";
+ case STATE_CHECK_EMIT_MSG_STREAM_END:
+ return "CHECK_EMIT_MSG_STREAM_END";
case STATE_EMIT_MSG_STREAM_END:
return "EMIT_MSG_STREAM_END";
case STATE_DONE:
return "DONE";
- default:
- return "(unknown)";
}
+
+ bt_common_abort();
}
static
"packet-offset=%zu, cur=%zu, size=%zu, addr=%p",
msg_it->buf.packet_offset, msg_it->buf.at,
msg_it->buf.sz, msg_it->buf.addr);
- BT_COMP_LOGD_MEM(buffer_addr, buffer_sz, "Returned bytes at %p:",
+ BT_COMP_LOGT_MEM(buffer_addr, buffer_sz, "Returned bytes at %p:",
buffer_addr);
} else if (m_status == CTF_MSG_ITER_MEDIUM_STATUS_EOF) {
/*
{
msg_it->dscopes.stream_packet_context = NULL;
- if (msg_it->packet_context_field) {
- bt_packet_context_field_release(msg_it->packet_context_field);
- msg_it->packet_context_field = NULL;
- }
-
release_event_dscopes(msg_it);
}
static
enum ctf_msg_iter_status switch_packet_state(struct ctf_msg_iter *msg_it)
{
- enum ctf_msg_iter_status status = CTF_MSG_ITER_STATUS_OK;
+ enum ctf_msg_iter_status status;
bt_self_component *self_comp = msg_it->self_comp;
/*
release_all_dscopes(msg_it);
msg_it->cur_dscope_field = NULL;
+ if (msg_it->medium.medops.switch_packet) {
+ enum ctf_msg_iter_medium_status medium_status;
+
+ medium_status = msg_it->medium.medops.switch_packet(msg_it->medium.data);
+ if (medium_status == CTF_MSG_ITER_MEDIUM_STATUS_EOF) {
+ /* No more packets. */
+ msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END;
+ status = CTF_MSG_ITER_STATUS_OK;
+ goto end;
+ } else if (medium_status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
+ status = (int) medium_status;
+ goto end;
+ }
+
+ /*
+ * After the packet switch, the medium might want to give us a
+ * different buffer for the new packet.
+ */
+ status = request_medium_bytes(msg_it);
+ if (status != CTF_MSG_ITER_STATUS_OK) {
+ goto end;
+ }
+ }
+
/*
* Adjust current buffer so that addr points to the beginning of the new
* packet.
msg_it->snapshots.end_clock = UINT64_C(-1);
msg_it->state = STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN;
+ status = CTF_MSG_ITER_STATUS_OK;
end:
return status;
}
msg_it->cur_stream_class_id = -1;
msg_it->cur_event_class_id = -1;
msg_it->cur_data_stream_id = -1;
- BT_COMP_LOGD("Decoding packet header field:"
+ BT_COMP_LOGD("Decoding packet header field: "
"msg-it-addr=%p, trace-class-addr=%p, fc-addr=%p",
msg_it, msg_it->meta.tc, packet_header_fc);
status = read_dscope_begin_state(msg_it, packet_header_fc,
goto end;
}
+ if (!msg_it->dry_run) {
+ status = set_current_stream(msg_it);
+ if (status != CTF_MSG_ITER_STATUS_OK) {
+ goto end;
+ }
+
+ status = set_current_packet(msg_it);
+ if (status != CTF_MSG_ITER_STATUS_OK) {
+ goto end;
+ }
+ }
+
msg_it->state = STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN;
+ status = CTF_MSG_ITER_STATUS_OK;
+
end:
return status;
}
goto end;
}
- BT_ASSERT(!msg_it->packet_context_field);
-
if (packet_context_fc->in_ir && !msg_it->dry_run) {
- /*
- * Create free packet context field from stream class.
- * This field is going to be moved to the packet once we
- * create it. We cannot create the packet now because a
- * packet is created from a stream, and this API must be
- * able to return the packet context properties without
- * creating a stream
- * (ctf_msg_iter_get_packet_properties()).
- */
- msg_it->packet_context_field =
- bt_packet_context_field_create(
- msg_it->meta.sc->ir_sc);
- if (!msg_it->packet_context_field) {
- BT_COMP_LOGE_APPEND_CAUSE(self_comp,
- "Cannot create packet context field wrapper from stream class.");
- status = CTF_MSG_ITER_STATUS_ERROR;
- goto end;
- }
-
+ BT_ASSERT(!msg_it->dscopes.stream_packet_context);
+ BT_ASSERT(msg_it->packet);
msg_it->dscopes.stream_packet_context =
- bt_packet_context_field_borrow_field(
- msg_it->packet_context_field);
+ bt_packet_borrow_context_field(msg_it->packet);
BT_ASSERT(msg_it->dscopes.stream_packet_context);
}
goto end;
}
- if (msg_it->stream) {
- /*
- * Stream exists, which means we already emitted at
- * least one packet beginning message, so the initial
- * stream beginning message was also emitted.
- */
- msg_it->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS;
+ if (msg_it->emit_stream_beginning_message) {
+ msg_it->state = STATE_EMIT_MSG_STREAM_BEGINNING;
} else {
- msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_BEGINNING;
+ msg_it->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS;
}
end:
return status;
}
-static
-enum ctf_msg_iter_status check_emit_msg_stream_beginning_state(
- struct ctf_msg_iter *msg_it)
-{
- enum ctf_msg_iter_status status = CTF_MSG_ITER_STATUS_OK;
-
- if (msg_it->set_stream) {
- status = set_current_stream(msg_it);
- if (status != CTF_MSG_ITER_STATUS_OK) {
- goto end;
- }
- }
-
- if (msg_it->emit_stream_begin_msg) {
- msg_it->state = STATE_EMIT_MSG_STREAM_BEGINNING;
- } else {
- /* Stream's first packet */
- msg_it->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS;
- }
-
-end:
- return status;
-}
-
static
enum ctf_msg_iter_status check_emit_msg_discarded_events(
struct ctf_msg_iter *msg_it)
return CTF_MSG_ITER_STATUS_OK;
}
-static
-enum ctf_msg_iter_status check_emit_msg_stream_end(
- struct ctf_msg_iter *msg_it)
+static inline
+enum state check_emit_msg_stream_end(struct ctf_msg_iter *msg_it)
{
- if (msg_it->emit_stream_end_msg) {
- msg_it->state = STATE_EMIT_MSG_STREAM_END;
+ enum state next_state;
+
+ if (msg_it->emit_stream_end_message) {
+ next_state = STATE_EMIT_MSG_STREAM_END;
} else {
- msg_it->state = STATE_DONE;
+ next_state = STATE_DONE;
}
- return CTF_MSG_ITER_STATUS_OK;
+ return next_state;
}
static inline
case STATE_AFTER_STREAM_PACKET_CONTEXT:
status = after_packet_context_state(msg_it);
break;
- case STATE_CHECK_EMIT_MSG_STREAM_BEGINNING:
- status = check_emit_msg_stream_beginning_state(msg_it);
- break;
case STATE_EMIT_MSG_STREAM_BEGINNING:
msg_it->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS;
break;
msg_it->state = STATE_SKIP_PACKET_PADDING;
break;
case STATE_EMIT_MSG_PACKET_END_SINGLE:
- msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END;
- break;
- case STATE_CHECK_EMIT_MSG_STREAM_END:
- status = check_emit_msg_stream_end(msg_it);
+ msg_it->state = STATE_EMIT_MSG_STREAM_END;
break;
case STATE_EMIT_QUEUED_MSG_PACKET_END:
msg_it->state = STATE_EMIT_MSG_PACKET_END_SINGLE;
break;
+ case STATE_CHECK_EMIT_MSG_STREAM_END:
+ msg_it->state = check_emit_msg_stream_end(msg_it);
+ break;
case STATE_EMIT_MSG_STREAM_END:
msg_it->state = STATE_DONE;
break;
release_all_dscopes(msg_it);
msg_it->cur_dscope_field = NULL;
- if (msg_it->packet_context_field) {
- bt_packet_context_field_release(msg_it->packet_context_field);
- msg_it->packet_context_field = NULL;
- }
-
msg_it->buf.addr = NULL;
msg_it->buf.sz = 0;
msg_it->buf.at = 0;
msg_it->prev_packet_snapshots.packets = UINT64_C(-1);
msg_it->prev_packet_snapshots.beginning_clock = UINT64_C(-1);
msg_it->prev_packet_snapshots.end_clock = UINT64_C(-1);
+ msg_it->emit_stream_beginning_message = true;
+ msg_it->emit_stream_end_message = false;
}
static
if (selected_option->fc->in_ir && !msg_it->dry_run) {
bt_field *var_field = stack_top(msg_it->stack)->base;
- ret = bt_field_variant_select_option_field_by_index(
+ ret = bt_field_variant_select_option_by_index(
var_field, option_index);
if (ret) {
BT_COMP_LOGE_APPEND_CAUSE(self_comp,
bool use_default_cs)
{
bt_self_component *self_comp = msg_it->self_comp;
- int ret;
bt_message *msg;
const bt_stream_class *sc = msg_it->meta.sc->ir_sc;
BT_ASSERT(msg_it->packet);
BT_ASSERT(sc);
-
- if (msg_it->packet_context_field) {
- ret = bt_packet_move_context_field(
- msg_it->packet, msg_it->packet_context_field);
- if (ret) {
- msg = NULL;
- goto end;
- }
-
- msg_it->packet_context_field = NULL;
-
- /*
- * At this point msg_it->dscopes.stream_packet_context
- * has the same value as the packet context field within
- * msg_it->packet.
- */
- BT_ASSERT(bt_packet_borrow_context_field(
- msg_it->packet) ==
- msg_it->dscopes.stream_packet_context);
- }
-
BT_ASSERT(msg_it->self_msg_iter);
if (msg_it->meta.sc->packets_have_ts_begin) {
BT_ASSERT(tc);
BT_ASSERT(medops.request_bytes);
BT_ASSERT(medops.borrow_stream);
+ BT_ASSERT(max_request_sz > 0);
+
BT_COMP_LOG_CUR_LVL(BT_LOG_DEBUG, log_level, self_comp,
"Creating CTF plugin message iterator: "
"trace-addr=%p, max-request-size=%zu, "
BT_ASSERT_DBG(msg_it);
BT_ASSERT_DBG(message);
- msg_it->set_stream = true;
BT_COMP_LOGD("Getting next message: msg-it-addr=%p", msg_it);
while (true) {
goto end;
case STATE_EMIT_MSG_PACKET_BEGINNING:
- status = set_current_packet(msg_it);
- if (status != CTF_MSG_ITER_STATUS_OK) {
- goto end;
- }
-
if (G_UNLIKELY(msg_it->meta.tc->quirks.barectf_event_before_packet)) {
msg_it->emit_delayed_packet_beginning_msg = true;
/*
case STATE_EMIT_MSG_STREAM_BEGINNING:
/* create_msg_stream_beginning() logs errors */
*message = create_msg_stream_beginning(msg_it);
+ msg_it->emit_stream_beginning_message = false;
+ msg_it->emit_stream_end_message = true;
if (!*message) {
status = CTF_MSG_ITER_STATUS_ERROR;
case STATE_EMIT_MSG_STREAM_END:
/* create_msg_stream_end() logs errors */
*message = create_msg_stream_end(msg_it);
+ msg_it->emit_stream_end_message = false;
if (!*message) {
status = CTF_MSG_ITER_STATUS_ERROR;
bt_self_component *self_comp = msg_it->self_comp;
BT_ASSERT_DBG(msg_it);
- msg_it->set_stream = false;
do {
/*
case STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN:
case STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE:
case STATE_AFTER_STREAM_PACKET_CONTEXT:
- case STATE_CHECK_EMIT_MSG_STREAM_BEGINNING:
case STATE_EMIT_MSG_STREAM_BEGINNING:
case STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS:
case STATE_EMIT_MSG_DISCARDED_EVENTS:
case STATE_EMIT_MSG_PACKET_END_MULTI:
case STATE_EMIT_MSG_PACKET_END_SINGLE:
case STATE_EMIT_QUEUED_MSG_PACKET_END:
- case STATE_CHECK_EMIT_MSG_STREAM_END:
case STATE_EMIT_MSG_STREAM_END:
break;
case STATE_DONE:
return status;
}
-BT_HIDDEN
-void ctf_msg_iter_set_medops_data(struct ctf_msg_iter *msg_it,
- void *medops_data)
-{
- BT_ASSERT(msg_it);
- msg_it->medium.data = medops_data;
-}
-
BT_HIDDEN
enum ctf_msg_iter_status ctf_msg_iter_seek(struct ctf_msg_iter *msg_it,
off_t offset)
BT_ASSERT(msg_it);
BT_ASSERT(offset >= 0);
-
- if (!msg_it->medium.medops.seek) {
- status = CTF_MSG_ITER_STATUS_UNSUPPORTED;
- BT_COMP_LOGD("Aborting seek as the iterator's underlying media does not implement seek support.");
- goto end;
- }
+ BT_ASSERT(msg_it->medium.medops.seek);
medium_status = msg_it->medium.medops.seek(offset, msg_it->medium.data);
if (medium_status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
return status;
}
-BT_HIDDEN
-void ctf_msg_iter_set_emit_stream_beginning_message(struct ctf_msg_iter *msg_it,
- bool val)
-{
- msg_it->emit_stream_begin_msg = val;
-}
-
-BT_HIDDEN
-void ctf_msg_iter_set_emit_stream_end_message(struct ctf_msg_iter *msg_it,
- bool val)
-{
- msg_it->emit_stream_end_msg = val;
-}
-
BT_HIDDEN
void ctf_msg_iter_set_dry_run(struct ctf_msg_iter *msg_it,
bool val)