lib: rename include dir to babeltrace2
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
14f28187 6 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
f3bc2010 7 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 8 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
9 *
10 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
11 *
12 * Permission is hereby granted, free of charge, to any person obtaining a copy
13 * of this software and associated documentation files (the "Software"), to deal
14 * in the Software without restriction, including without limitation the rights
15 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 * copies of the Software, and to permit persons to whom the Software is
17 * furnished to do so, subject to the following conditions:
18 *
19 * The above copyright notice and this permission notice shall be included in
20 * all copies or substantial portions of the Software.
21 *
22 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 * SOFTWARE.
29 */
30
020bc26f
PP
31#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
32#include "logging.h"
33
14f28187
FD
34#include <glib.h>
35#include <inttypes.h>
36#include <unistd.h>
37
3fadfbc0
MJ
38#include <babeltrace2/assert-internal.h>
39#include <babeltrace2/babeltrace.h>
40#include <babeltrace2/compiler-internal.h>
41#include <babeltrace2/types.h>
7d61fa8e 42#include <plugins-common.h>
f3bc2010 43
7cdc2bab
MD
44#include "data-stream.h"
45#include "metadata.h"
14f28187 46#include "lttng-live.h"
7cdc2bab 47
14f28187
FD
48#define MAX_QUERY_SIZE (256*1024)
49#define URL_PARAM "url"
50#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
51#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
52#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
53#define SESS_NOT_FOUND_ACTION_END_STR "end"
7cdc2bab 54
087bc060 55#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab 56
14f28187
FD
57static
58const char *print_live_iterator_status(enum lttng_live_iterator_status status)
59{
60 switch (status) {
61 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
62 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
63 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
64 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
65 case LTTNG_LIVE_ITERATOR_STATUS_END:
66 return "LTTNG_LIVE_ITERATOR_STATUS_END";
67 case LTTNG_LIVE_ITERATOR_STATUS_OK:
68 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
69 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
70 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
71 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
72 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
73 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
74 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
75 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
76 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
77 default:
78 abort();
79 }
80}
81
82static
83const char *print_state(struct lttng_live_stream_iterator *s)
7cdc2bab
MD
84{
85 switch (s->state) {
86 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
87 return "ACTIVE_NO_DATA";
88 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
89 return "QUIESCENT_NO_DATA";
90 case LTTNG_LIVE_STREAM_QUIESCENT:
91 return "QUIESCENT";
92 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
93 return "ACTIVE_DATA";
94 case LTTNG_LIVE_STREAM_EOF:
95 return "EOF";
96 default:
97 return "ERROR";
98 }
99}
7cdc2bab 100
14f28187
FD
/*
 * Logs, at debug level, the state of the live stream iterator
 * `live_stream_iter` together with its last and current inactivity
 * timestamps.
 *
 * The expansion deliberately does NOT end with a semicolon: the caller
 * writes it. This keeps `print_stream_state(x);` a single statement so
 * that it can be used safely as the body of an unbraced `if`/`else`.
 * The argument is parenthesized so that any pointer expression can be
 * passed.
 */
#define print_stream_state(live_stream_iter)				\
	do {								\
		BT_LOGD("stream state %s last_inact_ts %" PRId64	\
			", curr_inact_ts %" PRId64,			\
			print_state(live_stream_iter),			\
			(live_stream_iter)->last_inactivity_ts,		\
			(live_stream_iter)->current_inactivity_ts);	\
	} while (0)
