These functions are now unnecessary. Before the previous patch
"src.ctf.fs: add and use medops to iterate on a ds_file_group using the
index", the src.ctf.fs component used ctf_msg_iter in such a way that
the iterator would process complete data stream files one after the
other, and was reset between each file. When starting a data stream
file, the iterator needed to know whether it was the first data stream
file for a given stream, and therefore if it should emit the stream
beginning message. Conversely, it needed to know if the data stream
file was the last one for the stream, and therefore if it needed to emit
the stream end message.
With the new ctf_fs_ds_group_medops, the ctf_msg_iter reads entire
streams, even if they are spread over multiple data stream files. We
therefore don't need to say whether a particular iterator run is the
beginning or end of a stream, it always is.
With lttng-live, the iterator also always does a single run so always
needs to emit the stream messages.
This patch also removes the related fields in ctf_msg_iter, as well as
the related states in the state machine.
The only non-obvious thing is: what to do with the call to
set_current_stream in check_emit_msg_stream_beginning_state. This is
the moment where the message iterator asks the medium to provide the
bt_stream instance for the given stream class and stream instance id.
For now, I moved it to just before we need it the first time, which is
when sending the stream beginning message. This is probably temporary
anyway, as it is planned that we will be able to just pass the bt_stream
when creating the ctf_msg_iter.
Change-Id: I275d4631ff11612abb46c73312bf133753ae4971
Signed-off-by: Simon Marchi <simon.marchi@efficios.com>
STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN,
STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE,
STATE_AFTER_STREAM_PACKET_CONTEXT,
STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN,
STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE,
STATE_AFTER_STREAM_PACKET_CONTEXT,
- STATE_CHECK_EMIT_MSG_STREAM_BEGINNING,
STATE_EMIT_MSG_STREAM_BEGINNING,
STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS,
STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS,
STATE_EMIT_MSG_STREAM_BEGINNING,
STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS,
STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS,
STATE_SKIP_PACKET_PADDING,
STATE_EMIT_MSG_PACKET_END_MULTI,
STATE_EMIT_MSG_PACKET_END_SINGLE,
STATE_SKIP_PACKET_PADDING,
STATE_EMIT_MSG_PACKET_END_MULTI,
STATE_EMIT_MSG_PACKET_END_SINGLE,
- STATE_CHECK_EMIT_MSG_STREAM_END,
STATE_EMIT_QUEUED_MSG_PACKET_END,
STATE_EMIT_MSG_STREAM_END,
STATE_DONE,
STATE_EMIT_QUEUED_MSG_PACKET_END,
STATE_EMIT_MSG_STREAM_END,
STATE_DONE,
/* Current message iterator to create messages (weak) */
bt_self_message_iterator *self_msg_iter;
/* Current message iterator to create messages (weak) */
bt_self_message_iterator *self_msg_iter;
- /* True to emit a stream beginning message. */
- bool emit_stream_begin_msg;
-
- /* True to emit a stream end message. */
- bool emit_stream_end_msg;
-
/*
* True if library objects are unavailable during the decoding and
* should not be created/used.
/*
* True if library objects are unavailable during the decoding and
* should not be created/used.
medium_status = msg_it->medium.medops.switch_packet(msg_it->medium.data);
if (medium_status == CTF_MSG_ITER_MEDIUM_STATUS_EOF) {
/* No more packets. */
medium_status = msg_it->medium.medops.switch_packet(msg_it->medium.data);
if (medium_status == CTF_MSG_ITER_MEDIUM_STATUS_EOF) {
/* No more packets. */
- msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END;
+ msg_it->state = STATE_EMIT_MSG_STREAM_END;
status = CTF_MSG_ITER_STATUS_OK;
goto end;
} else if (medium_status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
status = CTF_MSG_ITER_STATUS_OK;
goto end;
} else if (medium_status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
break;
case CTF_MSG_ITER_STATUS_EOF:
status = CTF_MSG_ITER_STATUS_OK;
break;
case CTF_MSG_ITER_STATUS_EOF:
status = CTF_MSG_ITER_STATUS_OK;
- msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END;
+ msg_it->state = STATE_EMIT_MSG_STREAM_END;
goto end;
default:
goto end;
goto end;
default:
goto end;
*/
msg_it->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS;
} else {
*/
msg_it->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS;
} else {
- msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_BEGINNING;
+ msg_it->state = STATE_EMIT_MSG_STREAM_BEGINNING;
-static
-enum ctf_msg_iter_status check_emit_msg_stream_beginning_state(
- struct ctf_msg_iter *msg_it)
-{
- enum ctf_msg_iter_status status = CTF_MSG_ITER_STATUS_OK;
-
- if (msg_it->set_stream) {
- status = set_current_stream(msg_it);
- if (status != CTF_MSG_ITER_STATUS_OK) {
- goto end;
- }
- }
-
- if (msg_it->emit_stream_begin_msg) {
- msg_it->state = STATE_EMIT_MSG_STREAM_BEGINNING;
- } else {
- /* Stream's first packet */
- msg_it->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS;
- }
-
-end:
- return status;
-}
-
static
enum ctf_msg_iter_status check_emit_msg_discarded_events(
struct ctf_msg_iter *msg_it)
static
enum ctf_msg_iter_status check_emit_msg_discarded_events(
struct ctf_msg_iter *msg_it)
return CTF_MSG_ITER_STATUS_OK;
}
return CTF_MSG_ITER_STATUS_OK;
}
-static
-enum ctf_msg_iter_status check_emit_msg_stream_end(
- struct ctf_msg_iter *msg_it)
-{
- if (msg_it->emit_stream_end_msg) {
- msg_it->state = STATE_EMIT_MSG_STREAM_END;
- } else {
- msg_it->state = STATE_DONE;
- }
-
- return CTF_MSG_ITER_STATUS_OK;
-}
-
static inline
enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it)
{
static inline
enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it)
{
case STATE_AFTER_STREAM_PACKET_CONTEXT:
status = after_packet_context_state(msg_it);
break;
case STATE_AFTER_STREAM_PACKET_CONTEXT:
status = after_packet_context_state(msg_it);
break;
- case STATE_CHECK_EMIT_MSG_STREAM_BEGINNING:
- status = check_emit_msg_stream_beginning_state(msg_it);
- break;
case STATE_EMIT_MSG_STREAM_BEGINNING:
msg_it->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS;
break;
case STATE_EMIT_MSG_STREAM_BEGINNING:
msg_it->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS;
break;
msg_it->state = STATE_SKIP_PACKET_PADDING;
break;
case STATE_EMIT_MSG_PACKET_END_SINGLE:
msg_it->state = STATE_SKIP_PACKET_PADDING;
break;
case STATE_EMIT_MSG_PACKET_END_SINGLE:
- msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END;
- break;
- case STATE_CHECK_EMIT_MSG_STREAM_END:
- status = check_emit_msg_stream_end(msg_it);
+ msg_it->state = STATE_EMIT_MSG_STREAM_END;
break;
case STATE_EMIT_QUEUED_MSG_PACKET_END:
msg_it->state = STATE_EMIT_MSG_PACKET_END_SINGLE;
break;
case STATE_EMIT_QUEUED_MSG_PACKET_END:
msg_it->state = STATE_EMIT_MSG_PACKET_END_SINGLE;
goto end;
case STATE_EMIT_MSG_STREAM_BEGINNING:
goto end;
case STATE_EMIT_MSG_STREAM_BEGINNING:
+ BT_ASSERT(!msg_it->stream);
+ status = set_current_stream(msg_it);
+ if (status != CTF_MSG_ITER_STATUS_OK) {
+ goto end;
+ }
+
/* create_msg_stream_beginning() logs errors */
*message = create_msg_stream_beginning(msg_it);
/* create_msg_stream_beginning() logs errors */
*message = create_msg_stream_beginning(msg_it);
case STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN:
case STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE:
case STATE_AFTER_STREAM_PACKET_CONTEXT:
case STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN:
case STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE:
case STATE_AFTER_STREAM_PACKET_CONTEXT:
- case STATE_CHECK_EMIT_MSG_STREAM_BEGINNING:
case STATE_EMIT_MSG_STREAM_BEGINNING:
case STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS:
case STATE_EMIT_MSG_DISCARDED_EVENTS:
case STATE_EMIT_MSG_STREAM_BEGINNING:
case STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS:
case STATE_EMIT_MSG_DISCARDED_EVENTS:
case STATE_EMIT_MSG_PACKET_END_MULTI:
case STATE_EMIT_MSG_PACKET_END_SINGLE:
case STATE_EMIT_QUEUED_MSG_PACKET_END:
case STATE_EMIT_MSG_PACKET_END_MULTI:
case STATE_EMIT_MSG_PACKET_END_SINGLE:
case STATE_EMIT_QUEUED_MSG_PACKET_END:
- case STATE_CHECK_EMIT_MSG_STREAM_END:
case STATE_EMIT_MSG_STREAM_END:
break;
case STATE_DONE:
case STATE_EMIT_MSG_STREAM_END:
break;
case STATE_DONE:
-BT_HIDDEN
-void ctf_msg_iter_set_emit_stream_beginning_message(struct ctf_msg_iter *msg_it,
- bool val)
-{
- msg_it->emit_stream_begin_msg = val;
-}
-
-BT_HIDDEN
-void ctf_msg_iter_set_emit_stream_end_message(struct ctf_msg_iter *msg_it,
- bool val)
-{
- msg_it->emit_stream_end_msg = val;
-}
-
BT_HIDDEN
void ctf_msg_iter_set_dry_run(struct ctf_msg_iter *msg_it,
bool val)
BT_HIDDEN
void ctf_msg_iter_set_dry_run(struct ctf_msg_iter *msg_it,
bool val)
BT_HIDDEN
void ctf_msg_iter_reset_for_next_stream_file(struct ctf_msg_iter *msg_it);
BT_HIDDEN
void ctf_msg_iter_reset_for_next_stream_file(struct ctf_msg_iter *msg_it);
-BT_HIDDEN
-void ctf_msg_iter_set_emit_stream_beginning_message(struct ctf_msg_iter *msg_it,
- bool val);
-
-BT_HIDDEN
-void ctf_msg_iter_set_emit_stream_end_message(struct ctf_msg_iter *msg_it,
- bool val);
-
BT_HIDDEN
void ctf_msg_iter_set_dry_run(struct ctf_msg_iter *msg_it,
bool val);
BT_HIDDEN
void ctf_msg_iter_set_dry_run(struct ctf_msg_iter *msg_it,
bool val);
- /* FIXME: This is temporary, those functions will be removed. */
- ctf_msg_iter_set_emit_stream_end_message(
- msg_iter_data->msg_iter, true);
- ctf_msg_iter_set_emit_stream_beginning_message(
- msg_iter_data->msg_iter, true);
-
/*
* This iterator can seek forward if its stream class has a default
* clock class.
/*
* This iterator can seek forward if its stream class has a default
* clock class.
"Failed to create CTF message iterator");
goto error;
}
"Failed to create CTF message iterator");
goto error;
}
-
- ctf_msg_iter_set_emit_stream_end_message(
- stream_iter->msg_iter, true);
- ctf_msg_iter_set_emit_stream_beginning_message(
- stream_iter->msg_iter, true);
"Failed to create CTF message iterator");
goto error;
}
"Failed to create CTF message iterator");
goto error;
}
-
- ctf_msg_iter_set_emit_stream_end_message(
- stream_iter->msg_iter, true);
- ctf_msg_iter_set_emit_stream_beginning_message(
- stream_iter->msg_iter, true);
}
stream_iter->buf = g_new0(uint8_t, lttng_live->max_query_size);
if (!stream_iter->buf) {
}
stream_iter->buf = g_new0(uint8_t, lttng_live->max_query_size);
if (!stream_iter->buf) {