Cleanup: remove plugin-common.h
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
14f28187 6 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
f3bc2010 7 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 8 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
9 *
10 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
11 *
12 * Permission is hereby granted, free of charge, to any person obtaining a copy
13 * of this software and associated documentation files (the "Software"), to deal
14 * in the Software without restriction, including without limitation the rights
15 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 * copies of the Software, and to permit persons to whom the Software is
17 * furnished to do so, subject to the following conditions:
18 *
19 * The above copyright notice and this permission notice shall be included in
20 * all copies or substantial portions of the Software.
21 *
22 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 * SOFTWARE.
29 */
30
020bc26f
PP
31#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
32#include "logging.h"
33
14f28187
FD
34#include <glib.h>
35#include <inttypes.h>
36#include <unistd.h>
37
578e048b 38#include "common/assert.h"
3fadfbc0 39#include <babeltrace2/babeltrace.h>
578e048b 40#include "compat/compiler.h"
3fadfbc0 41#include <babeltrace2/types.h>
f3bc2010 42
7cdc2bab
MD
43#include "data-stream.h"
44#include "metadata.h"
14f28187 45#include "lttng-live.h"
7cdc2bab 46
14f28187
FD
47#define MAX_QUERY_SIZE (256*1024)
48#define URL_PARAM "url"
49#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
50#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
51#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
52#define SESS_NOT_FOUND_ACTION_END_STR "end"
7cdc2bab 53
087bc060 54#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab 55
14f28187
FD
56static
57const char *print_live_iterator_status(enum lttng_live_iterator_status status)
58{
59 switch (status) {
60 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
61 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
62 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
63 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
64 case LTTNG_LIVE_ITERATOR_STATUS_END:
65 return "LTTNG_LIVE_ITERATOR_STATUS_END";
66 case LTTNG_LIVE_ITERATOR_STATUS_OK:
67 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
68 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
69 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
70 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
71 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
72 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
73 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
74 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
75 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
76 default:
77 abort();
78 }
79}
80
81static
82const char *print_state(struct lttng_live_stream_iterator *s)
7cdc2bab
MD
83{
84 switch (s->state) {
85 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
86 return "ACTIVE_NO_DATA";
87 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
88 return "QUIESCENT_NO_DATA";
89 case LTTNG_LIVE_STREAM_QUIESCENT:
90 return "QUIESCENT";
91 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
92 return "ACTIVE_DATA";
93 case LTTNG_LIVE_STREAM_EOF:
94 return "EOF";
95 default:
96 return "ERROR";
97 }
98}
7cdc2bab 99
14f28187
FD
/*
 * Logs, at the debug level, the state of `live_stream_iter` along with
 * its last and current inactivity timestamps.
 *
 * Expands to a single statement WITHOUT a trailing semicolon so that
 * `print_stream_state(x);` can be used safely as the body of an
 * unbraced `if`/`else`. The argument is parenthesized wherever it is
 * dereferenced so that any pointer expression can be passed.
 */
#define print_stream_state(live_stream_iter)				\
	do {								\
		BT_LOGD("stream state %s last_inact_ts %" PRId64	\
			", curr_inact_ts %" PRId64,			\
			print_state(live_stream_iter),			\
			(live_stream_iter)->last_inactivity_ts,		\
			(live_stream_iter)->current_inactivity_ts);	\
	} while (0)