6f79a7cf
MD
109
110BT_HIDDEN
42521b69 111bool lttng_live_graph_is_canceled(struct lttng_live_component *lttng_live)
6f79a7cf 112{
14f28187
FD
113 const bt_component *component;
114 bool ret;
6f79a7cf
MD
115
116 if (!lttng_live) {
14f28187
FD
117 ret = false;
118 goto end;
7cdc2bab 119 }
7cdc2bab 120
14f28187
FD
121 component = bt_component_source_as_component_const(
122 bt_self_component_source_as_component_source(
123 lttng_live->self_comp));
6f79a7cf 124
14f28187 125 ret = bt_component_graph_is_canceled(component);
4bf0e537 126
14f28187
FD
127end:
128 return ret;
7cdc2bab
MD
129}
130
131static
132struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
133 uint64_t trace_id)
d3e4dcd8 134{
14f28187
FD
135 uint64_t trace_idx;
136 struct lttng_live_trace *ret_trace = NULL;
7cdc2bab 137
14f28187
FD
138 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
139 struct lttng_live_trace *trace =
140 g_ptr_array_index(session->traces, trace_idx);
7cdc2bab 141 if (trace->id == trace_id) {
14f28187
FD
142 ret_trace = trace;
143 goto end;
7cdc2bab
MD
144 }
145 }
14f28187
FD
146
147end:
148 return ret_trace;
d3eb6e8f
PP
149}
150
7cdc2bab 151static
14f28187 152void lttng_live_destroy_trace(struct lttng_live_trace *trace)
7cdc2bab 153{
14f28187 154 BT_LOGD("Destroy lttng_live_trace");
7cdc2bab 155
14f28187
FD
156 BT_ASSERT(trace->stream_iterators);
157 g_ptr_array_free(trace->stream_iterators, TRUE);
5bd230f4 158
14f28187
FD
159 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
160 BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
5bd230f4 161
7cdc2bab 162 lttng_live_metadata_fini(trace);
7cdc2bab
MD
163 g_free(trace);
164}
165
166static
167struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
168 uint64_t trace_id)
169{
170 struct lttng_live_trace *trace = NULL;
171
172 trace = g_new0(struct lttng_live_trace, 1);
173 if (!trace) {
174 goto error;
175 }
176 trace->session = session;
177 trace->id = trace_id;
14f28187
FD
178 trace->trace_class = NULL;
179 trace->trace = NULL;
180 trace->stream_iterators = g_ptr_array_new_with_free_func(
181 (GDestroyNotify) lttng_live_stream_iterator_destroy);
182 BT_ASSERT(trace->stream_iterators);
7cdc2bab 183 trace->new_metadata_needed = true;
14f28187
FD
184 g_ptr_array_add(session->traces, trace);
185
087bc060 186 BT_LOGI("Create trace");
7cdc2bab
MD
187 goto end;
188error:
189 g_free(trace);
190 trace = NULL;
191end:
192 return trace;
193}
194
195BT_HIDDEN
14f28187
FD
196struct lttng_live_trace *lttng_live_borrow_trace(
197 struct lttng_live_session *session, uint64_t trace_id)
7cdc2bab
MD
198{
199 struct lttng_live_trace *trace;
200
201 trace = lttng_live_find_trace(session, trace_id);
202 if (trace) {
14f28187 203 goto end;
7cdc2bab 204 }
7cdc2bab 205
14f28187
FD
206 /* The session is the owner of the newly created trace. */
207 trace = lttng_live_create_trace(session, trace_id);
7cdc2bab 208
14f28187
FD
209end:
210 return trace;
7cdc2bab
MD
211}
212
213BT_HIDDEN
14f28187 214int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
06994c71
MD
215 uint64_t session_id, const char *hostname,
216 const char *session_name)
7cdc2bab
MD
217{
218 int ret = 0;
14f28187 219 struct lttng_live_session *session;
7cdc2bab 220
14f28187
FD
221 session = g_new0(struct lttng_live_session, 1);
222 if (!session) {
7cdc2bab
MD
223 goto error;
224 }
225
14f28187
FD
226 session->id = session_id;
227 session->traces = g_ptr_array_new_with_free_func(
228 (GDestroyNotify) lttng_live_destroy_trace);
229 BT_ASSERT(session->traces);
230 session->lttng_live_msg_iter = lttng_live_msg_iter;
231 session->new_streams_needed = true;
232 session->hostname = g_string_new(hostname);
233 BT_ASSERT(session->hostname);
234
235 session->session_name = g_string_new(session_name);
236 BT_ASSERT(session->session_name);
7cdc2bab 237
06994c71 238 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
14f28187
FD
239 session->id, hostname, session_name);
240 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
7cdc2bab
MD
241 goto end;
242error:
087bc060 243 BT_LOGE("Error adding session");
14f28187 244 g_free(session);
7cdc2bab
MD
245 ret = -1;
246end:
247 return ret;
248}
249
250static
251void lttng_live_destroy_session(struct lttng_live_session *session)
252{
14f28187
FD
253 struct lttng_live_component *live_comp;
254
255 if (!session) {
256 goto end;
257 }
7cdc2bab 258
14f28187 259 BT_LOGD("Destroy lttng live session");
7cdc2bab
MD
260 if (session->id != -1ULL) {
261 if (lttng_live_detach_session(session)) {
14f28187
FD
262 live_comp = session->lttng_live_msg_iter->lttng_live_comp;
263 if (session->lttng_live_msg_iter &&
42521b69 264 !lttng_live_graph_is_canceled(live_comp)) {
4c66436f 265 /* Old relayd cannot detach sessions. */
14f28187 266 BT_LOGD("Unable to detach lttng live session %" PRIu64,
4c66436f
MD
267 session->id);
268 }
7cdc2bab
MD
269 }
270 session->id = -1ULL;
271 }
14f28187
FD
272
273 if (session->traces) {
274 g_ptr_array_free(session->traces, TRUE);
7cdc2bab 275 }
14f28187 276
06994c71
MD
277 if (session->hostname) {
278 g_string_free(session->hostname, TRUE);
279 }
280 if (session->session_name) {
281 g_string_free(session->session_name, TRUE);
282 }
7cdc2bab 283 g_free(session);
14f28187
FD
284
285end:
286 return;
7cdc2bab
MD
287}
288
14f28187
FD
289static
290void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 291{
14f28187
FD
292 if (!lttng_live_msg_iter) {
293 goto end;
7cdc2bab 294 }
7cdc2bab 295
14f28187
FD
296 if (lttng_live_msg_iter->sessions) {
297 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
7cdc2bab 298 }
14f28187
FD
299
300 BT_OBJECT_PUT_REF_AND_RESET(lttng_live_msg_iter->viewer_connection);
301 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
302 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
303
304 /* All stream iterators must be destroyed at this point. */
305 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
306 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
307
308 g_free(lttng_live_msg_iter);
309
310end:
311 return;
312}
313
314BT_HIDDEN
315void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
316{
317 struct lttng_live_msg_iter *lttng_live_msg_iter;
318
319 BT_ASSERT(self_msg_iter);
320
321 lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_iter);
322 BT_ASSERT(lttng_live_msg_iter);
323 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab
MD
324}
325
326static
14f28187 327enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
7cdc2bab
MD
328 struct lttng_live_stream_iterator *lttng_live_stream)
329{
330 switch (lttng_live_stream->state) {
331 case LTTNG_LIVE_STREAM_QUIESCENT:
332 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
333 break;
334 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
335 /* Invalid state. */
087bc060
MD
336 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
337 abort();
7cdc2bab
MD
338 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
339 /* Invalid state. */
087bc060
MD
340 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
341 abort();
7cdc2bab
MD
342 case LTTNG_LIVE_STREAM_EOF:
343 break;
344 }
14f28187 345 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
346}
347
348/*
349 * For active no data stream, fetch next data. It can be either:
350 * - quiescent: need to put it in the prio heap at quiescent end
351 * timestamp,
352 * - have data: need to wire up first event into the prio heap,
353 * - have no data on this stream at this point: need to retry (AGAIN) or
354 * return EOF.
355 */
356static
14f28187
FD
357enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
358 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab
MD
359 struct lttng_live_stream_iterator *lttng_live_stream)
360{
14f28187
FD
361 enum lttng_live_iterator_status ret =
362 LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
363 struct packet_index index;
364 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
365
366 if (lttng_live_stream->trace->new_metadata_needed) {
14f28187 367 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
368 goto end;
369 }
370 if (lttng_live_stream->trace->session->new_streams_needed) {
14f28187 371 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
372 goto end;
373 }
14f28187
FD
374 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
375 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
7cdc2bab
MD
376 goto end;
377 }
14f28187
FD
378 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream,
379 &index);
380 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
381 goto end;
382 }
f6ccaed9 383 BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
7cdc2bab 384 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
14f28187
FD
385 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts,
386 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
387
388 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA &&
389 last_inact_ts == curr_inact_ts) {
390 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
7cdc2bab
MD
391 print_stream_state(lttng_live_stream);
392 } else {
14f28187 393 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
394 }
395 goto end;
396 }
397 lttng_live_stream->base_offset = index.offset;
398 lttng_live_stream->offset = index.offset;
399 lttng_live_stream->len = index.packet_size / CHAR_BIT;
400end:
14f28187
FD
401 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
402 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
7cdc2bab
MD
403 }
404 return ret;
405}
406
407/*
14f28187
FD
408 * Creation of the message requires the ctf trace class to be created
409 * beforehand, but the live protocol gives us all streams (including metadata)
410 * at once. So we split it in three steps: getting streams, getting metadata
411 * (which creates the ctf trace class), and then creating the per-stream
412 * messages.
7cdc2bab
MD
413 */
414static
14f28187
FD
415enum lttng_live_iterator_status lttng_live_get_session(
416 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab
MD
417 struct lttng_live_session *session)
418{
14f28187
FD
419 enum lttng_live_iterator_status status;
420 uint64_t trace_idx;
421 int ret = 0;
7cdc2bab 422
14f28187
FD
423 if (!session->attached) {
424 ret = lttng_live_attach_session(session);
425 if (ret) {
42521b69 426 if (lttng_live_msg_iter && lttng_live_graph_is_canceled(
14f28187
FD
427 lttng_live_msg_iter->lttng_live_comp)) {
428 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
429 } else {
430 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
431 }
432 goto end;
4c66436f 433 }
7cdc2bab 434 }
14f28187 435
7cdc2bab 436 status = lttng_live_get_new_streams(session);
14f28187
FD
437 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
438 status != LTTNG_LIVE_ITERATOR_STATUS_END) {
439 goto end;
7cdc2bab 440 }
14f28187
FD
441 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
442 struct lttng_live_trace *trace =
443 g_ptr_array_index(session->traces, trace_idx);
444
7cdc2bab 445 status = lttng_live_metadata_update(trace);
14f28187
FD
446 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
447 status != LTTNG_LIVE_ITERATOR_STATUS_END) {
448 goto end;
7cdc2bab
MD
449 }
450 }
14f28187
FD
451 status = lttng_live_lazy_msg_init(session);
452
453end:
454 return status;
7cdc2bab
MD
455}
456
457BT_HIDDEN
14f28187 458void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 459{
14f28187 460 uint64_t session_idx;
7cdc2bab 461
14f28187
FD
462 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
463 session_idx++) {
464 struct lttng_live_session *session =
465 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
7cdc2bab
MD
466 session->new_streams_needed = true;
467 }
468}
469
470static
14f28187 471void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 472{
14f28187 473 uint64_t session_idx, trace_idx;
7cdc2bab 474
14f28187
FD
475 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
476 session_idx++) {
477 struct lttng_live_session *session =
478 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
7cdc2bab 479 session->new_streams_needed = true;
14f28187
FD
480 for (trace_idx = 0; trace_idx < session->traces->len;
481 trace_idx++) {
482 struct lttng_live_trace *trace =
483 g_ptr_array_index(session->traces, trace_idx);
7cdc2bab
MD
484 trace->new_metadata_needed = true;
485 }
486 }
487}
488
489static
14f28187
FD
490enum lttng_live_iterator_status
491lttng_live_iterator_handle_new_streams_and_metadata(
492 struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 493{
14f28187
FD
494 enum lttng_live_iterator_status ret =
495 LTTNG_LIVE_ITERATOR_STATUS_OK;
496 uint64_t session_idx = 0, nr_sessions_opened = 0;
497 struct lttng_live_session *session;
498 enum session_not_found_action sess_not_found_act =
499 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
500
7cdc2bab 501 /*
14f28187 502 * In a remotely distant future, we could add a "new
7cdc2bab
MD
503 * session" flag to the protocol, which would tell us that we
504 * need to query for new sessions even though we have sessions
505 * currently ongoing.
506 */
14f28187
FD
507 if (lttng_live_msg_iter->sessions->len == 0) {
508 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
509 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
510 goto end;
511 } else {
512 /*
513 * Retry to create a viewer session for the requested
514 * session name.
515 */
516 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
517 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
518 goto end;
519 }
520 }
7cdc2bab 521 }
14f28187
FD
522
523 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
524 session_idx++) {
525 session = g_ptr_array_index(lttng_live_msg_iter->sessions,
526 session_idx);
527 ret = lttng_live_get_session(lttng_live_msg_iter, session);
7cdc2bab 528 switch (ret) {
14f28187 529 case LTTNG_LIVE_ITERATOR_STATUS_OK:
7cdc2bab 530 break;
14f28187
FD
531 case LTTNG_LIVE_ITERATOR_STATUS_END:
532 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
533 break;
534 default:
535 goto end;
536 }
537 if (!session->closed) {
538 nr_sessions_opened++;
539 }
540 }
541end:
14f28187
FD
542 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK &&
543 sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
544 nr_sessions_opened == 0) {
545 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab
MD
546 }
547 return ret;
548}
549
550static
14f28187
FD
551enum lttng_live_iterator_status emit_inactivity_message(
552 struct lttng_live_msg_iter *lttng_live_msg_iter,
553 struct lttng_live_stream_iterator *stream_iter,
554 bt_message **message, uint64_t timestamp)
7cdc2bab 555{
14f28187
FD
556 enum lttng_live_iterator_status ret =
557 LTTNG_LIVE_ITERATOR_STATUS_OK;
558 bt_message *msg = NULL;
7cdc2bab 559
14f28187
FD
560 BT_ASSERT(stream_iter->trace->clock_class);
561
562 msg = bt_message_message_iterator_inactivity_create(
563 lttng_live_msg_iter->self_msg_iter,
564 stream_iter->trace->clock_class,
565 timestamp);
d6e69534 566 if (!msg) {
7cdc2bab
MD
567 goto error;
568 }
14f28187 569
d6e69534 570 *message = msg;
7cdc2bab 571end:
7cdc2bab
MD
572 return ret;
573
574error:
14f28187 575 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
d6e69534 576 bt_message_put_ref(msg);
7cdc2bab
MD
577 goto end;
578}
579
580static
14f28187
FD
581enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
582 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab 583 struct lttng_live_stream_iterator *lttng_live_stream,
14f28187 584 bt_message **message)
7cdc2bab 585{
14f28187
FD
586 enum lttng_live_iterator_status ret =
587 LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
588
589 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
14f28187 590 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
591 }
592
14f28187
FD
593 if (lttng_live_stream->current_inactivity_ts ==
594 lttng_live_stream->last_inactivity_ts) {
7cdc2bab 595 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
14f28187
FD
596 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
597 goto end;
598 }
599
600 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream,
601 message, lttng_live_stream->current_inactivity_ts);
602
603 lttng_live_stream->last_inactivity_ts =
604 lttng_live_stream->current_inactivity_ts;
605end:
606 return ret;
607}
608
609static
610int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
611 struct lttng_live_msg_iter *lttng_live_msg_iter,
612 const bt_message *msg, int64_t last_msg_ts_ns,
613 int64_t *ts_ns)
614{
615 const bt_clock_class *clock_class = NULL;
616 const bt_clock_snapshot *clock_snapshot = NULL;
617 int ret = 0;
14f28187
FD
618 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
619
620 BT_ASSERT(msg);
621 BT_ASSERT(ts_ns);
622
623 BT_LOGV("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
624 "last-msg-ts=%" PRId64, lttng_live_msg_iter, msg,
625 last_msg_ts_ns);
626
627 switch (bt_message_get_type(msg)) {
628 case BT_MESSAGE_TYPE_EVENT:
629 clock_class =
630 bt_message_event_borrow_stream_class_default_clock_class_const(
631 msg);
632 BT_ASSERT(clock_class);
633
0cbc2c33
PP
634 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
635 msg);
14f28187
FD
636 break;
637 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
638 clock_class =
639 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
640 msg);
641 BT_ASSERT(clock_class);
642
0cbc2c33
PP
643 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
644 msg);
14f28187
FD
645 break;
646 case BT_MESSAGE_TYPE_PACKET_END:
647 clock_class =
648 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
649 msg);
650 BT_ASSERT(clock_class);
651
0cbc2c33
PP
652 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
653 msg);
14f28187
FD
654 break;
655 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
656 clock_class =
657 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
658 msg);
659 BT_ASSERT(clock_class);
660
9b24b6aa 661 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 662 msg);
14f28187
FD
663 break;
664 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
665 clock_class =
666 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
667 msg);
668 BT_ASSERT(clock_class);
669
9b24b6aa 670 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 671 msg);
14f28187
FD
672 break;
673 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
674 clock_class =
675 bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
676 msg);
677 BT_ASSERT(clock_class);
678
679 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
680 msg, &clock_snapshot);
681 if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
682 goto no_clock_snapshot;
683 }
684
685 break;
686 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
687 clock_class =
688 bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
689 msg);
690 BT_ASSERT(clock_class);
691
692 sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
693 msg, &clock_snapshot);
694 if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
695 goto no_clock_snapshot;
696 }
697
698 break;
699 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
0cbc2c33 700 clock_snapshot =
14f28187 701 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
0cbc2c33 702 msg);
14f28187
FD
703 break;
704 default:
705 /* All the other messages have a higher priority */
706 BT_LOGV_STR("Message has no timestamp: using the last message timestamp.");
707 *ts_ns = last_msg_ts_ns;
7cdc2bab
MD
708 goto end;
709 }
710
14f28187
FD
711 clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
712 BT_ASSERT(clock_class);
713
714 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
715 if (ret) {
716 BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
717 "clock-snapshot-addr=%p", clock_snapshot);
718 goto error;
719 }
720
721 goto end;
722
723no_clock_snapshot:
724 BT_LOGV_STR("Message's default clock snapshot is missing: "
725 "using the last message timestamp.");
726 *ts_ns = last_msg_ts_ns;
727 goto end;
728
729error:
730 ret = -1;
7cdc2bab 731
7cdc2bab 732end:
14f28187
FD
733 if (ret == 0) {
734 BT_LOGV("Found message's timestamp: "
735 "iter-data-addr=%p, msg-addr=%p, "
736 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
737 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
738 }
739
7cdc2bab
MD
740 return ret;
741}
742
743static
14f28187
FD
744enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
745 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab 746 struct lttng_live_stream_iterator *lttng_live_stream,
14f28187 747 bt_message **message)
7cdc2bab 748{
14f28187 749 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
d6e69534 750 enum bt_msg_iter_status status;
14f28187 751 uint64_t session_idx, trace_idx;
7cdc2bab 752
14f28187
FD
753 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
754 session_idx++) {
755 struct lttng_live_session *session =
756 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
7cdc2bab
MD
757
758 if (session->new_streams_needed) {
14f28187
FD
759 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
760 goto end;
7cdc2bab 761 }
14f28187
FD
762 for (trace_idx = 0; trace_idx < session->traces->len;
763 trace_idx++) {
764 struct lttng_live_trace *trace =
765 g_ptr_array_index(session->traces, trace_idx);
7cdc2bab 766 if (trace->new_metadata_needed) {
14f28187
FD
767 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
768 goto end;
7cdc2bab
MD
769 }
770 }
771 }
772
773 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
14f28187
FD
774 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
775 goto end;
7cdc2bab 776 }
14f28187
FD
777
778 status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter,
779 lttng_live_msg_iter->self_msg_iter, message);
7cdc2bab 780 switch (status) {
d6e69534 781 case BT_MSG_ITER_STATUS_EOF:
14f28187 782 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab 783 break;
d6e69534 784 case BT_MSG_ITER_STATUS_OK:
14f28187 785 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab 786 break;
d6e69534 787 case BT_MSG_ITER_STATUS_AGAIN:
7cdc2bab
MD
788 /*
789 * Continue immediately (end of packet). The next
790 * get_index may return AGAIN to delay the following
791 * attempt.
792 */
14f28187 793 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab 794 break;
d6e69534 795 case BT_MSG_ITER_STATUS_INVAL:
7cdc2bab 796 /* No argument provided by the user, so don't return INVAL. */
d6e69534 797 case BT_MSG_ITER_STATUS_ERROR:
7cdc2bab 798 default:
14f28187
FD
799 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
800 BT_LOGW("CTF msg iterator return an error or failed msg_iter=%p",
801 lttng_live_stream->msg_iter);
7cdc2bab
MD
802 break;
803 }
14f28187
FD
804
805end:
7cdc2bab
MD
806 return ret;
807}
808
809/*
810 * helper function:
811 * handle_no_data_streams()
812 * retry:
813 * - for each ACTIVE_NO_DATA stream:
814 * - query relayd for stream data, or quiescence info.
815 * - if need metadata, get metadata, goto retry.
816 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
817 * - if quiescent, move to QUIESCENT streams
818 * - if fetched data, move to ACTIVE_DATA streams
819 * (at this point each stream either has data, or is quiescent)
820 *
821 *
822 * iterator_next:
823 * handle_new_streams_and_metadata()
824 * - query relayd for known streams, add them as ACTIVE_NO_DATA
825 * - query relayd for metadata
826 *
827 * call handle_active_no_data_streams()
828 *
829 * handle_quiescent_streams()
830 * - if at least one stream is ACTIVE_DATA:
831 * - peek stream event with lowest timestamp -> next_ts
832 * - for each quiescent stream
833 * - if next_ts >= quiescent end
834 * - set state to ACTIVE_NO_DATA
835 * - else
836 * - for each quiescent stream
837 * - set state to ACTIVE_NO_DATA
838 *
839 * call handle_active_no_data_streams()
840 *
841 * handle_active_data_streams()
842 * - if at least one stream is ACTIVE_DATA:
843 * - get stream event with lowest timestamp from heap
d6e69534 844 * - make that stream event the current message.
7cdc2bab
MD
845 * - move this stream heap position to its next event
846 * - if we need to fetch data from relayd, move
847 * stream to ACTIVE_NO_DATA.
848 * - return OK
849 * - return AGAIN
850 *
851 * end criterion: ctrl-c on client. If relayd exits or the session
852 * closes on the relay daemon side, we keep on waiting for streams.
853 * Eventually handle --end timestamp (also an end criterion).
854 *
855 * When disconnected from relayd: try to re-connect endlessly.
856 */
857static
14f28187
FD
858enum lttng_live_iterator_status lttng_live_iterator_next_on_stream(
859 struct lttng_live_msg_iter *lttng_live_msg_iter,
860 struct lttng_live_stream_iterator *stream_iter,
861 bt_message **curr_msg)
7cdc2bab 862{
14f28187 863 enum lttng_live_iterator_status live_status;
7cdc2bab 864
7cdc2bab
MD
865retry:
866 print_stream_state(stream_iter);
14f28187
FD
867 live_status = lttng_live_iterator_handle_new_streams_and_metadata(
868 lttng_live_msg_iter);
869 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
870 goto end;
871 }
14f28187
FD
872 live_status = lttng_live_iterator_next_handle_one_no_data_stream(
873 lttng_live_msg_iter, stream_iter);
874 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
875 goto end;
876 }
14f28187
FD
877 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(
878 lttng_live_msg_iter, stream_iter, curr_msg);
879 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
880 BT_ASSERT(*curr_msg == NULL);
7cdc2bab
MD
881 goto end;
882 }
14f28187 883 if (*curr_msg) {
7cdc2bab
MD
884 goto end;
885 }
14f28187
FD
886 live_status = lttng_live_iterator_next_handle_one_active_data_stream(
887 lttng_live_msg_iter, stream_iter, curr_msg);
888 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
889 BT_ASSERT(*curr_msg == NULL);
7cdc2bab
MD
890 }
891
892end:
14f28187 893 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
7cdc2bab 894 goto retry;
7cdc2bab 895 }
14f28187
FD
896
897 return live_status;
7cdc2bab
MD
898}
899
900static
14f28187
FD
901enum lttng_live_iterator_status next_stream_iterator_for_trace(
902 struct lttng_live_msg_iter *lttng_live_msg_iter,
903 struct lttng_live_trace *live_trace,
904 struct lttng_live_stream_iterator **candidate_stream_iter)
7cdc2bab 905{
14f28187
FD
906 struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
907 enum lttng_live_iterator_status stream_iter_status;;
908 int64_t curr_candidate_msg_ts = INT64_MAX;
909 uint64_t stream_iter_idx;
7cdc2bab 910
14f28187
FD
911 BT_ASSERT(live_trace);
912 BT_ASSERT(live_trace->stream_iterators);
913 /*
914 * Update the current message of every stream iterators of this trace.
915 * The current msg of every stream must have a timestamp equal or
916 * larger than the last message returned by this iterator. We must
917 * ensure monotonicity.
918 */
919 stream_iter_idx = 0;
920 while (stream_iter_idx < live_trace->stream_iterators->len) {
921 bool stream_iter_is_ended = false;
922 struct lttng_live_stream_iterator *stream_iter =
923 g_ptr_array_index(live_trace->stream_iterators,
924 stream_iter_idx);
925
926 /*
927 * Find if there is are now current message for this stream
928 * iterator get it.
929 */
930 while (!stream_iter->current_msg) {
931 bt_message *msg = NULL;
932 int64_t curr_msg_ts_ns = INT64_MAX;
933 stream_iter_status = lttng_live_iterator_next_on_stream(
934 lttng_live_msg_iter, stream_iter, &msg);
935
936 BT_LOGD("live stream iterator returned status :%s",
937 print_live_iterator_status(stream_iter_status));
938 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
939 stream_iter_is_ended = true;
940 break;
941 }
942
943 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
944 goto end;
945 }
946
947 BT_ASSERT(msg);
948
949 /*
950 * Get the timestamp in nanoseconds from origin of this
951 * messsage.
952 */
953 live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter,
954 msg, lttng_live_msg_iter->last_msg_ts_ns,
955 &curr_msg_ts_ns);
956
957 /*
958 * Check if the message of the current live stream
959 * iterator occured at the exact same time or after the
960 * last message returned by this component's message
961 * iterator. If not, we return an error.
962 */
963 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
964 stream_iter->current_msg = msg;
965 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
966 } else {
967 /*
968 * We received a message in the past. To ensure
969 * monotonicity, we can't send it forward.
970 */
971 BT_LOGE("Message's timestamp is less than "
972 "lttng-live's message iterator's last "
973 "returned timestamp: "
974 "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", "
975 "last-msg-ts=%" PRId64,
976 lttng_live_msg_iter, curr_msg_ts_ns,
977 lttng_live_msg_iter->last_msg_ts_ns);
978 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
979 goto end;
980 }
981 }
982
983 if (!stream_iter_is_ended &&
984 stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
985 /*
986 * Update the current best candidate message for the
987 * stream iterator of thise live trace to be forwarded
988 * downstream.
989 */
990 curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
991 curr_candidate_stream_iter = stream_iter;
992 }
993
994 if (stream_iter_is_ended) {
995 /*
996 * The live stream iterator is ENDed. We remove that
997 * iterator from the list and we restart the iteration
998 * at the beginning of the live stream iterator array
999 * to because the removal will shuffle the array.
1000 */
1001 g_ptr_array_remove_index_fast(live_trace->stream_iterators,
1002 stream_iter_idx);
1003 stream_iter_idx = 0;
1004 } else {
1005 stream_iter_idx++;
1006 }
1007 }
1008
1009 if (curr_candidate_stream_iter) {
1010 *candidate_stream_iter = curr_candidate_stream_iter;
1011 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1012 } else {
1013 /*
1014 * The only case where we don't have a candidate for this trace
1015 * is if we reached the end of all the iterators.
1016 */
1017 BT_ASSERT(live_trace->stream_iterators->len == 0);
1018 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1019 }
1020
1021end:
1022 return stream_iter_status;
1023}
1024
1025static
1026enum lttng_live_iterator_status next_stream_iterator_for_session(
1027 struct lttng_live_msg_iter *lttng_live_msg_iter,
1028 struct lttng_live_session *session,
1029 struct lttng_live_stream_iterator **candidate_session_stream_iter)
1030{
1031 enum lttng_live_iterator_status stream_iter_status;
1032 uint64_t trace_idx = 0;
1033 int64_t curr_candidate_msg_ts = INT64_MAX;
1034 struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
1035
1036 /*
1037 * Make sure we are attached to the session and look for new streams
1038 * and metadata.
1039 */
1040 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1041 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1042 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1043 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
7cdc2bab
MD
1044 goto end;
1045 }
14f28187
FD
1046
1047 BT_ASSERT(session->traces);
1048
1049 /*
1050 * Use while loops here rather then for loops so we can restart the
1051 * iteration if an element is removed from the array during the
1052 * looping.
1053 */
1054 while (trace_idx < session->traces->len) {
1055 bool trace_is_ended = false;
1056 struct lttng_live_stream_iterator *stream_iter;
1057 struct lttng_live_trace *trace =
1058 g_ptr_array_index(session->traces, trace_idx);
1059
1060 stream_iter_status = next_stream_iterator_for_trace(
1061 lttng_live_msg_iter, trace, &stream_iter);
1062 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1063 /*
1064 * All the live stream iterators for this trace are
1065 * ENDed. Remove the trace from this session.
1066 */
1067 trace_is_ended = true;
1068 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1069 goto end;
1070 }
1071
1072 if (!trace_is_ended) {
1073 BT_ASSERT(stream_iter);
1074
1075 if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
1076 curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1077 curr_candidate_stream_iter = stream_iter;
1078 }
1079 trace_idx++;
1080 } else {
1081 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1082 trace_idx = 0;
1083 }
1084 }
1085 if (curr_candidate_stream_iter) {
1086 *candidate_session_stream_iter = curr_candidate_stream_iter;
1087 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab 1088 } else {
14f28187
FD
1089 /*
1090 * The only cases where we don't have a candidate for this
1091 * trace is:
1092 * 1. if we reached the end of all the iterators of all the
1093 * traces of this session,
1094 * 2. if we never had live stream iterator in the first place.
1095 *
1096 * In either cases, we return END.
1097 */
1098 BT_ASSERT(session->traces->len == 0);
1099 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab
MD
1100 }
1101end:
14f28187
FD
1102 return stream_iter_status;
1103}
1104
1105static inline
1106void put_messages(bt_message_array_const msgs, uint64_t count)
1107{
1108 uint64_t i;
1109
1110 for (i = 0; i < count; i++) {
1111 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
7cdc2bab 1112 }
7cdc2bab
MD
1113}
1114
d3eb6e8f 1115BT_HIDDEN
14f28187
FD
1116bt_self_message_iterator_status lttng_live_msg_iter_next(
1117 bt_self_message_iterator *self_msg_it,
1118 bt_message_array_const msgs, uint64_t capacity,
1119 uint64_t *count)
d3eb6e8f 1120{
14f28187
FD
1121 bt_self_message_iterator_status status;
1122 struct lttng_live_msg_iter *lttng_live_msg_iter =
1123 bt_self_message_iterator_get_data(self_msg_it);
1124 struct lttng_live_component *lttng_live =
1125 lttng_live_msg_iter->lttng_live_comp;
1126 enum lttng_live_iterator_status stream_iter_status;
1127 uint64_t session_idx;
1128
1129 *count = 0;
1130
1131 BT_ASSERT(lttng_live_msg_iter);
1132
1133 /*
1134 * Clear all the invalid message reference that might be left over in
1135 * the output array.
1136 */
1137 memset(msgs, 0, capacity * sizeof(*msgs));
1138
1139 /*
1140 * If no session are exposed on the relay found at the url provided by
1141 * the user, session count will be 0. In this case, we return status
1142 * end to return gracefully.
1143 */
1144 if (lttng_live_msg_iter->sessions->len == 0) {
1145 if (lttng_live->params.sess_not_found_act !=
1146 SESSION_NOT_FOUND_ACTION_CONTINUE) {
1147 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1148 goto no_session;
1149 } else {
1150 /*
1151 * The are no more active session for this session
1152 * name. Retry to create a viewer session for the
1153 * requested session name.
1154 */
1155 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1156 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1157 goto no_session;
1158 }
1159 }
1160 }
1161
1162 if (lttng_live_msg_iter->active_stream_iter == 0) {
1163 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1164 }
1165
1166 /*
1167 * Here the muxing of message is done.
1168 *
1169 * We need to iterate over all the streams of all the traces of all the
1170 * viewer sessions in order to get the message with the smallest
1171 * timestamp. In this case, a session is a viewer session and there is
1172 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1173 * kernel). Each viewer session can have multiple traces, for example,
1174 * 64bit UST viewer sessions could have multiple per-pid traces.
1175 *
1176 * We iterate over the streams of each traces to update and see what is
1177 * their next message's timestamp. From those timestamps, we select the
1178 * message with the smallest timestamp as the best candidate message
1179 * for that trace and do the same thing across all the sessions.
1180 *
1181 * We then compare the timestamp of best candidate message of all the
1182 * sessions to pick the message with the smallest timestamp and we
1183 * return it.
1184 */
1185 while (*count < capacity) {
1186 struct lttng_live_stream_iterator *next_stream_iter = NULL,
1187 *candidate_stream_iter = NULL;
1188 int64_t next_msg_ts_ns = INT64_MAX;
1189
1190 BT_ASSERT(lttng_live_msg_iter->sessions);
1191 session_idx = 0;
1192 /*
1193 * Use a while loop instead of a for loop so we can restart the
1194 * iteration if we remove an element. We can safely call
1195 * next_stream_iterator_for_session() multiple times on the
1196 * same session as we only fetch a new message if there is no
1197 * current next message for each live stream iterator.
1198 * If all live stream iterator of that session already have a
1199 * current next message, the function will simply exit return
1200 * the same candidate live stream iterator every time.
1201 */
1202 while (session_idx < lttng_live_msg_iter->sessions->len) {
1203 struct lttng_live_session *session =
1204 g_ptr_array_index(lttng_live_msg_iter->sessions,
1205 session_idx);
1206
1207 /* Find the best candidate message to send downstream. */
1208 stream_iter_status = next_stream_iterator_for_session(
1209 lttng_live_msg_iter, session,
1210 &candidate_stream_iter);
1211
1212 /* If we receive an END status, it means that either:
1213 * - Those traces never had active streams (UST with no
1214 * data produced yet),
1215 * - All live stream iterators have ENDed.*/
1216 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1217 if (session->closed && session->traces->len == 0) {
1218 /*
1219 * Remove the session from the list and restart the
1220 * iteration at the beginning of the array since the
1221 * removal shuffle the elements of the array.
1222 */
1223 g_ptr_array_remove_index_fast(
1224 lttng_live_msg_iter->sessions,
1225 session_idx);
1226 session_idx = 0;
1227 } else {
1228 session_idx++;
1229 }
1230 continue;
1231 }
1232
1233 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1234 goto end;
1235 }
1236
1237 if (candidate_stream_iter->current_msg_ts_ns <= next_msg_ts_ns) {
1238 next_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1239 next_stream_iter = candidate_stream_iter;
1240 }
1241
1242 session_idx++;
1243 }
1244
1245 if (!next_stream_iter) {
1246 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1247 goto end;
1248 }
1249
1250 BT_ASSERT(next_stream_iter->current_msg);
1251 /* Ensure monotonicity. */
1252 BT_ASSERT(lttng_live_msg_iter->last_msg_ts_ns <=
1253 next_stream_iter->current_msg_ts_ns);
1254
1255 /*
1256 * Insert the next message to the message batch. This will set
1257 * stream iterator current messsage to NULL so that next time
1258 * we fetch the next message of that stream iterator
1259 */
1260 BT_MESSAGE_MOVE_REF(msgs[*count], next_stream_iter->current_msg);
1261 (*count)++;
1262
1263 /* Update the last timestamp in nanoseconds sent downstream. */
1264 lttng_live_msg_iter->last_msg_ts_ns = next_msg_ts_ns;
1265 next_stream_iter->current_msg_ts_ns = INT64_MAX;
1266
1267 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1268 }
1269end:
1270 switch (stream_iter_status) {
1271 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1272 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1273 if (*count > 0) {
1274 /*
1275 * We received a again status but we have some messages
1276 * to send downstream. We send them and return OK for
1277 * now. On the next call we return again if there are
1278 * still no new message to send.
1279 */
1280 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1281 } else {
1282 status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN;
1283 }
7cdc2bab 1284 break;
14f28187
FD
1285 case LTTNG_LIVE_ITERATOR_STATUS_END:
1286 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
7cdc2bab 1287 break;
14f28187
FD
1288 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1289 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1290 break;
1291 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1292 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1293 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1294 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1295 /* Put all existing messages on error. */
1296 put_messages(msgs, *count);
7cdc2bab 1297 break;
14f28187
FD
1298 default:
1299 abort();
7cdc2bab 1300 }
14f28187
FD
1301
1302no_session:
1303 return status;
7cdc2bab 1304}
41a2b7ae 1305
7cdc2bab 1306BT_HIDDEN
14f28187
FD
1307bt_self_message_iterator_status lttng_live_msg_iter_init(
1308 bt_self_message_iterator *self_msg_it,
1309 bt_self_component_source *self_comp_src,
1310 bt_self_component_port_output *self_port)
7cdc2bab 1311{
14f28187
FD
1312 bt_self_message_iterator_status ret =
1313 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1314 bt_self_component *self_comp =
1315 bt_self_component_source_as_self_component(self_comp_src);
1316 struct lttng_live_component *lttng_live;
1317 struct lttng_live_msg_iter *lttng_live_msg_iter;
1318
1319 BT_ASSERT(self_msg_it);
1320
1321 lttng_live = bt_self_component_get_data(self_comp);
1322
1323 /* There can be only one downstream iterator at the same time. */
1324 BT_ASSERT(!lttng_live->has_msg_iter);
1325 lttng_live->has_msg_iter = true;
1326
1327 lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
1328 if (!lttng_live_msg_iter) {
1329 ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1330 goto end;
7cdc2bab 1331 }
14f28187
FD
1332
1333 lttng_live_msg_iter->lttng_live_comp = lttng_live;
1334 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1335
1336 lttng_live_msg_iter->active_stream_iter = 0;
1337 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1338 lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func(
1339 (GDestroyNotify) lttng_live_destroy_session);
1340 BT_ASSERT(lttng_live_msg_iter->sessions);
1341
1342 lttng_live_msg_iter->viewer_connection =
1343 live_viewer_connection_create(lttng_live->params.url->str, false,
1344 lttng_live_msg_iter);
1345 if (!lttng_live_msg_iter->viewer_connection) {
1346 goto error;
1347 }
1348
1349 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1350 goto error;
1351 }
1352 if (lttng_live_msg_iter->sessions->len == 0) {
1353 switch (lttng_live->params.sess_not_found_act) {
1354 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1355 BT_LOGI("Unable to connect to the requested live viewer "
1356 "session. Keep trying to connect because of "
1357 "%s=\"%s\" component parameter: url=\"%s\"",
1358 SESS_NOT_FOUND_ACTION_PARAM,
1359 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1360 lttng_live->params.url->str);
1361 break;
1362 case SESSION_NOT_FOUND_ACTION_FAIL:
1363 BT_LOGE("Unable to connect to the requested live viewer "
1364 "session. Fail the message iterator"
1365 "initialization because of %s=\"%s\" "
1366 "component parameter: url =\"%s\"",
1367 SESS_NOT_FOUND_ACTION_PARAM,
1368 SESS_NOT_FOUND_ACTION_FAIL_STR,
1369 lttng_live->params.url->str);
7cdc2bab 1370 goto error;
14f28187
FD
1371 case SESSION_NOT_FOUND_ACTION_END:
1372 BT_LOGI("Unable to connect to the requested live viewer "
1373 "session. End gracefully at the first _next() "
1374 "call because of %s=\"%s\" component parameter: "
1375 "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
1376 SESS_NOT_FOUND_ACTION_END_STR,
1377 lttng_live->params.url->str);
1378 break;
7cdc2bab 1379 }
7cdc2bab
MD
1380 }
1381
14f28187
FD
1382 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1383
1384 goto end;
1385error:
1386 ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1387 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab 1388end:
41a2b7ae 1389 return ret;
7cdc2bab
MD
1390}
1391
1392static
14f28187
FD
1393bt_query_status lttng_live_query_list_sessions(const bt_value *params,
1394 const bt_value **result)
7cdc2bab 1395{
14f28187
FD
1396 bt_query_status status = BT_QUERY_STATUS_OK;
1397 const bt_value *url_value = NULL;
7cdc2bab 1398 const char *url;
14f28187 1399 struct live_viewer_connection *viewer_connection = NULL;
7cdc2bab 1400
14f28187
FD
1401 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1402 if (!url_value) {
1403 BT_LOGW("Mandatory `%s` parameter missing", URL_PARAM);
1404 status = BT_QUERY_STATUS_INVALID_PARAMS;
7cdc2bab
MD
1405 goto error;
1406 }
1407
14f28187
FD
1408 if (!bt_value_is_string(url_value)) {
1409 BT_LOGW("`%s` parameter is required to be a string value",
1410 URL_PARAM);
1411 status = BT_QUERY_STATUS_INVALID_PARAMS;
7cdc2bab
MD
1412 goto error;
1413 }
1414
14f28187
FD
1415 url = bt_value_string_get(url_value);
1416
1417 viewer_connection = live_viewer_connection_create(url, true, NULL);
7cdc2bab 1418 if (!viewer_connection) {
7cdc2bab
MD
1419 goto error;
1420 }
1421
14f28187
FD
1422 status = live_viewer_connection_list_sessions(viewer_connection,
1423 result);
1424 if (status != BT_QUERY_STATUS_OK) {
c7eee084
PP
1425 goto error;
1426 }
1427
7cdc2bab 1428 goto end;
c7eee084 1429
7cdc2bab 1430error:
14f28187 1431 BT_VALUE_PUT_REF_AND_RESET(*result);
c7eee084 1432
14f28187
FD
1433 if (status >= 0) {
1434 status = BT_QUERY_STATUS_ERROR;
c7eee084
PP
1435 }
1436
7cdc2bab
MD
1437end:
1438 if (viewer_connection) {
14f28187 1439 live_viewer_connection_destroy(viewer_connection);
7cdc2bab 1440 }
14f28187 1441 return status;
7cdc2bab
MD
1442}
1443
1444BT_HIDDEN
14f28187 1445bt_query_status lttng_live_query(bt_self_component_class_source *comp_class,
b19ff26f 1446 const bt_query_executor *query_exec,
14f28187
FD
1447 const char *object, const bt_value *params,
1448 const bt_value **result)
7cdc2bab 1449{
14f28187 1450 bt_query_status status = BT_QUERY_STATUS_OK;
c7eee084 1451
7cdc2bab 1452 if (strcmp(object, "sessions") == 0) {
14f28187
FD
1453 status = lttng_live_query_list_sessions(params, result);
1454 } else {
1455 BT_LOGW("Unknown query object `%s`", object);
1456 status = BT_QUERY_STATUS_INVALID_OBJECT;
1457 goto end;
7cdc2bab 1458 }
14f28187
FD
1459
1460end:
1461 return status;
7cdc2bab
MD
1462}
1463
1464static
1465void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1466{
19bdff20
FD
1467 if (!lttng_live) {
1468 return;
1469 }
14f28187
FD
1470 if (lttng_live->params.url) {
1471 g_string_free(lttng_live->params.url, TRUE);
7cdc2bab
MD
1472 }
1473 g_free(lttng_live);
1474}
1475
1476BT_HIDDEN
14f28187 1477void lttng_live_component_finalize(bt_self_component_source *component)
7cdc2bab 1478{
14f28187
FD
1479 void *data = bt_self_component_get_data(
1480 bt_self_component_source_as_self_component(component));
7cdc2bab
MD
1481
1482 if (!data) {
1483 return;
1484 }
1485 lttng_live_component_destroy_data(data);
1486}
1487
1488static
14f28187
FD
1489enum session_not_found_action parse_session_not_found_action_param(
1490 const bt_value *no_session_param)
1491{
1492 enum session_not_found_action action;
1493 const char *no_session_act_str;
1494 no_session_act_str = bt_value_string_get(no_session_param);
1495 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1496 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1497 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1498 action = SESSION_NOT_FOUND_ACTION_FAIL;
1499 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0) {
1500 action = SESSION_NOT_FOUND_ACTION_END;
1501 } else {
1502 action = -1;
1503 }
1504
1505 return action;
1506}
1507
1508struct lttng_live_component *lttng_live_component_create(const bt_value *params)
7cdc2bab
MD
1509{
1510 struct lttng_live_component *lttng_live;
14f28187 1511 const bt_value *value = NULL;
7cdc2bab 1512 const char *url;
7cdc2bab
MD
1513
1514 lttng_live = g_new0(struct lttng_live_component, 1);
1515 if (!lttng_live) {
1516 goto end;
1517 }
7cdc2bab 1518 lttng_live->max_query_size = MAX_QUERY_SIZE;
14f28187
FD
1519 lttng_live->has_msg_iter = false;
1520
1521 value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1522 if (!value || !bt_value_is_string(value)) {
1523 BT_LOGW("Mandatory `%s` parameter missing or not a string",
1524 URL_PARAM);
7cdc2bab
MD
1525 goto error;
1526 }
601b0d3c 1527 url = bt_value_string_get(value);
14f28187
FD
1528 lttng_live->params.url = g_string_new(url);
1529 if (!lttng_live->params.url) {
7cdc2bab
MD
1530 goto error;
1531 }
14f28187
FD
1532
1533 value = bt_value_map_borrow_entry_value_const(params,
1534 SESS_NOT_FOUND_ACTION_PARAM);
1535
1536 if (value && bt_value_is_string(value)) {
1537 lttng_live->params.sess_not_found_act =
1538 parse_session_not_found_action_param(value);
1539 if (lttng_live->params.sess_not_found_act == -1) {
1540 BT_LOGE("Unexpected value for `%s` parameter: "
1541 "value=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
1542 bt_value_string_get(value));
1543 goto error;
1544 }
1545 } else {
1546 BT_LOGW("Optional `%s` parameter is missing or "
1547 "not a string value. Defaulting to %s=\"%s\".",
1548 SESS_NOT_FOUND_ACTION_PARAM,
1549 SESS_NOT_FOUND_ACTION_PARAM,
1550 SESS_NOT_FOUND_ACTION_CONTINUE_STR);
1551 lttng_live->params.sess_not_found_act =
1552 SESSION_NOT_FOUND_ACTION_CONTINUE;
7cdc2bab 1553 }
4c66436f 1554
7cdc2bab
MD
1555 goto end;
1556
1557error:
1558 lttng_live_component_destroy_data(lttng_live);
1559 lttng_live = NULL;
1560end:
1561 return lttng_live;
d3e4dcd8
PP
1562}
1563
f3bc2010 1564BT_HIDDEN
14f28187
FD
1565bt_self_component_status lttng_live_component_init(
1566 bt_self_component_source *self_comp,
1567 const bt_value *params, UNUSED_VAR void *init_method_data)
f3bc2010 1568{
7cdc2bab 1569 struct lttng_live_component *lttng_live;
14f28187 1570 bt_self_component_status ret = BT_SELF_COMPONENT_STATUS_OK;
7cdc2bab 1571
14f28187 1572 lttng_live = lttng_live_component_create(params);
7cdc2bab 1573 if (!lttng_live) {
14f28187 1574 ret = BT_SELF_COMPONENT_STATUS_NOMEM;
19bdff20 1575 goto error;
7cdc2bab 1576 }
14f28187 1577 lttng_live->self_comp = self_comp;
7cdc2bab 1578
42521b69 1579 if (lttng_live_graph_is_canceled(lttng_live)) {
19bdff20
FD
1580 ret = BT_SELF_COMPONENT_STATUS_END;
1581 goto error;
4bf0e537 1582 }
14f28187 1583
d94d92ac 1584 ret = bt_self_component_source_add_output_port(
14f28187
FD
1585 lttng_live->self_comp, "out",
1586 NULL, NULL);
1587 if (ret != BT_SELF_COMPONENT_STATUS_OK) {
19bdff20 1588 goto error;
147337a3 1589 }
7cdc2bab 1590
14f28187
FD
1591 bt_self_component_set_data(
1592 bt_self_component_source_as_self_component(self_comp),
1593 lttng_live);
19bdff20 1594 goto end;
7cdc2bab 1595
19bdff20
FD
1596error:
1597 lttng_live_component_destroy_data(lttng_live);
1598 lttng_live = NULL;
7cdc2bab
MD
1599end:
1600 return ret;
d85ef162 1601}
This page took 0.120833 seconds and 4 git commands to generate.