6f79a7cf
MD
108
109BT_HIDDEN
42521b69 110bool lttng_live_graph_is_canceled(struct lttng_live_component *lttng_live)
6f79a7cf 111{
14f28187
FD
112 const bt_component *component;
113 bool ret;
6f79a7cf
MD
114
115 if (!lttng_live) {
14f28187
FD
116 ret = false;
117 goto end;
7cdc2bab 118 }
7cdc2bab 119
14f28187
FD
120 component = bt_component_source_as_component_const(
121 bt_self_component_source_as_component_source(
122 lttng_live->self_comp));
6f79a7cf 123
14f28187 124 ret = bt_component_graph_is_canceled(component);
4bf0e537 125
14f28187
FD
126end:
127 return ret;
7cdc2bab
MD
128}
129
130static
131struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
132 uint64_t trace_id)
d3e4dcd8 133{
14f28187
FD
134 uint64_t trace_idx;
135 struct lttng_live_trace *ret_trace = NULL;
7cdc2bab 136
14f28187
FD
137 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
138 struct lttng_live_trace *trace =
139 g_ptr_array_index(session->traces, trace_idx);
7cdc2bab 140 if (trace->id == trace_id) {
14f28187
FD
141 ret_trace = trace;
142 goto end;
7cdc2bab
MD
143 }
144 }
14f28187
FD
145
146end:
147 return ret_trace;
d3eb6e8f
PP
148}
149
7cdc2bab 150static
14f28187 151void lttng_live_destroy_trace(struct lttng_live_trace *trace)
7cdc2bab 152{
14f28187 153 BT_LOGD("Destroy lttng_live_trace");
7cdc2bab 154
14f28187
FD
155 BT_ASSERT(trace->stream_iterators);
156 g_ptr_array_free(trace->stream_iterators, TRUE);
5bd230f4 157
14f28187
FD
158 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
159 BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
5bd230f4 160
7cdc2bab 161 lttng_live_metadata_fini(trace);
7cdc2bab
MD
162 g_free(trace);
163}
164
165static
166struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
167 uint64_t trace_id)
168{
169 struct lttng_live_trace *trace = NULL;
170
171 trace = g_new0(struct lttng_live_trace, 1);
172 if (!trace) {
173 goto error;
174 }
175 trace->session = session;
176 trace->id = trace_id;
14f28187
FD
177 trace->trace_class = NULL;
178 trace->trace = NULL;
179 trace->stream_iterators = g_ptr_array_new_with_free_func(
180 (GDestroyNotify) lttng_live_stream_iterator_destroy);
181 BT_ASSERT(trace->stream_iterators);
7cdc2bab 182 trace->new_metadata_needed = true;
14f28187
FD
183 g_ptr_array_add(session->traces, trace);
184
087bc060 185 BT_LOGI("Create trace");
7cdc2bab
MD
186 goto end;
187error:
188 g_free(trace);
189 trace = NULL;
190end:
191 return trace;
192}
193
194BT_HIDDEN
14f28187
FD
195struct lttng_live_trace *lttng_live_borrow_trace(
196 struct lttng_live_session *session, uint64_t trace_id)
7cdc2bab
MD
197{
198 struct lttng_live_trace *trace;
199
200 trace = lttng_live_find_trace(session, trace_id);
201 if (trace) {
14f28187 202 goto end;
7cdc2bab 203 }
7cdc2bab 204
14f28187
FD
205 /* The session is the owner of the newly created trace. */
206 trace = lttng_live_create_trace(session, trace_id);
7cdc2bab 207
14f28187
FD
208end:
209 return trace;
7cdc2bab
MD
210}
211
212BT_HIDDEN
14f28187 213int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
06994c71
MD
214 uint64_t session_id, const char *hostname,
215 const char *session_name)
7cdc2bab
MD
216{
217 int ret = 0;
14f28187 218 struct lttng_live_session *session;
7cdc2bab 219
14f28187
FD
220 session = g_new0(struct lttng_live_session, 1);
221 if (!session) {
7cdc2bab
MD
222 goto error;
223 }
224
14f28187
FD
225 session->id = session_id;
226 session->traces = g_ptr_array_new_with_free_func(
227 (GDestroyNotify) lttng_live_destroy_trace);
228 BT_ASSERT(session->traces);
229 session->lttng_live_msg_iter = lttng_live_msg_iter;
230 session->new_streams_needed = true;
231 session->hostname = g_string_new(hostname);
232 BT_ASSERT(session->hostname);
233
234 session->session_name = g_string_new(session_name);
235 BT_ASSERT(session->session_name);
7cdc2bab 236
06994c71 237 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
14f28187
FD
238 session->id, hostname, session_name);
239 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
7cdc2bab
MD
240 goto end;
241error:
087bc060 242 BT_LOGE("Error adding session");
14f28187 243 g_free(session);
7cdc2bab
MD
244 ret = -1;
245end:
246 return ret;
247}
248
249static
250void lttng_live_destroy_session(struct lttng_live_session *session)
251{
14f28187
FD
252 struct lttng_live_component *live_comp;
253
254 if (!session) {
255 goto end;
256 }
7cdc2bab 257
14f28187 258 BT_LOGD("Destroy lttng live session");
7cdc2bab
MD
259 if (session->id != -1ULL) {
260 if (lttng_live_detach_session(session)) {
14f28187
FD
261 live_comp = session->lttng_live_msg_iter->lttng_live_comp;
262 if (session->lttng_live_msg_iter &&
42521b69 263 !lttng_live_graph_is_canceled(live_comp)) {
4c66436f 264 /* Old relayd cannot detach sessions. */
14f28187 265 BT_LOGD("Unable to detach lttng live session %" PRIu64,
4c66436f
MD
266 session->id);
267 }
7cdc2bab
MD
268 }
269 session->id = -1ULL;
270 }
14f28187
FD
271
272 if (session->traces) {
273 g_ptr_array_free(session->traces, TRUE);
7cdc2bab 274 }
14f28187 275
06994c71
MD
276 if (session->hostname) {
277 g_string_free(session->hostname, TRUE);
278 }
279 if (session->session_name) {
280 g_string_free(session->session_name, TRUE);
281 }
7cdc2bab 282 g_free(session);
14f28187
FD
283
284end:
285 return;
7cdc2bab
MD
286}
287
14f28187
FD
288static
289void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 290{
14f28187
FD
291 if (!lttng_live_msg_iter) {
292 goto end;
7cdc2bab 293 }
7cdc2bab 294
14f28187
FD
295 if (lttng_live_msg_iter->sessions) {
296 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
7cdc2bab 297 }
14f28187
FD
298
299 BT_OBJECT_PUT_REF_AND_RESET(lttng_live_msg_iter->viewer_connection);
300 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
301 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
302
303 /* All stream iterators must be destroyed at this point. */
304 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
305 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
306
307 g_free(lttng_live_msg_iter);
308
309end:
310 return;
311}
312
313BT_HIDDEN
314void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
315{
316 struct lttng_live_msg_iter *lttng_live_msg_iter;
317
318 BT_ASSERT(self_msg_iter);
319
320 lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_iter);
321 BT_ASSERT(lttng_live_msg_iter);
322 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab
MD
323}
324
325static
14f28187 326enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
7cdc2bab
MD
327 struct lttng_live_stream_iterator *lttng_live_stream)
328{
329 switch (lttng_live_stream->state) {
330 case LTTNG_LIVE_STREAM_QUIESCENT:
331 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
332 break;
333 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
334 /* Invalid state. */
087bc060
MD
335 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
336 abort();
7cdc2bab
MD
337 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
338 /* Invalid state. */
087bc060
MD
339 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
340 abort();
7cdc2bab
MD
341 case LTTNG_LIVE_STREAM_EOF:
342 break;
343 }
14f28187 344 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
345}
346
347/*
348 * For active no data stream, fetch next data. It can be either:
349 * - quiescent: need to put it in the prio heap at quiescent end
350 * timestamp,
351 * - have data: need to wire up first event into the prio heap,
352 * - have no data on this stream at this point: need to retry (AGAIN) or
353 * return EOF.
354 */
355static
14f28187
FD
356enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
357 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab
MD
358 struct lttng_live_stream_iterator *lttng_live_stream)
359{
14f28187
FD
360 enum lttng_live_iterator_status ret =
361 LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
362 struct packet_index index;
363 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
364
365 if (lttng_live_stream->trace->new_metadata_needed) {
14f28187 366 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
367 goto end;
368 }
369 if (lttng_live_stream->trace->session->new_streams_needed) {
14f28187 370 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
371 goto end;
372 }
14f28187
FD
373 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
374 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
7cdc2bab
MD
375 goto end;
376 }
14f28187
FD
377 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream,
378 &index);
379 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
380 goto end;
381 }
f6ccaed9 382 BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
7cdc2bab 383 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
14f28187
FD
384 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts,
385 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
386
387 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA &&
388 last_inact_ts == curr_inact_ts) {
389 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
7cdc2bab
MD
390 print_stream_state(lttng_live_stream);
391 } else {
14f28187 392 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
393 }
394 goto end;
395 }
396 lttng_live_stream->base_offset = index.offset;
397 lttng_live_stream->offset = index.offset;
398 lttng_live_stream->len = index.packet_size / CHAR_BIT;
399end:
14f28187
FD
400 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
401 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
7cdc2bab
MD
402 }
403 return ret;
404}
405
406/*
14f28187
FD
407 * Creation of the message requires the ctf trace class to be created
408 * beforehand, but the live protocol gives us all streams (including metadata)
409 * at once. So we split it in three steps: getting streams, getting metadata
410 * (which creates the ctf trace class), and then creating the per-stream
411 * messages.
7cdc2bab
MD
412 */
413static
14f28187
FD
414enum lttng_live_iterator_status lttng_live_get_session(
415 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab
MD
416 struct lttng_live_session *session)
417{
14f28187
FD
418 enum lttng_live_iterator_status status;
419 uint64_t trace_idx;
420 int ret = 0;
7cdc2bab 421
14f28187
FD
422 if (!session->attached) {
423 ret = lttng_live_attach_session(session);
424 if (ret) {
42521b69 425 if (lttng_live_msg_iter && lttng_live_graph_is_canceled(
14f28187
FD
426 lttng_live_msg_iter->lttng_live_comp)) {
427 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
428 } else {
429 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
430 }
431 goto end;
4c66436f 432 }
7cdc2bab 433 }
14f28187 434
7cdc2bab 435 status = lttng_live_get_new_streams(session);
14f28187
FD
436 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
437 status != LTTNG_LIVE_ITERATOR_STATUS_END) {
438 goto end;
7cdc2bab 439 }
14f28187
FD
440 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
441 struct lttng_live_trace *trace =
442 g_ptr_array_index(session->traces, trace_idx);
443
7cdc2bab 444 status = lttng_live_metadata_update(trace);
14f28187
FD
445 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
446 status != LTTNG_LIVE_ITERATOR_STATUS_END) {
447 goto end;
7cdc2bab
MD
448 }
449 }
14f28187
FD
450 status = lttng_live_lazy_msg_init(session);
451
452end:
453 return status;
7cdc2bab
MD
454}
455
456BT_HIDDEN
14f28187 457void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 458{
14f28187 459 uint64_t session_idx;
7cdc2bab 460
14f28187
FD
461 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
462 session_idx++) {
463 struct lttng_live_session *session =
464 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
7cdc2bab
MD
465 session->new_streams_needed = true;
466 }
467}
468
469static
14f28187 470void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 471{
14f28187 472 uint64_t session_idx, trace_idx;
7cdc2bab 473
14f28187
FD
474 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
475 session_idx++) {
476 struct lttng_live_session *session =
477 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
7cdc2bab 478 session->new_streams_needed = true;
14f28187
FD
479 for (trace_idx = 0; trace_idx < session->traces->len;
480 trace_idx++) {
481 struct lttng_live_trace *trace =
482 g_ptr_array_index(session->traces, trace_idx);
7cdc2bab
MD
483 trace->new_metadata_needed = true;
484 }
485 }
486}
487
488static
14f28187
FD
489enum lttng_live_iterator_status
490lttng_live_iterator_handle_new_streams_and_metadata(
491 struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 492{
14f28187
FD
493 enum lttng_live_iterator_status ret =
494 LTTNG_LIVE_ITERATOR_STATUS_OK;
495 uint64_t session_idx = 0, nr_sessions_opened = 0;
496 struct lttng_live_session *session;
497 enum session_not_found_action sess_not_found_act =
498 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
499
7cdc2bab 500 /*
14f28187 501 * In a remotely distant future, we could add a "new
7cdc2bab
MD
502 * session" flag to the protocol, which would tell us that we
503 * need to query for new sessions even though we have sessions
504 * currently ongoing.
505 */
14f28187
FD
506 if (lttng_live_msg_iter->sessions->len == 0) {
507 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
508 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
509 goto end;
510 } else {
511 /*
512 * Retry to create a viewer session for the requested
513 * session name.
514 */
515 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
516 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
517 goto end;
518 }
519 }
7cdc2bab 520 }
14f28187
FD
521
522 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
523 session_idx++) {
524 session = g_ptr_array_index(lttng_live_msg_iter->sessions,
525 session_idx);
526 ret = lttng_live_get_session(lttng_live_msg_iter, session);
7cdc2bab 527 switch (ret) {
14f28187 528 case LTTNG_LIVE_ITERATOR_STATUS_OK:
7cdc2bab 529 break;
14f28187
FD
530 case LTTNG_LIVE_ITERATOR_STATUS_END:
531 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
532 break;
533 default:
534 goto end;
535 }
536 if (!session->closed) {
537 nr_sessions_opened++;
538 }
539 }
540end:
14f28187
FD
541 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK &&
542 sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
543 nr_sessions_opened == 0) {
544 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab
MD
545 }
546 return ret;
547}
548
549static
14f28187
FD
550enum lttng_live_iterator_status emit_inactivity_message(
551 struct lttng_live_msg_iter *lttng_live_msg_iter,
552 struct lttng_live_stream_iterator *stream_iter,
553 bt_message **message, uint64_t timestamp)
7cdc2bab 554{
14f28187
FD
555 enum lttng_live_iterator_status ret =
556 LTTNG_LIVE_ITERATOR_STATUS_OK;
557 bt_message *msg = NULL;
7cdc2bab 558
14f28187
FD
559 BT_ASSERT(stream_iter->trace->clock_class);
560
561 msg = bt_message_message_iterator_inactivity_create(
562 lttng_live_msg_iter->self_msg_iter,
563 stream_iter->trace->clock_class,
564 timestamp);
d6e69534 565 if (!msg) {
7cdc2bab
MD
566 goto error;
567 }
14f28187 568
d6e69534 569 *message = msg;
7cdc2bab 570end:
7cdc2bab
MD
571 return ret;
572
573error:
14f28187 574 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
d6e69534 575 bt_message_put_ref(msg);
7cdc2bab
MD
576 goto end;
577}
578
579static
14f28187
FD
580enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
581 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab 582 struct lttng_live_stream_iterator *lttng_live_stream,
14f28187 583 bt_message **message)
7cdc2bab 584{
14f28187
FD
585 enum lttng_live_iterator_status ret =
586 LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
587
588 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
14f28187 589 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
590 }
591
14f28187
FD
592 if (lttng_live_stream->current_inactivity_ts ==
593 lttng_live_stream->last_inactivity_ts) {
7cdc2bab 594 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
14f28187
FD
595 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
596 goto end;
597 }
598
599 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream,
600 message, lttng_live_stream->current_inactivity_ts);
601
602 lttng_live_stream->last_inactivity_ts =
603 lttng_live_stream->current_inactivity_ts;
604end:
605 return ret;
606}
607
608static
609int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
610 struct lttng_live_msg_iter *lttng_live_msg_iter,
611 const bt_message *msg, int64_t last_msg_ts_ns,
612 int64_t *ts_ns)
613{
614 const bt_clock_class *clock_class = NULL;
615 const bt_clock_snapshot *clock_snapshot = NULL;
616 int ret = 0;
14f28187
FD
617 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
618
619 BT_ASSERT(msg);
620 BT_ASSERT(ts_ns);
621
622 BT_LOGV("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
623 "last-msg-ts=%" PRId64, lttng_live_msg_iter, msg,
624 last_msg_ts_ns);
625
626 switch (bt_message_get_type(msg)) {
627 case BT_MESSAGE_TYPE_EVENT:
628 clock_class =
629 bt_message_event_borrow_stream_class_default_clock_class_const(
630 msg);
631 BT_ASSERT(clock_class);
632
0cbc2c33
PP
633 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
634 msg);
14f28187
FD
635 break;
636 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
637 clock_class =
638 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
639 msg);
640 BT_ASSERT(clock_class);
641
0cbc2c33
PP
642 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
643 msg);
14f28187
FD
644 break;
645 case BT_MESSAGE_TYPE_PACKET_END:
646 clock_class =
647 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
648 msg);
649 BT_ASSERT(clock_class);
650
0cbc2c33
PP
651 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
652 msg);
14f28187
FD
653 break;
654 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
655 clock_class =
656 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
657 msg);
658 BT_ASSERT(clock_class);
659
9b24b6aa 660 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 661 msg);
14f28187
FD
662 break;
663 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
664 clock_class =
665 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
666 msg);
667 BT_ASSERT(clock_class);
668
9b24b6aa 669 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 670 msg);
14f28187
FD
671 break;
672 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
673 clock_class =
674 bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
675 msg);
676 BT_ASSERT(clock_class);
677
678 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
679 msg, &clock_snapshot);
680 if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
681 goto no_clock_snapshot;
682 }
683
684 break;
685 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
686 clock_class =
687 bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
688 msg);
689 BT_ASSERT(clock_class);
690
691 sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
692 msg, &clock_snapshot);
693 if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
694 goto no_clock_snapshot;
695 }
696
697 break;
698 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
0cbc2c33 699 clock_snapshot =
14f28187 700 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
0cbc2c33 701 msg);
14f28187
FD
702 break;
703 default:
704 /* All the other messages have a higher priority */
705 BT_LOGV_STR("Message has no timestamp: using the last message timestamp.");
706 *ts_ns = last_msg_ts_ns;
7cdc2bab
MD
707 goto end;
708 }
709
14f28187
FD
710 clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
711 BT_ASSERT(clock_class);
712
713 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
714 if (ret) {
715 BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
716 "clock-snapshot-addr=%p", clock_snapshot);
717 goto error;
718 }
719
720 goto end;
721
722no_clock_snapshot:
723 BT_LOGV_STR("Message's default clock snapshot is missing: "
724 "using the last message timestamp.");
725 *ts_ns = last_msg_ts_ns;
726 goto end;
727
728error:
729 ret = -1;
7cdc2bab 730
7cdc2bab 731end:
14f28187
FD
732 if (ret == 0) {
733 BT_LOGV("Found message's timestamp: "
734 "iter-data-addr=%p, msg-addr=%p, "
735 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
736 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
737 }
738
7cdc2bab
MD
739 return ret;
740}
741
742static
14f28187
FD
743enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
744 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab 745 struct lttng_live_stream_iterator *lttng_live_stream,
14f28187 746 bt_message **message)
7cdc2bab 747{
14f28187 748 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
d6e69534 749 enum bt_msg_iter_status status;
14f28187 750 uint64_t session_idx, trace_idx;
7cdc2bab 751
14f28187
FD
752 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
753 session_idx++) {
754 struct lttng_live_session *session =
755 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
7cdc2bab
MD
756
757 if (session->new_streams_needed) {
14f28187
FD
758 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
759 goto end;
7cdc2bab 760 }
14f28187
FD
761 for (trace_idx = 0; trace_idx < session->traces->len;
762 trace_idx++) {
763 struct lttng_live_trace *trace =
764 g_ptr_array_index(session->traces, trace_idx);
7cdc2bab 765 if (trace->new_metadata_needed) {
14f28187
FD
766 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
767 goto end;
7cdc2bab
MD
768 }
769 }
770 }
771
772 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
14f28187
FD
773 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
774 goto end;
7cdc2bab 775 }
14f28187
FD
776
777 status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter,
778 lttng_live_msg_iter->self_msg_iter, message);
7cdc2bab 779 switch (status) {
d6e69534 780 case BT_MSG_ITER_STATUS_EOF:
14f28187 781 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab 782 break;
d6e69534 783 case BT_MSG_ITER_STATUS_OK:
14f28187 784 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab 785 break;
d6e69534 786 case BT_MSG_ITER_STATUS_AGAIN:
7cdc2bab
MD
787 /*
788 * Continue immediately (end of packet). The next
789 * get_index may return AGAIN to delay the following
790 * attempt.
791 */
14f28187 792 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab 793 break;
d6e69534 794 case BT_MSG_ITER_STATUS_INVAL:
7cdc2bab 795 /* No argument provided by the user, so don't return INVAL. */
d6e69534 796 case BT_MSG_ITER_STATUS_ERROR:
7cdc2bab 797 default:
14f28187
FD
798 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
799 BT_LOGW("CTF msg iterator return an error or failed msg_iter=%p",
800 lttng_live_stream->msg_iter);
7cdc2bab
MD
801 break;
802 }
14f28187
FD
803
804end:
7cdc2bab
MD
805 return ret;
806}
807
808/*
809 * helper function:
810 * handle_no_data_streams()
811 * retry:
812 * - for each ACTIVE_NO_DATA stream:
813 * - query relayd for stream data, or quiescence info.
814 * - if need metadata, get metadata, goto retry.
815 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
816 * - if quiescent, move to QUIESCENT streams
817 * - if fetched data, move to ACTIVE_DATA streams
818 * (at this point each stream either has data, or is quiescent)
819 *
820 *
821 * iterator_next:
822 * handle_new_streams_and_metadata()
823 * - query relayd for known streams, add them as ACTIVE_NO_DATA
824 * - query relayd for metadata
825 *
826 * call handle_active_no_data_streams()
827 *
828 * handle_quiescent_streams()
829 * - if at least one stream is ACTIVE_DATA:
830 * - peek stream event with lowest timestamp -> next_ts
831 * - for each quiescent stream
832 * - if next_ts >= quiescent end
833 * - set state to ACTIVE_NO_DATA
834 * - else
835 * - for each quiescent stream
836 * - set state to ACTIVE_NO_DATA
837 *
838 * call handle_active_no_data_streams()
839 *
840 * handle_active_data_streams()
841 * - if at least one stream is ACTIVE_DATA:
842 * - get stream event with lowest timestamp from heap
d6e69534 843 * - make that stream event the current message.
7cdc2bab
MD
844 * - move this stream heap position to its next event
845 * - if we need to fetch data from relayd, move
846 * stream to ACTIVE_NO_DATA.
847 * - return OK
848 * - return AGAIN
849 *
850 * end criterion: ctrl-c on client. If relayd exits or the session
851 * closes on the relay daemon side, we keep on waiting for streams.
852 * Eventually handle --end timestamp (also an end criterion).
853 *
854 * When disconnected from relayd: try to re-connect endlessly.
855 */
856static
14f28187
FD
857enum lttng_live_iterator_status lttng_live_iterator_next_on_stream(
858 struct lttng_live_msg_iter *lttng_live_msg_iter,
859 struct lttng_live_stream_iterator *stream_iter,
860 bt_message **curr_msg)
7cdc2bab 861{
14f28187 862 enum lttng_live_iterator_status live_status;
7cdc2bab 863
7cdc2bab
MD
864retry:
865 print_stream_state(stream_iter);
14f28187
FD
866 live_status = lttng_live_iterator_handle_new_streams_and_metadata(
867 lttng_live_msg_iter);
868 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
869 goto end;
870 }
14f28187
FD
871 live_status = lttng_live_iterator_next_handle_one_no_data_stream(
872 lttng_live_msg_iter, stream_iter);
873 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
874 goto end;
875 }
14f28187
FD
876 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(
877 lttng_live_msg_iter, stream_iter, curr_msg);
878 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
879 BT_ASSERT(*curr_msg == NULL);
7cdc2bab
MD
880 goto end;
881 }
14f28187 882 if (*curr_msg) {
7cdc2bab
MD
883 goto end;
884 }
14f28187
FD
885 live_status = lttng_live_iterator_next_handle_one_active_data_stream(
886 lttng_live_msg_iter, stream_iter, curr_msg);
887 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
888 BT_ASSERT(*curr_msg == NULL);
7cdc2bab
MD
889 }
890
891end:
14f28187 892 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
7cdc2bab 893 goto retry;
7cdc2bab 894 }
14f28187
FD
895
896 return live_status;
7cdc2bab
MD
897}
898
899static
14f28187
FD
900enum lttng_live_iterator_status next_stream_iterator_for_trace(
901 struct lttng_live_msg_iter *lttng_live_msg_iter,
902 struct lttng_live_trace *live_trace,
903 struct lttng_live_stream_iterator **candidate_stream_iter)
7cdc2bab 904{
14f28187
FD
905 struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
906 enum lttng_live_iterator_status stream_iter_status;;
907 int64_t curr_candidate_msg_ts = INT64_MAX;
908 uint64_t stream_iter_idx;
7cdc2bab 909
14f28187
FD
910 BT_ASSERT(live_trace);
911 BT_ASSERT(live_trace->stream_iterators);
912 /*
913 * Update the current message of every stream iterators of this trace.
914 * The current msg of every stream must have a timestamp equal or
915 * larger than the last message returned by this iterator. We must
916 * ensure monotonicity.
917 */
918 stream_iter_idx = 0;
919 while (stream_iter_idx < live_trace->stream_iterators->len) {
920 bool stream_iter_is_ended = false;
921 struct lttng_live_stream_iterator *stream_iter =
922 g_ptr_array_index(live_trace->stream_iterators,
923 stream_iter_idx);
924
925 /*
926 * Find if there is are now current message for this stream
927 * iterator get it.
928 */
929 while (!stream_iter->current_msg) {
930 bt_message *msg = NULL;
931 int64_t curr_msg_ts_ns = INT64_MAX;
932 stream_iter_status = lttng_live_iterator_next_on_stream(
933 lttng_live_msg_iter, stream_iter, &msg);
934
935 BT_LOGD("live stream iterator returned status :%s",
936 print_live_iterator_status(stream_iter_status));
937 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
938 stream_iter_is_ended = true;
939 break;
940 }
941
942 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
943 goto end;
944 }
945
946 BT_ASSERT(msg);
947
948 /*
949 * Get the timestamp in nanoseconds from origin of this
950 * messsage.
951 */
952 live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter,
953 msg, lttng_live_msg_iter->last_msg_ts_ns,
954 &curr_msg_ts_ns);
955
956 /*
957 * Check if the message of the current live stream
958 * iterator occured at the exact same time or after the
959 * last message returned by this component's message
960 * iterator. If not, we return an error.
961 */
962 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
963 stream_iter->current_msg = msg;
964 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
965 } else {
966 /*
967 * We received a message in the past. To ensure
968 * monotonicity, we can't send it forward.
969 */
970 BT_LOGE("Message's timestamp is less than "
971 "lttng-live's message iterator's last "
972 "returned timestamp: "
973 "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", "
974 "last-msg-ts=%" PRId64,
975 lttng_live_msg_iter, curr_msg_ts_ns,
976 lttng_live_msg_iter->last_msg_ts_ns);
977 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
978 goto end;
979 }
980 }
981
982 if (!stream_iter_is_ended &&
983 stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
984 /*
985 * Update the current best candidate message for the
986 * stream iterator of thise live trace to be forwarded
987 * downstream.
988 */
989 curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
990 curr_candidate_stream_iter = stream_iter;
991 }
992
993 if (stream_iter_is_ended) {
994 /*
995 * The live stream iterator is ENDed. We remove that
996 * iterator from the list and we restart the iteration
997 * at the beginning of the live stream iterator array
998 * to because the removal will shuffle the array.
999 */
1000 g_ptr_array_remove_index_fast(live_trace->stream_iterators,
1001 stream_iter_idx);
1002 stream_iter_idx = 0;
1003 } else {
1004 stream_iter_idx++;
1005 }
1006 }
1007
1008 if (curr_candidate_stream_iter) {
1009 *candidate_stream_iter = curr_candidate_stream_iter;
1010 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1011 } else {
1012 /*
1013 * The only case where we don't have a candidate for this trace
1014 * is if we reached the end of all the iterators.
1015 */
1016 BT_ASSERT(live_trace->stream_iterators->len == 0);
1017 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1018 }
1019
1020end:
1021 return stream_iter_status;
1022}
1023
1024static
1025enum lttng_live_iterator_status next_stream_iterator_for_session(
1026 struct lttng_live_msg_iter *lttng_live_msg_iter,
1027 struct lttng_live_session *session,
1028 struct lttng_live_stream_iterator **candidate_session_stream_iter)
1029{
1030 enum lttng_live_iterator_status stream_iter_status;
1031 uint64_t trace_idx = 0;
1032 int64_t curr_candidate_msg_ts = INT64_MAX;
1033 struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
1034
1035 /*
1036 * Make sure we are attached to the session and look for new streams
1037 * and metadata.
1038 */
1039 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1040 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1041 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1042 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
7cdc2bab
MD
1043 goto end;
1044 }
14f28187
FD
1045
1046 BT_ASSERT(session->traces);
1047
1048 /*
1049 * Use while loops here rather then for loops so we can restart the
1050 * iteration if an element is removed from the array during the
1051 * looping.
1052 */
1053 while (trace_idx < session->traces->len) {
1054 bool trace_is_ended = false;
1055 struct lttng_live_stream_iterator *stream_iter;
1056 struct lttng_live_trace *trace =
1057 g_ptr_array_index(session->traces, trace_idx);
1058
1059 stream_iter_status = next_stream_iterator_for_trace(
1060 lttng_live_msg_iter, trace, &stream_iter);
1061 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1062 /*
1063 * All the live stream iterators for this trace are
1064 * ENDed. Remove the trace from this session.
1065 */
1066 trace_is_ended = true;
1067 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1068 goto end;
1069 }
1070
1071 if (!trace_is_ended) {
1072 BT_ASSERT(stream_iter);
1073
1074 if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
1075 curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1076 curr_candidate_stream_iter = stream_iter;
1077 }
1078 trace_idx++;
1079 } else {
1080 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1081 trace_idx = 0;
1082 }
1083 }
1084 if (curr_candidate_stream_iter) {
1085 *candidate_session_stream_iter = curr_candidate_stream_iter;
1086 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab 1087 } else {
14f28187
FD
1088 /*
1089 * The only cases where we don't have a candidate for this
1090 * trace is:
1091 * 1. if we reached the end of all the iterators of all the
1092 * traces of this session,
1093 * 2. if we never had live stream iterator in the first place.
1094 *
1095 * In either cases, we return END.
1096 */
1097 BT_ASSERT(session->traces->len == 0);
1098 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab
MD
1099 }
1100end:
14f28187
FD
1101 return stream_iter_status;
1102}
1103
1104static inline
1105void put_messages(bt_message_array_const msgs, uint64_t count)
1106{
1107 uint64_t i;
1108
1109 for (i = 0; i < count; i++) {
1110 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
7cdc2bab 1111 }
7cdc2bab
MD
1112}
1113
d3eb6e8f 1114BT_HIDDEN
14f28187
FD
1115bt_self_message_iterator_status lttng_live_msg_iter_next(
1116 bt_self_message_iterator *self_msg_it,
1117 bt_message_array_const msgs, uint64_t capacity,
1118 uint64_t *count)
d3eb6e8f 1119{
14f28187
FD
1120 bt_self_message_iterator_status status;
1121 struct lttng_live_msg_iter *lttng_live_msg_iter =
1122 bt_self_message_iterator_get_data(self_msg_it);
1123 struct lttng_live_component *lttng_live =
1124 lttng_live_msg_iter->lttng_live_comp;
1125 enum lttng_live_iterator_status stream_iter_status;
1126 uint64_t session_idx;
1127
1128 *count = 0;
1129
1130 BT_ASSERT(lttng_live_msg_iter);
1131
1132 /*
1133 * Clear all the invalid message reference that might be left over in
1134 * the output array.
1135 */
1136 memset(msgs, 0, capacity * sizeof(*msgs));
1137
1138 /*
1139 * If no session are exposed on the relay found at the url provided by
1140 * the user, session count will be 0. In this case, we return status
1141 * end to return gracefully.
1142 */
1143 if (lttng_live_msg_iter->sessions->len == 0) {
1144 if (lttng_live->params.sess_not_found_act !=
1145 SESSION_NOT_FOUND_ACTION_CONTINUE) {
1146 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1147 goto no_session;
1148 } else {
1149 /*
1150 * The are no more active session for this session
1151 * name. Retry to create a viewer session for the
1152 * requested session name.
1153 */
1154 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1155 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1156 goto no_session;
1157 }
1158 }
1159 }
1160
1161 if (lttng_live_msg_iter->active_stream_iter == 0) {
1162 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1163 }
1164
1165 /*
1166 * Here the muxing of message is done.
1167 *
1168 * We need to iterate over all the streams of all the traces of all the
1169 * viewer sessions in order to get the message with the smallest
1170 * timestamp. In this case, a session is a viewer session and there is
1171 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1172 * kernel). Each viewer session can have multiple traces, for example,
1173 * 64bit UST viewer sessions could have multiple per-pid traces.
1174 *
1175 * We iterate over the streams of each traces to update and see what is
1176 * their next message's timestamp. From those timestamps, we select the
1177 * message with the smallest timestamp as the best candidate message
1178 * for that trace and do the same thing across all the sessions.
1179 *
1180 * We then compare the timestamp of best candidate message of all the
1181 * sessions to pick the message with the smallest timestamp and we
1182 * return it.
1183 */
1184 while (*count < capacity) {
1185 struct lttng_live_stream_iterator *next_stream_iter = NULL,
1186 *candidate_stream_iter = NULL;
1187 int64_t next_msg_ts_ns = INT64_MAX;
1188
1189 BT_ASSERT(lttng_live_msg_iter->sessions);
1190 session_idx = 0;
1191 /*
1192 * Use a while loop instead of a for loop so we can restart the
1193 * iteration if we remove an element. We can safely call
1194 * next_stream_iterator_for_session() multiple times on the
1195 * same session as we only fetch a new message if there is no
1196 * current next message for each live stream iterator.
1197 * If all live stream iterator of that session already have a
1198 * current next message, the function will simply exit return
1199 * the same candidate live stream iterator every time.
1200 */
1201 while (session_idx < lttng_live_msg_iter->sessions->len) {
1202 struct lttng_live_session *session =
1203 g_ptr_array_index(lttng_live_msg_iter->sessions,
1204 session_idx);
1205
1206 /* Find the best candidate message to send downstream. */
1207 stream_iter_status = next_stream_iterator_for_session(
1208 lttng_live_msg_iter, session,
1209 &candidate_stream_iter);
1210
1211 /* If we receive an END status, it means that either:
1212 * - Those traces never had active streams (UST with no
1213 * data produced yet),
1214 * - All live stream iterators have ENDed.*/
1215 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1216 if (session->closed && session->traces->len == 0) {
1217 /*
1218 * Remove the session from the list and restart the
1219 * iteration at the beginning of the array since the
1220 * removal shuffle the elements of the array.
1221 */
1222 g_ptr_array_remove_index_fast(
1223 lttng_live_msg_iter->sessions,
1224 session_idx);
1225 session_idx = 0;
1226 } else {
1227 session_idx++;
1228 }
1229 continue;
1230 }
1231
1232 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1233 goto end;
1234 }
1235
1236 if (candidate_stream_iter->current_msg_ts_ns <= next_msg_ts_ns) {
1237 next_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1238 next_stream_iter = candidate_stream_iter;
1239 }
1240
1241 session_idx++;
1242 }
1243
1244 if (!next_stream_iter) {
1245 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1246 goto end;
1247 }
1248
1249 BT_ASSERT(next_stream_iter->current_msg);
1250 /* Ensure monotonicity. */
1251 BT_ASSERT(lttng_live_msg_iter->last_msg_ts_ns <=
1252 next_stream_iter->current_msg_ts_ns);
1253
1254 /*
1255 * Insert the next message to the message batch. This will set
1256 * stream iterator current messsage to NULL so that next time
1257 * we fetch the next message of that stream iterator
1258 */
1259 BT_MESSAGE_MOVE_REF(msgs[*count], next_stream_iter->current_msg);
1260 (*count)++;
1261
1262 /* Update the last timestamp in nanoseconds sent downstream. */
1263 lttng_live_msg_iter->last_msg_ts_ns = next_msg_ts_ns;
1264 next_stream_iter->current_msg_ts_ns = INT64_MAX;
1265
1266 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1267 }
1268end:
1269 switch (stream_iter_status) {
1270 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1271 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1272 if (*count > 0) {
1273 /*
1274 * We received a again status but we have some messages
1275 * to send downstream. We send them and return OK for
1276 * now. On the next call we return again if there are
1277 * still no new message to send.
1278 */
1279 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1280 } else {
1281 status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN;
1282 }
7cdc2bab 1283 break;
14f28187
FD
1284 case LTTNG_LIVE_ITERATOR_STATUS_END:
1285 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
7cdc2bab 1286 break;
14f28187
FD
1287 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1288 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1289 break;
1290 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1291 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1292 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1293 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1294 /* Put all existing messages on error. */
1295 put_messages(msgs, *count);
7cdc2bab 1296 break;
14f28187
FD
1297 default:
1298 abort();
7cdc2bab 1299 }
14f28187
FD
1300
1301no_session:
1302 return status;
7cdc2bab 1303}
41a2b7ae 1304
7cdc2bab 1305BT_HIDDEN
14f28187
FD
1306bt_self_message_iterator_status lttng_live_msg_iter_init(
1307 bt_self_message_iterator *self_msg_it,
1308 bt_self_component_source *self_comp_src,
1309 bt_self_component_port_output *self_port)
7cdc2bab 1310{
14f28187
FD
1311 bt_self_message_iterator_status ret =
1312 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1313 bt_self_component *self_comp =
1314 bt_self_component_source_as_self_component(self_comp_src);
1315 struct lttng_live_component *lttng_live;
1316 struct lttng_live_msg_iter *lttng_live_msg_iter;
1317
1318 BT_ASSERT(self_msg_it);
1319
1320 lttng_live = bt_self_component_get_data(self_comp);
1321
1322 /* There can be only one downstream iterator at the same time. */
1323 BT_ASSERT(!lttng_live->has_msg_iter);
1324 lttng_live->has_msg_iter = true;
1325
1326 lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
1327 if (!lttng_live_msg_iter) {
1328 ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1329 goto end;
7cdc2bab 1330 }
14f28187
FD
1331
1332 lttng_live_msg_iter->lttng_live_comp = lttng_live;
1333 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1334
1335 lttng_live_msg_iter->active_stream_iter = 0;
1336 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1337 lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func(
1338 (GDestroyNotify) lttng_live_destroy_session);
1339 BT_ASSERT(lttng_live_msg_iter->sessions);
1340
1341 lttng_live_msg_iter->viewer_connection =
1342 live_viewer_connection_create(lttng_live->params.url->str, false,
1343 lttng_live_msg_iter);
1344 if (!lttng_live_msg_iter->viewer_connection) {
1345 goto error;
1346 }
1347
1348 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1349 goto error;
1350 }
1351 if (lttng_live_msg_iter->sessions->len == 0) {
1352 switch (lttng_live->params.sess_not_found_act) {
1353 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1354 BT_LOGI("Unable to connect to the requested live viewer "
1355 "session. Keep trying to connect because of "
1356 "%s=\"%s\" component parameter: url=\"%s\"",
1357 SESS_NOT_FOUND_ACTION_PARAM,
1358 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1359 lttng_live->params.url->str);
1360 break;
1361 case SESSION_NOT_FOUND_ACTION_FAIL:
1362 BT_LOGE("Unable to connect to the requested live viewer "
1363 "session. Fail the message iterator"
1364 "initialization because of %s=\"%s\" "
1365 "component parameter: url =\"%s\"",
1366 SESS_NOT_FOUND_ACTION_PARAM,
1367 SESS_NOT_FOUND_ACTION_FAIL_STR,
1368 lttng_live->params.url->str);
7cdc2bab 1369 goto error;
14f28187
FD
1370 case SESSION_NOT_FOUND_ACTION_END:
1371 BT_LOGI("Unable to connect to the requested live viewer "
1372 "session. End gracefully at the first _next() "
1373 "call because of %s=\"%s\" component parameter: "
1374 "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
1375 SESS_NOT_FOUND_ACTION_END_STR,
1376 lttng_live->params.url->str);
1377 break;
7cdc2bab 1378 }
7cdc2bab
MD
1379 }
1380
14f28187
FD
1381 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1382
1383 goto end;
1384error:
1385 ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1386 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab 1387end:
41a2b7ae 1388 return ret;
7cdc2bab
MD
1389}
1390
1391static
14f28187
FD
1392bt_query_status lttng_live_query_list_sessions(const bt_value *params,
1393 const bt_value **result)
7cdc2bab 1394{
14f28187
FD
1395 bt_query_status status = BT_QUERY_STATUS_OK;
1396 const bt_value *url_value = NULL;
7cdc2bab 1397 const char *url;
14f28187 1398 struct live_viewer_connection *viewer_connection = NULL;
7cdc2bab 1399
14f28187
FD
1400 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1401 if (!url_value) {
1402 BT_LOGW("Mandatory `%s` parameter missing", URL_PARAM);
1403 status = BT_QUERY_STATUS_INVALID_PARAMS;
7cdc2bab
MD
1404 goto error;
1405 }
1406
14f28187
FD
1407 if (!bt_value_is_string(url_value)) {
1408 BT_LOGW("`%s` parameter is required to be a string value",
1409 URL_PARAM);
1410 status = BT_QUERY_STATUS_INVALID_PARAMS;
7cdc2bab
MD
1411 goto error;
1412 }
1413
14f28187
FD
1414 url = bt_value_string_get(url_value);
1415
1416 viewer_connection = live_viewer_connection_create(url, true, NULL);
7cdc2bab 1417 if (!viewer_connection) {
7cdc2bab
MD
1418 goto error;
1419 }
1420
14f28187
FD
1421 status = live_viewer_connection_list_sessions(viewer_connection,
1422 result);
1423 if (status != BT_QUERY_STATUS_OK) {
c7eee084
PP
1424 goto error;
1425 }
1426
7cdc2bab 1427 goto end;
c7eee084 1428
7cdc2bab 1429error:
14f28187 1430 BT_VALUE_PUT_REF_AND_RESET(*result);
c7eee084 1431
14f28187
FD
1432 if (status >= 0) {
1433 status = BT_QUERY_STATUS_ERROR;
c7eee084
PP
1434 }
1435
7cdc2bab
MD
1436end:
1437 if (viewer_connection) {
14f28187 1438 live_viewer_connection_destroy(viewer_connection);
7cdc2bab 1439 }
14f28187 1440 return status;
7cdc2bab
MD
1441}
1442
1443BT_HIDDEN
14f28187 1444bt_query_status lttng_live_query(bt_self_component_class_source *comp_class,
b19ff26f 1445 const bt_query_executor *query_exec,
14f28187
FD
1446 const char *object, const bt_value *params,
1447 const bt_value **result)
7cdc2bab 1448{
14f28187 1449 bt_query_status status = BT_QUERY_STATUS_OK;
c7eee084 1450
7cdc2bab 1451 if (strcmp(object, "sessions") == 0) {
14f28187
FD
1452 status = lttng_live_query_list_sessions(params, result);
1453 } else {
1454 BT_LOGW("Unknown query object `%s`", object);
1455 status = BT_QUERY_STATUS_INVALID_OBJECT;
1456 goto end;
7cdc2bab 1457 }
14f28187
FD
1458
1459end:
1460 return status;
7cdc2bab
MD
1461}
1462
1463static
1464void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1465{
19bdff20
FD
1466 if (!lttng_live) {
1467 return;
1468 }
14f28187
FD
1469 if (lttng_live->params.url) {
1470 g_string_free(lttng_live->params.url, TRUE);
7cdc2bab
MD
1471 }
1472 g_free(lttng_live);
1473}
1474
1475BT_HIDDEN
14f28187 1476void lttng_live_component_finalize(bt_self_component_source *component)
7cdc2bab 1477{
14f28187
FD
1478 void *data = bt_self_component_get_data(
1479 bt_self_component_source_as_self_component(component));
7cdc2bab
MD
1480
1481 if (!data) {
1482 return;
1483 }
1484 lttng_live_component_destroy_data(data);
1485}
1486
1487static
14f28187
FD
1488enum session_not_found_action parse_session_not_found_action_param(
1489 const bt_value *no_session_param)
1490{
1491 enum session_not_found_action action;
1492 const char *no_session_act_str;
1493 no_session_act_str = bt_value_string_get(no_session_param);
1494 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1495 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1496 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1497 action = SESSION_NOT_FOUND_ACTION_FAIL;
1498 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0) {
1499 action = SESSION_NOT_FOUND_ACTION_END;
1500 } else {
1501 action = -1;
1502 }
1503
1504 return action;
1505}
1506
1507struct lttng_live_component *lttng_live_component_create(const bt_value *params)
7cdc2bab
MD
1508{
1509 struct lttng_live_component *lttng_live;
14f28187 1510 const bt_value *value = NULL;
7cdc2bab 1511 const char *url;
7cdc2bab
MD
1512
1513 lttng_live = g_new0(struct lttng_live_component, 1);
1514 if (!lttng_live) {
1515 goto end;
1516 }
7cdc2bab 1517 lttng_live->max_query_size = MAX_QUERY_SIZE;
14f28187
FD
1518 lttng_live->has_msg_iter = false;
1519
1520 value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1521 if (!value || !bt_value_is_string(value)) {
1522 BT_LOGW("Mandatory `%s` parameter missing or not a string",
1523 URL_PARAM);
7cdc2bab
MD
1524 goto error;
1525 }
601b0d3c 1526 url = bt_value_string_get(value);
14f28187
FD
1527 lttng_live->params.url = g_string_new(url);
1528 if (!lttng_live->params.url) {
7cdc2bab
MD
1529 goto error;
1530 }
14f28187
FD
1531
1532 value = bt_value_map_borrow_entry_value_const(params,
1533 SESS_NOT_FOUND_ACTION_PARAM);
1534
1535 if (value && bt_value_is_string(value)) {
1536 lttng_live->params.sess_not_found_act =
1537 parse_session_not_found_action_param(value);
1538 if (lttng_live->params.sess_not_found_act == -1) {
1539 BT_LOGE("Unexpected value for `%s` parameter: "
1540 "value=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
1541 bt_value_string_get(value));
1542 goto error;
1543 }
1544 } else {
1545 BT_LOGW("Optional `%s` parameter is missing or "
1546 "not a string value. Defaulting to %s=\"%s\".",
1547 SESS_NOT_FOUND_ACTION_PARAM,
1548 SESS_NOT_FOUND_ACTION_PARAM,
1549 SESS_NOT_FOUND_ACTION_CONTINUE_STR);
1550 lttng_live->params.sess_not_found_act =
1551 SESSION_NOT_FOUND_ACTION_CONTINUE;
7cdc2bab 1552 }
4c66436f 1553
7cdc2bab
MD
1554 goto end;
1555
1556error:
1557 lttng_live_component_destroy_data(lttng_live);
1558 lttng_live = NULL;
1559end:
1560 return lttng_live;
d3e4dcd8
PP
1561}
1562
f3bc2010 1563BT_HIDDEN
14f28187
FD
1564bt_self_component_status lttng_live_component_init(
1565 bt_self_component_source *self_comp,
c88dd1cb 1566 const bt_value *params, __attribute__((unused)) void *init_method_data)
f3bc2010 1567{
7cdc2bab 1568 struct lttng_live_component *lttng_live;
14f28187 1569 bt_self_component_status ret = BT_SELF_COMPONENT_STATUS_OK;
7cdc2bab 1570
14f28187 1571 lttng_live = lttng_live_component_create(params);
7cdc2bab 1572 if (!lttng_live) {
14f28187 1573 ret = BT_SELF_COMPONENT_STATUS_NOMEM;
19bdff20 1574 goto error;
7cdc2bab 1575 }
14f28187 1576 lttng_live->self_comp = self_comp;
7cdc2bab 1577
42521b69 1578 if (lttng_live_graph_is_canceled(lttng_live)) {
19bdff20
FD
1579 ret = BT_SELF_COMPONENT_STATUS_END;
1580 goto error;
4bf0e537 1581 }
14f28187 1582
d94d92ac 1583 ret = bt_self_component_source_add_output_port(
14f28187
FD
1584 lttng_live->self_comp, "out",
1585 NULL, NULL);
1586 if (ret != BT_SELF_COMPONENT_STATUS_OK) {
19bdff20 1587 goto error;
147337a3 1588 }
7cdc2bab 1589
14f28187
FD
1590 bt_self_component_set_data(
1591 bt_self_component_source_as_self_component(self_comp),
1592 lttng_live);
19bdff20 1593 goto end;
7cdc2bab 1594
19bdff20
FD
1595error:
1596 lttng_live_component_destroy_data(lttng_live);
1597 lttng_live = NULL;
7cdc2bab
MD
1598end:
1599 return ret;
d85ef162 1600}
This page took 0.127845 seconds and 4 git commands to generate.