sink.ctf.fs: use BT_COMP_LOG*() instead of BT_LOG*()
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
14f28187 6 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
f3bc2010 7 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 8 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
9 *
10 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
11 *
12 * Permission is hereby granted, free of charge, to any person obtaining a copy
13 * of this software and associated documentation files (the "Software"), to deal
14 * in the Software without restriction, including without limitation the rights
15 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 * copies of the Software, and to permit persons to whom the Software is
17 * furnished to do so, subject to the following conditions:
18 *
19 * The above copyright notice and this permission notice shall be included in
20 * all copies or substantial portions of the Software.
21 *
22 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 * SOFTWARE.
29 */
30
c01594de 31#define BT_LOG_OUTPUT_LEVEL log_level
350ad6c1 32#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE"
c01594de 33#include "logging/log.h"
020bc26f 34
14f28187
FD
35#include <glib.h>
36#include <inttypes.h>
37#include <unistd.h>
38
578e048b 39#include "common/assert.h"
3fadfbc0 40#include <babeltrace2/babeltrace.h>
578e048b 41#include "compat/compiler.h"
3fadfbc0 42#include <babeltrace2/types.h>
f3bc2010 43
7cdc2bab
MD
44#include "data-stream.h"
45#include "metadata.h"
14f28187 46#include "lttng-live.h"
7cdc2bab 47
14f28187
FD
48#define MAX_QUERY_SIZE (256*1024)
49#define URL_PARAM "url"
50#define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
51#define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
52#define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
53#define SESS_NOT_FOUND_ACTION_END_STR "end"
7cdc2bab 54
087bc060 55#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab 56
14f28187
FD
57static
58const char *print_live_iterator_status(enum lttng_live_iterator_status status)
59{
60 switch (status) {
61 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
62 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
63 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
64 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
65 case LTTNG_LIVE_ITERATOR_STATUS_END:
66 return "LTTNG_LIVE_ITERATOR_STATUS_END";
67 case LTTNG_LIVE_ITERATOR_STATUS_OK:
68 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
69 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
70 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
71 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
72 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
73 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
74 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
75 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
76 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
77 default:
78 abort();
79 }
80}
81
82static
83const char *print_state(struct lttng_live_stream_iterator *s)
7cdc2bab
MD
84{
85 switch (s->state) {
86 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
87 return "ACTIVE_NO_DATA";
88 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
89 return "QUIESCENT_NO_DATA";
90 case LTTNG_LIVE_STREAM_QUIESCENT:
91 return "QUIESCENT";
92 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
93 return "ACTIVE_DATA";
94 case LTTNG_LIVE_STREAM_EOF:
95 return "EOF";
96 default:
97 return "ERROR";
98 }
99}
7cdc2bab 100
14f28187
FD
/*
 * Logs, at debug level, the state name and the last/current inactivity
 * timestamps of the stream iterator `live_stream_iter` (a pointer).
 *
 * The expansion deliberately has NO trailing semicolon: the caller's
 * own `;` completes the statement, which keeps
 * `if (c) print_stream_state(x); else ...` valid. The argument is
 * parenthesized so that expressions such as `&iter` expand correctly.
 *
 * The functions using this macro in this file declare a local
 * `log_level` before logging (see BT_LOG_OUTPUT_LEVEL above).
 */
#define print_stream_state(live_stream_iter) \
	do { \
		BT_LOGD("stream state %s last_inact_ts %" PRId64 \
			", curr_inact_ts %" PRId64, \
			print_state(live_stream_iter), \
			(live_stream_iter)->last_inactivity_ts, \
			(live_stream_iter)->current_inactivity_ts); \
	} while (0)
6f79a7cf
MD
109
110BT_HIDDEN
42521b69 111bool lttng_live_graph_is_canceled(struct lttng_live_component *lttng_live)
6f79a7cf 112{
14f28187
FD
113 const bt_component *component;
114 bool ret;
6f79a7cf
MD
115
116 if (!lttng_live) {
14f28187
FD
117 ret = false;
118 goto end;
7cdc2bab 119 }
7cdc2bab 120
14f28187
FD
121 component = bt_component_source_as_component_const(
122 bt_self_component_source_as_component_source(
123 lttng_live->self_comp));
6f79a7cf 124
14f28187 125 ret = bt_component_graph_is_canceled(component);
4bf0e537 126
14f28187
FD
127end:
128 return ret;
7cdc2bab
MD
129}
130
131static
132struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
133 uint64_t trace_id)
d3e4dcd8 134{
14f28187
FD
135 uint64_t trace_idx;
136 struct lttng_live_trace *ret_trace = NULL;
7cdc2bab 137
14f28187
FD
138 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
139 struct lttng_live_trace *trace =
140 g_ptr_array_index(session->traces, trace_idx);
7cdc2bab 141 if (trace->id == trace_id) {
14f28187
FD
142 ret_trace = trace;
143 goto end;
7cdc2bab
MD
144 }
145 }
14f28187
FD
146
147end:
148 return ret_trace;
d3eb6e8f
PP
149}
150
7cdc2bab 151static
14f28187 152void lttng_live_destroy_trace(struct lttng_live_trace *trace)
7cdc2bab 153{
c01594de
PP
154 bt_logging_level log_level = trace->log_level;
155
14f28187 156 BT_LOGD("Destroy lttng_live_trace");
7cdc2bab 157
14f28187
FD
158 BT_ASSERT(trace->stream_iterators);
159 g_ptr_array_free(trace->stream_iterators, TRUE);
5bd230f4 160
14f28187
FD
161 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
162 BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
5bd230f4 163
7cdc2bab 164 lttng_live_metadata_fini(trace);
7cdc2bab
MD
165 g_free(trace);
166}
167
168static
169struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
170 uint64_t trace_id)
171{
172 struct lttng_live_trace *trace = NULL;
c01594de 173 bt_logging_level log_level = session->log_level;
7cdc2bab
MD
174
175 trace = g_new0(struct lttng_live_trace, 1);
176 if (!trace) {
177 goto error;
178 }
c01594de 179 trace->log_level = session->log_level;
7cdc2bab
MD
180 trace->session = session;
181 trace->id = trace_id;
14f28187
FD
182 trace->trace_class = NULL;
183 trace->trace = NULL;
184 trace->stream_iterators = g_ptr_array_new_with_free_func(
185 (GDestroyNotify) lttng_live_stream_iterator_destroy);
186 BT_ASSERT(trace->stream_iterators);
7cdc2bab 187 trace->new_metadata_needed = true;
14f28187
FD
188 g_ptr_array_add(session->traces, trace);
189
087bc060 190 BT_LOGI("Create trace");
7cdc2bab
MD
191 goto end;
192error:
193 g_free(trace);
194 trace = NULL;
195end:
196 return trace;
197}
198
199BT_HIDDEN
14f28187
FD
200struct lttng_live_trace *lttng_live_borrow_trace(
201 struct lttng_live_session *session, uint64_t trace_id)
7cdc2bab
MD
202{
203 struct lttng_live_trace *trace;
204
205 trace = lttng_live_find_trace(session, trace_id);
206 if (trace) {
14f28187 207 goto end;
7cdc2bab 208 }
7cdc2bab 209
14f28187
FD
210 /* The session is the owner of the newly created trace. */
211 trace = lttng_live_create_trace(session, trace_id);
7cdc2bab 212
14f28187
FD
213end:
214 return trace;
7cdc2bab
MD
215}
216
217BT_HIDDEN
14f28187 218int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
06994c71
MD
219 uint64_t session_id, const char *hostname,
220 const char *session_name)
7cdc2bab
MD
221{
222 int ret = 0;
14f28187 223 struct lttng_live_session *session;
c01594de 224 bt_logging_level log_level = lttng_live_msg_iter->log_level;
7cdc2bab 225
14f28187
FD
226 session = g_new0(struct lttng_live_session, 1);
227 if (!session) {
7cdc2bab
MD
228 goto error;
229 }
230
c01594de 231 session->log_level = lttng_live_msg_iter->log_level;
14f28187
FD
232 session->id = session_id;
233 session->traces = g_ptr_array_new_with_free_func(
234 (GDestroyNotify) lttng_live_destroy_trace);
235 BT_ASSERT(session->traces);
236 session->lttng_live_msg_iter = lttng_live_msg_iter;
237 session->new_streams_needed = true;
238 session->hostname = g_string_new(hostname);
239 BT_ASSERT(session->hostname);
240
241 session->session_name = g_string_new(session_name);
242 BT_ASSERT(session->session_name);
7cdc2bab 243
06994c71 244 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
14f28187
FD
245 session->id, hostname, session_name);
246 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
7cdc2bab
MD
247 goto end;
248error:
087bc060 249 BT_LOGE("Error adding session");
14f28187 250 g_free(session);
7cdc2bab
MD
251 ret = -1;
252end:
253 return ret;
254}
255
256static
257void lttng_live_destroy_session(struct lttng_live_session *session)
258{
14f28187 259 struct lttng_live_component *live_comp;
c01594de 260 bt_logging_level log_level;
14f28187
FD
261
262 if (!session) {
263 goto end;
264 }
7cdc2bab 265
c01594de 266 log_level = session->log_level;
14f28187 267 BT_LOGD("Destroy lttng live session");
7cdc2bab
MD
268 if (session->id != -1ULL) {
269 if (lttng_live_detach_session(session)) {
14f28187
FD
270 live_comp = session->lttng_live_msg_iter->lttng_live_comp;
271 if (session->lttng_live_msg_iter &&
42521b69 272 !lttng_live_graph_is_canceled(live_comp)) {
4c66436f 273 /* Old relayd cannot detach sessions. */
14f28187 274 BT_LOGD("Unable to detach lttng live session %" PRIu64,
4c66436f
MD
275 session->id);
276 }
7cdc2bab
MD
277 }
278 session->id = -1ULL;
279 }
14f28187
FD
280
281 if (session->traces) {
282 g_ptr_array_free(session->traces, TRUE);
7cdc2bab 283 }
14f28187 284
06994c71
MD
285 if (session->hostname) {
286 g_string_free(session->hostname, TRUE);
287 }
288 if (session->session_name) {
289 g_string_free(session->session_name, TRUE);
290 }
7cdc2bab 291 g_free(session);
14f28187
FD
292
293end:
294 return;
7cdc2bab
MD
295}
296
14f28187
FD
297static
298void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 299{
14f28187
FD
300 if (!lttng_live_msg_iter) {
301 goto end;
7cdc2bab 302 }
7cdc2bab 303
14f28187
FD
304 if (lttng_live_msg_iter->sessions) {
305 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
7cdc2bab 306 }
14f28187
FD
307
308 BT_OBJECT_PUT_REF_AND_RESET(lttng_live_msg_iter->viewer_connection);
309 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
310 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
311
312 /* All stream iterators must be destroyed at this point. */
313 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
314 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
315
316 g_free(lttng_live_msg_iter);
317
318end:
319 return;
320}
321
322BT_HIDDEN
323void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
324{
325 struct lttng_live_msg_iter *lttng_live_msg_iter;
326
327 BT_ASSERT(self_msg_iter);
328
329 lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_iter);
330 BT_ASSERT(lttng_live_msg_iter);
331 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab
MD
332}
333
334static
14f28187 335enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
7cdc2bab
MD
336 struct lttng_live_stream_iterator *lttng_live_stream)
337{
c01594de
PP
338 bt_logging_level log_level = lttng_live_stream->log_level;
339
7cdc2bab
MD
340 switch (lttng_live_stream->state) {
341 case LTTNG_LIVE_STREAM_QUIESCENT:
342 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
343 break;
344 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
345 /* Invalid state. */
087bc060
MD
346 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
347 abort();
7cdc2bab
MD
348 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
349 /* Invalid state. */
087bc060
MD
350 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
351 abort();
7cdc2bab
MD
352 case LTTNG_LIVE_STREAM_EOF:
353 break;
354 }
14f28187 355 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
356}
357
358/*
359 * For active no data stream, fetch next data. It can be either:
360 * - quiescent: need to put it in the prio heap at quiescent end
361 * timestamp,
362 * - have data: need to wire up first event into the prio heap,
363 * - have no data on this stream at this point: need to retry (AGAIN) or
364 * return EOF.
365 */
366static
14f28187
FD
367enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
368 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab
MD
369 struct lttng_live_stream_iterator *lttng_live_stream)
370{
c01594de 371 bt_logging_level log_level = lttng_live_msg_iter->log_level;
14f28187
FD
372 enum lttng_live_iterator_status ret =
373 LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
374 struct packet_index index;
375 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
376
377 if (lttng_live_stream->trace->new_metadata_needed) {
14f28187 378 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
379 goto end;
380 }
381 if (lttng_live_stream->trace->session->new_streams_needed) {
14f28187 382 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
383 goto end;
384 }
14f28187
FD
385 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
386 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
7cdc2bab
MD
387 goto end;
388 }
14f28187
FD
389 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream,
390 &index);
391 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
392 goto end;
393 }
f6ccaed9 394 BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
7cdc2bab 395 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
14f28187
FD
396 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts,
397 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
398
399 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA &&
400 last_inact_ts == curr_inact_ts) {
401 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
7cdc2bab
MD
402 print_stream_state(lttng_live_stream);
403 } else {
14f28187 404 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
405 }
406 goto end;
407 }
408 lttng_live_stream->base_offset = index.offset;
409 lttng_live_stream->offset = index.offset;
410 lttng_live_stream->len = index.packet_size / CHAR_BIT;
411end:
14f28187
FD
412 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
413 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
7cdc2bab
MD
414 }
415 return ret;
416}
417
418/*
14f28187
FD
419 * Creation of the message requires the ctf trace class to be created
420 * beforehand, but the live protocol gives us all streams (including metadata)
421 * at once. So we split it in three steps: getting streams, getting metadata
422 * (which creates the ctf trace class), and then creating the per-stream
423 * messages.
7cdc2bab
MD
424 */
425static
14f28187
FD
426enum lttng_live_iterator_status lttng_live_get_session(
427 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab
MD
428 struct lttng_live_session *session)
429{
14f28187
FD
430 enum lttng_live_iterator_status status;
431 uint64_t trace_idx;
432 int ret = 0;
7cdc2bab 433
14f28187
FD
434 if (!session->attached) {
435 ret = lttng_live_attach_session(session);
436 if (ret) {
42521b69 437 if (lttng_live_msg_iter && lttng_live_graph_is_canceled(
14f28187
FD
438 lttng_live_msg_iter->lttng_live_comp)) {
439 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
440 } else {
441 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
442 }
443 goto end;
4c66436f 444 }
7cdc2bab 445 }
14f28187 446
7cdc2bab 447 status = lttng_live_get_new_streams(session);
14f28187
FD
448 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
449 status != LTTNG_LIVE_ITERATOR_STATUS_END) {
450 goto end;
7cdc2bab 451 }
14f28187
FD
452 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
453 struct lttng_live_trace *trace =
454 g_ptr_array_index(session->traces, trace_idx);
455
7cdc2bab 456 status = lttng_live_metadata_update(trace);
14f28187
FD
457 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
458 status != LTTNG_LIVE_ITERATOR_STATUS_END) {
459 goto end;
7cdc2bab
MD
460 }
461 }
14f28187
FD
462 status = lttng_live_lazy_msg_init(session);
463
464end:
465 return status;
7cdc2bab
MD
466}
467
468BT_HIDDEN
14f28187 469void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 470{
14f28187 471 uint64_t session_idx;
7cdc2bab 472
14f28187
FD
473 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
474 session_idx++) {
475 struct lttng_live_session *session =
476 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
7cdc2bab
MD
477 session->new_streams_needed = true;
478 }
479}
480
481static
14f28187 482void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 483{
14f28187 484 uint64_t session_idx, trace_idx;
7cdc2bab 485
14f28187
FD
486 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
487 session_idx++) {
488 struct lttng_live_session *session =
489 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
7cdc2bab 490 session->new_streams_needed = true;
14f28187
FD
491 for (trace_idx = 0; trace_idx < session->traces->len;
492 trace_idx++) {
493 struct lttng_live_trace *trace =
494 g_ptr_array_index(session->traces, trace_idx);
7cdc2bab
MD
495 trace->new_metadata_needed = true;
496 }
497 }
498}
499
500static
14f28187
FD
501enum lttng_live_iterator_status
502lttng_live_iterator_handle_new_streams_and_metadata(
503 struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 504{
14f28187
FD
505 enum lttng_live_iterator_status ret =
506 LTTNG_LIVE_ITERATOR_STATUS_OK;
507 uint64_t session_idx = 0, nr_sessions_opened = 0;
508 struct lttng_live_session *session;
509 enum session_not_found_action sess_not_found_act =
510 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
511
7cdc2bab 512 /*
14f28187 513 * In a remotely distant future, we could add a "new
7cdc2bab
MD
514 * session" flag to the protocol, which would tell us that we
515 * need to query for new sessions even though we have sessions
516 * currently ongoing.
517 */
14f28187
FD
518 if (lttng_live_msg_iter->sessions->len == 0) {
519 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
520 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
521 goto end;
522 } else {
523 /*
524 * Retry to create a viewer session for the requested
525 * session name.
526 */
527 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
528 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
529 goto end;
530 }
531 }
7cdc2bab 532 }
14f28187
FD
533
534 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
535 session_idx++) {
536 session = g_ptr_array_index(lttng_live_msg_iter->sessions,
537 session_idx);
538 ret = lttng_live_get_session(lttng_live_msg_iter, session);
7cdc2bab 539 switch (ret) {
14f28187 540 case LTTNG_LIVE_ITERATOR_STATUS_OK:
7cdc2bab 541 break;
14f28187
FD
542 case LTTNG_LIVE_ITERATOR_STATUS_END:
543 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
544 break;
545 default:
546 goto end;
547 }
548 if (!session->closed) {
549 nr_sessions_opened++;
550 }
551 }
552end:
14f28187
FD
553 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK &&
554 sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
555 nr_sessions_opened == 0) {
556 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab
MD
557 }
558 return ret;
559}
560
561static
14f28187
FD
562enum lttng_live_iterator_status emit_inactivity_message(
563 struct lttng_live_msg_iter *lttng_live_msg_iter,
564 struct lttng_live_stream_iterator *stream_iter,
565 bt_message **message, uint64_t timestamp)
7cdc2bab 566{
14f28187
FD
567 enum lttng_live_iterator_status ret =
568 LTTNG_LIVE_ITERATOR_STATUS_OK;
569 bt_message *msg = NULL;
7cdc2bab 570
14f28187
FD
571 BT_ASSERT(stream_iter->trace->clock_class);
572
573 msg = bt_message_message_iterator_inactivity_create(
574 lttng_live_msg_iter->self_msg_iter,
575 stream_iter->trace->clock_class,
576 timestamp);
d6e69534 577 if (!msg) {
7cdc2bab
MD
578 goto error;
579 }
14f28187 580
d6e69534 581 *message = msg;
7cdc2bab 582end:
7cdc2bab
MD
583 return ret;
584
585error:
14f28187 586 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
d6e69534 587 bt_message_put_ref(msg);
7cdc2bab
MD
588 goto end;
589}
590
591static
14f28187
FD
592enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
593 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab 594 struct lttng_live_stream_iterator *lttng_live_stream,
14f28187 595 bt_message **message)
7cdc2bab 596{
14f28187
FD
597 enum lttng_live_iterator_status ret =
598 LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
599
600 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
14f28187 601 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
602 }
603
14f28187
FD
604 if (lttng_live_stream->current_inactivity_ts ==
605 lttng_live_stream->last_inactivity_ts) {
7cdc2bab 606 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
14f28187
FD
607 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
608 goto end;
609 }
610
611 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream,
612 message, lttng_live_stream->current_inactivity_ts);
613
614 lttng_live_stream->last_inactivity_ts =
615 lttng_live_stream->current_inactivity_ts;
616end:
617 return ret;
618}
619
620static
621int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
622 struct lttng_live_msg_iter *lttng_live_msg_iter,
623 const bt_message *msg, int64_t last_msg_ts_ns,
624 int64_t *ts_ns)
625{
626 const bt_clock_class *clock_class = NULL;
627 const bt_clock_snapshot *clock_snapshot = NULL;
628 int ret = 0;
14f28187 629 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
c01594de 630 bt_logging_level log_level = lttng_live_msg_iter->log_level;
14f28187
FD
631
632 BT_ASSERT(msg);
633 BT_ASSERT(ts_ns);
634
3f7d4d90 635 BT_LOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
14f28187
FD
636 "last-msg-ts=%" PRId64, lttng_live_msg_iter, msg,
637 last_msg_ts_ns);
638
639 switch (bt_message_get_type(msg)) {
640 case BT_MESSAGE_TYPE_EVENT:
641 clock_class =
642 bt_message_event_borrow_stream_class_default_clock_class_const(
643 msg);
644 BT_ASSERT(clock_class);
645
0cbc2c33
PP
646 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
647 msg);
14f28187
FD
648 break;
649 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
650 clock_class =
651 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
652 msg);
653 BT_ASSERT(clock_class);
654
0cbc2c33
PP
655 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
656 msg);
14f28187
FD
657 break;
658 case BT_MESSAGE_TYPE_PACKET_END:
659 clock_class =
660 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
661 msg);
662 BT_ASSERT(clock_class);
663
0cbc2c33
PP
664 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
665 msg);
14f28187
FD
666 break;
667 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
668 clock_class =
669 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
670 msg);
671 BT_ASSERT(clock_class);
672
9b24b6aa 673 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 674 msg);
14f28187
FD
675 break;
676 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
677 clock_class =
678 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
679 msg);
680 BT_ASSERT(clock_class);
681
9b24b6aa 682 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
0cbc2c33 683 msg);
14f28187
FD
684 break;
685 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
686 clock_class =
687 bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
688 msg);
689 BT_ASSERT(clock_class);
690
691 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
692 msg, &clock_snapshot);
693 if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
694 goto no_clock_snapshot;
695 }
696
697 break;
698 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
699 clock_class =
700 bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
701 msg);
702 BT_ASSERT(clock_class);
703
704 sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
705 msg, &clock_snapshot);
706 if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
707 goto no_clock_snapshot;
708 }
709
710 break;
711 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
0cbc2c33 712 clock_snapshot =
14f28187 713 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
0cbc2c33 714 msg);
14f28187
FD
715 break;
716 default:
717 /* All the other messages have a higher priority */
3f7d4d90 718 BT_LOGD_STR("Message has no timestamp: using the last message timestamp.");
14f28187 719 *ts_ns = last_msg_ts_ns;
7cdc2bab
MD
720 goto end;
721 }
722
14f28187
FD
723 clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
724 BT_ASSERT(clock_class);
725
726 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
727 if (ret) {
728 BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
729 "clock-snapshot-addr=%p", clock_snapshot);
730 goto error;
731 }
732
733 goto end;
734
735no_clock_snapshot:
3f7d4d90 736 BT_LOGD_STR("Message's default clock snapshot is missing: "
14f28187
FD
737 "using the last message timestamp.");
738 *ts_ns = last_msg_ts_ns;
739 goto end;
740
741error:
742 ret = -1;
7cdc2bab 743
7cdc2bab 744end:
14f28187 745 if (ret == 0) {
3f7d4d90 746 BT_LOGD("Found message's timestamp: "
14f28187
FD
747 "iter-data-addr=%p, msg-addr=%p, "
748 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
749 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
750 }
751
7cdc2bab
MD
752 return ret;
753}
754
755static
14f28187
FD
756enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
757 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab 758 struct lttng_live_stream_iterator *lttng_live_stream,
14f28187 759 bt_message **message)
7cdc2bab 760{
14f28187 761 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
d6e69534 762 enum bt_msg_iter_status status;
14f28187 763 uint64_t session_idx, trace_idx;
c01594de 764 bt_logging_level log_level = lttng_live_msg_iter->log_level;
7cdc2bab 765
14f28187
FD
766 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
767 session_idx++) {
768 struct lttng_live_session *session =
769 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
7cdc2bab
MD
770
771 if (session->new_streams_needed) {
14f28187
FD
772 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
773 goto end;
7cdc2bab 774 }
14f28187
FD
775 for (trace_idx = 0; trace_idx < session->traces->len;
776 trace_idx++) {
777 struct lttng_live_trace *trace =
778 g_ptr_array_index(session->traces, trace_idx);
7cdc2bab 779 if (trace->new_metadata_needed) {
14f28187
FD
780 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
781 goto end;
7cdc2bab
MD
782 }
783 }
784 }
785
786 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
14f28187
FD
787 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
788 goto end;
7cdc2bab 789 }
14f28187
FD
790
791 status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter,
792 lttng_live_msg_iter->self_msg_iter, message);
7cdc2bab 793 switch (status) {
d6e69534 794 case BT_MSG_ITER_STATUS_EOF:
14f28187 795 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab 796 break;
d6e69534 797 case BT_MSG_ITER_STATUS_OK:
14f28187 798 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab 799 break;
d6e69534 800 case BT_MSG_ITER_STATUS_AGAIN:
7cdc2bab
MD
801 /*
802 * Continue immediately (end of packet). The next
803 * get_index may return AGAIN to delay the following
804 * attempt.
805 */
14f28187 806 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab 807 break;
d6e69534 808 case BT_MSG_ITER_STATUS_INVAL:
7cdc2bab 809 /* No argument provided by the user, so don't return INVAL. */
d6e69534 810 case BT_MSG_ITER_STATUS_ERROR:
7cdc2bab 811 default:
14f28187
FD
812 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
813 BT_LOGW("CTF msg iterator return an error or failed msg_iter=%p",
814 lttng_live_stream->msg_iter);
7cdc2bab
MD
815 break;
816 }
14f28187
FD
817
818end:
7cdc2bab
MD
819 return ret;
820}
821
822/*
823 * helper function:
824 * handle_no_data_streams()
825 * retry:
826 * - for each ACTIVE_NO_DATA stream:
827 * - query relayd for stream data, or quiescence info.
828 * - if need metadata, get metadata, goto retry.
829 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
830 * - if quiescent, move to QUIESCENT streams
831 * - if fetched data, move to ACTIVE_DATA streams
832 * (at this point each stream either has data, or is quiescent)
833 *
834 *
835 * iterator_next:
836 * handle_new_streams_and_metadata()
837 * - query relayd for known streams, add them as ACTIVE_NO_DATA
838 * - query relayd for metadata
839 *
840 * call handle_active_no_data_streams()
841 *
842 * handle_quiescent_streams()
843 * - if at least one stream is ACTIVE_DATA:
844 * - peek stream event with lowest timestamp -> next_ts
845 * - for each quiescent stream
846 * - if next_ts >= quiescent end
847 * - set state to ACTIVE_NO_DATA
848 * - else
849 * - for each quiescent stream
850 * - set state to ACTIVE_NO_DATA
851 *
852 * call handle_active_no_data_streams()
853 *
854 * handle_active_data_streams()
855 * - if at least one stream is ACTIVE_DATA:
856 * - get stream event with lowest timestamp from heap
d6e69534 857 * - make that stream event the current message.
7cdc2bab
MD
858 * - move this stream heap position to its next event
859 * - if we need to fetch data from relayd, move
860 * stream to ACTIVE_NO_DATA.
861 * - return OK
862 * - return AGAIN
863 *
864 * end criterion: ctrl-c on client. If relayd exits or the session
865 * closes on the relay daemon side, we keep on waiting for streams.
866 * Eventually handle --end timestamp (also an end criterion).
867 *
868 * When disconnected from relayd: try to re-connect endlessly.
869 */
870static
14f28187
FD
871enum lttng_live_iterator_status lttng_live_iterator_next_on_stream(
872 struct lttng_live_msg_iter *lttng_live_msg_iter,
873 struct lttng_live_stream_iterator *stream_iter,
874 bt_message **curr_msg)
7cdc2bab 875{
c01594de 876 bt_logging_level log_level = lttng_live_msg_iter->log_level;
14f28187 877 enum lttng_live_iterator_status live_status;
7cdc2bab 878
7cdc2bab
MD
879retry:
880 print_stream_state(stream_iter);
14f28187
FD
881 live_status = lttng_live_iterator_handle_new_streams_and_metadata(
882 lttng_live_msg_iter);
883 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
884 goto end;
885 }
14f28187
FD
886 live_status = lttng_live_iterator_next_handle_one_no_data_stream(
887 lttng_live_msg_iter, stream_iter);
888 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
889 goto end;
890 }
14f28187
FD
891 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(
892 lttng_live_msg_iter, stream_iter, curr_msg);
893 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
894 BT_ASSERT(*curr_msg == NULL);
7cdc2bab
MD
895 goto end;
896 }
14f28187 897 if (*curr_msg) {
7cdc2bab
MD
898 goto end;
899 }
14f28187
FD
900 live_status = lttng_live_iterator_next_handle_one_active_data_stream(
901 lttng_live_msg_iter, stream_iter, curr_msg);
902 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
903 BT_ASSERT(*curr_msg == NULL);
7cdc2bab
MD
904 }
905
906end:
14f28187 907 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
7cdc2bab 908 goto retry;
7cdc2bab 909 }
14f28187
FD
910
911 return live_status;
7cdc2bab
MD
912}
913
914static
14f28187
FD
915enum lttng_live_iterator_status next_stream_iterator_for_trace(
916 struct lttng_live_msg_iter *lttng_live_msg_iter,
917 struct lttng_live_trace *live_trace,
918 struct lttng_live_stream_iterator **candidate_stream_iter)
7cdc2bab 919{
14f28187
FD
920 struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
921 enum lttng_live_iterator_status stream_iter_status;;
922 int64_t curr_candidate_msg_ts = INT64_MAX;
923 uint64_t stream_iter_idx;
c01594de 924 bt_logging_level log_level = lttng_live_msg_iter->log_level;
7cdc2bab 925
14f28187
FD
926 BT_ASSERT(live_trace);
927 BT_ASSERT(live_trace->stream_iterators);
928 /*
929 * Update the current message of every stream iterators of this trace.
930 * The current msg of every stream must have a timestamp equal or
931 * larger than the last message returned by this iterator. We must
932 * ensure monotonicity.
933 */
934 stream_iter_idx = 0;
935 while (stream_iter_idx < live_trace->stream_iterators->len) {
936 bool stream_iter_is_ended = false;
937 struct lttng_live_stream_iterator *stream_iter =
938 g_ptr_array_index(live_trace->stream_iterators,
939 stream_iter_idx);
940
941 /*
942 * Find if there is are now current message for this stream
943 * iterator get it.
944 */
945 while (!stream_iter->current_msg) {
946 bt_message *msg = NULL;
947 int64_t curr_msg_ts_ns = INT64_MAX;
948 stream_iter_status = lttng_live_iterator_next_on_stream(
949 lttng_live_msg_iter, stream_iter, &msg);
950
951 BT_LOGD("live stream iterator returned status :%s",
952 print_live_iterator_status(stream_iter_status));
953 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
954 stream_iter_is_ended = true;
955 break;
956 }
957
958 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
959 goto end;
960 }
961
962 BT_ASSERT(msg);
963
964 /*
965 * Get the timestamp in nanoseconds from origin of this
966 * messsage.
967 */
968 live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter,
969 msg, lttng_live_msg_iter->last_msg_ts_ns,
970 &curr_msg_ts_ns);
971
972 /*
973 * Check if the message of the current live stream
974 * iterator occured at the exact same time or after the
975 * last message returned by this component's message
976 * iterator. If not, we return an error.
977 */
978 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
979 stream_iter->current_msg = msg;
980 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
981 } else {
982 /*
983 * We received a message in the past. To ensure
984 * monotonicity, we can't send it forward.
985 */
986 BT_LOGE("Message's timestamp is less than "
987 "lttng-live's message iterator's last "
988 "returned timestamp: "
989 "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", "
990 "last-msg-ts=%" PRId64,
991 lttng_live_msg_iter, curr_msg_ts_ns,
992 lttng_live_msg_iter->last_msg_ts_ns);
993 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
994 goto end;
995 }
996 }
997
998 if (!stream_iter_is_ended &&
999 stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
1000 /*
1001 * Update the current best candidate message for the
1002 * stream iterator of thise live trace to be forwarded
1003 * downstream.
1004 */
1005 curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1006 curr_candidate_stream_iter = stream_iter;
1007 }
1008
1009 if (stream_iter_is_ended) {
1010 /*
1011 * The live stream iterator is ENDed. We remove that
1012 * iterator from the list and we restart the iteration
1013 * at the beginning of the live stream iterator array
1014 * to because the removal will shuffle the array.
1015 */
1016 g_ptr_array_remove_index_fast(live_trace->stream_iterators,
1017 stream_iter_idx);
1018 stream_iter_idx = 0;
1019 } else {
1020 stream_iter_idx++;
1021 }
1022 }
1023
1024 if (curr_candidate_stream_iter) {
1025 *candidate_stream_iter = curr_candidate_stream_iter;
1026 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1027 } else {
1028 /*
1029 * The only case where we don't have a candidate for this trace
1030 * is if we reached the end of all the iterators.
1031 */
1032 BT_ASSERT(live_trace->stream_iterators->len == 0);
1033 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1034 }
1035
1036end:
1037 return stream_iter_status;
1038}
1039
1040static
1041enum lttng_live_iterator_status next_stream_iterator_for_session(
1042 struct lttng_live_msg_iter *lttng_live_msg_iter,
1043 struct lttng_live_session *session,
1044 struct lttng_live_stream_iterator **candidate_session_stream_iter)
1045{
1046 enum lttng_live_iterator_status stream_iter_status;
1047 uint64_t trace_idx = 0;
1048 int64_t curr_candidate_msg_ts = INT64_MAX;
1049 struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
1050
1051 /*
1052 * Make sure we are attached to the session and look for new streams
1053 * and metadata.
1054 */
1055 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1056 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1057 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1058 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
7cdc2bab
MD
1059 goto end;
1060 }
14f28187
FD
1061
1062 BT_ASSERT(session->traces);
1063
1064 /*
1065 * Use while loops here rather then for loops so we can restart the
1066 * iteration if an element is removed from the array during the
1067 * looping.
1068 */
1069 while (trace_idx < session->traces->len) {
1070 bool trace_is_ended = false;
1071 struct lttng_live_stream_iterator *stream_iter;
1072 struct lttng_live_trace *trace =
1073 g_ptr_array_index(session->traces, trace_idx);
1074
1075 stream_iter_status = next_stream_iterator_for_trace(
1076 lttng_live_msg_iter, trace, &stream_iter);
1077 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1078 /*
1079 * All the live stream iterators for this trace are
1080 * ENDed. Remove the trace from this session.
1081 */
1082 trace_is_ended = true;
1083 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1084 goto end;
1085 }
1086
1087 if (!trace_is_ended) {
1088 BT_ASSERT(stream_iter);
1089
1090 if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
1091 curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1092 curr_candidate_stream_iter = stream_iter;
1093 }
1094 trace_idx++;
1095 } else {
1096 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1097 trace_idx = 0;
1098 }
1099 }
1100 if (curr_candidate_stream_iter) {
1101 *candidate_session_stream_iter = curr_candidate_stream_iter;
1102 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab 1103 } else {
14f28187
FD
1104 /*
1105 * The only cases where we don't have a candidate for this
1106 * trace is:
1107 * 1. if we reached the end of all the iterators of all the
1108 * traces of this session,
1109 * 2. if we never had live stream iterator in the first place.
1110 *
1111 * In either cases, we return END.
1112 */
1113 BT_ASSERT(session->traces->len == 0);
1114 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab
MD
1115 }
1116end:
14f28187
FD
1117 return stream_iter_status;
1118}
1119
1120static inline
1121void put_messages(bt_message_array_const msgs, uint64_t count)
1122{
1123 uint64_t i;
1124
1125 for (i = 0; i < count; i++) {
1126 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
7cdc2bab 1127 }
7cdc2bab
MD
1128}
1129
d3eb6e8f 1130BT_HIDDEN
14f28187
FD
1131bt_self_message_iterator_status lttng_live_msg_iter_next(
1132 bt_self_message_iterator *self_msg_it,
1133 bt_message_array_const msgs, uint64_t capacity,
1134 uint64_t *count)
d3eb6e8f 1135{
14f28187
FD
1136 bt_self_message_iterator_status status;
1137 struct lttng_live_msg_iter *lttng_live_msg_iter =
1138 bt_self_message_iterator_get_data(self_msg_it);
1139 struct lttng_live_component *lttng_live =
1140 lttng_live_msg_iter->lttng_live_comp;
1141 enum lttng_live_iterator_status stream_iter_status;
1142 uint64_t session_idx;
1143
1144 *count = 0;
1145
1146 BT_ASSERT(lttng_live_msg_iter);
1147
1148 /*
1149 * Clear all the invalid message reference that might be left over in
1150 * the output array.
1151 */
1152 memset(msgs, 0, capacity * sizeof(*msgs));
1153
1154 /*
1155 * If no session are exposed on the relay found at the url provided by
1156 * the user, session count will be 0. In this case, we return status
1157 * end to return gracefully.
1158 */
1159 if (lttng_live_msg_iter->sessions->len == 0) {
1160 if (lttng_live->params.sess_not_found_act !=
1161 SESSION_NOT_FOUND_ACTION_CONTINUE) {
1162 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1163 goto no_session;
1164 } else {
1165 /*
1166 * The are no more active session for this session
1167 * name. Retry to create a viewer session for the
1168 * requested session name.
1169 */
1170 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1171 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1172 goto no_session;
1173 }
1174 }
1175 }
1176
1177 if (lttng_live_msg_iter->active_stream_iter == 0) {
1178 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1179 }
1180
1181 /*
1182 * Here the muxing of message is done.
1183 *
1184 * We need to iterate over all the streams of all the traces of all the
1185 * viewer sessions in order to get the message with the smallest
1186 * timestamp. In this case, a session is a viewer session and there is
1187 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1188 * kernel). Each viewer session can have multiple traces, for example,
1189 * 64bit UST viewer sessions could have multiple per-pid traces.
1190 *
1191 * We iterate over the streams of each traces to update and see what is
1192 * their next message's timestamp. From those timestamps, we select the
1193 * message with the smallest timestamp as the best candidate message
1194 * for that trace and do the same thing across all the sessions.
1195 *
1196 * We then compare the timestamp of best candidate message of all the
1197 * sessions to pick the message with the smallest timestamp and we
1198 * return it.
1199 */
1200 while (*count < capacity) {
1201 struct lttng_live_stream_iterator *next_stream_iter = NULL,
1202 *candidate_stream_iter = NULL;
1203 int64_t next_msg_ts_ns = INT64_MAX;
1204
1205 BT_ASSERT(lttng_live_msg_iter->sessions);
1206 session_idx = 0;
1207 /*
1208 * Use a while loop instead of a for loop so we can restart the
1209 * iteration if we remove an element. We can safely call
1210 * next_stream_iterator_for_session() multiple times on the
1211 * same session as we only fetch a new message if there is no
1212 * current next message for each live stream iterator.
1213 * If all live stream iterator of that session already have a
1214 * current next message, the function will simply exit return
1215 * the same candidate live stream iterator every time.
1216 */
1217 while (session_idx < lttng_live_msg_iter->sessions->len) {
1218 struct lttng_live_session *session =
1219 g_ptr_array_index(lttng_live_msg_iter->sessions,
1220 session_idx);
1221
1222 /* Find the best candidate message to send downstream. */
1223 stream_iter_status = next_stream_iterator_for_session(
1224 lttng_live_msg_iter, session,
1225 &candidate_stream_iter);
1226
1227 /* If we receive an END status, it means that either:
1228 * - Those traces never had active streams (UST with no
1229 * data produced yet),
1230 * - All live stream iterators have ENDed.*/
1231 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1232 if (session->closed && session->traces->len == 0) {
1233 /*
1234 * Remove the session from the list and restart the
1235 * iteration at the beginning of the array since the
1236 * removal shuffle the elements of the array.
1237 */
1238 g_ptr_array_remove_index_fast(
1239 lttng_live_msg_iter->sessions,
1240 session_idx);
1241 session_idx = 0;
1242 } else {
1243 session_idx++;
1244 }
1245 continue;
1246 }
1247
1248 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1249 goto end;
1250 }
1251
1252 if (candidate_stream_iter->current_msg_ts_ns <= next_msg_ts_ns) {
1253 next_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1254 next_stream_iter = candidate_stream_iter;
1255 }
1256
1257 session_idx++;
1258 }
1259
1260 if (!next_stream_iter) {
1261 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1262 goto end;
1263 }
1264
1265 BT_ASSERT(next_stream_iter->current_msg);
1266 /* Ensure monotonicity. */
1267 BT_ASSERT(lttng_live_msg_iter->last_msg_ts_ns <=
1268 next_stream_iter->current_msg_ts_ns);
1269
1270 /*
1271 * Insert the next message to the message batch. This will set
1272 * stream iterator current messsage to NULL so that next time
1273 * we fetch the next message of that stream iterator
1274 */
1275 BT_MESSAGE_MOVE_REF(msgs[*count], next_stream_iter->current_msg);
1276 (*count)++;
1277
1278 /* Update the last timestamp in nanoseconds sent downstream. */
1279 lttng_live_msg_iter->last_msg_ts_ns = next_msg_ts_ns;
1280 next_stream_iter->current_msg_ts_ns = INT64_MAX;
1281
1282 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1283 }
1284end:
1285 switch (stream_iter_status) {
1286 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1287 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1288 if (*count > 0) {
1289 /*
1290 * We received a again status but we have some messages
1291 * to send downstream. We send them and return OK for
1292 * now. On the next call we return again if there are
1293 * still no new message to send.
1294 */
1295 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1296 } else {
1297 status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN;
1298 }
7cdc2bab 1299 break;
14f28187
FD
1300 case LTTNG_LIVE_ITERATOR_STATUS_END:
1301 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
7cdc2bab 1302 break;
14f28187
FD
1303 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1304 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1305 break;
1306 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1307 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1308 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1309 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1310 /* Put all existing messages on error. */
1311 put_messages(msgs, *count);
7cdc2bab 1312 break;
14f28187
FD
1313 default:
1314 abort();
7cdc2bab 1315 }
14f28187
FD
1316
1317no_session:
1318 return status;
7cdc2bab 1319}
41a2b7ae 1320
7cdc2bab 1321BT_HIDDEN
14f28187
FD
1322bt_self_message_iterator_status lttng_live_msg_iter_init(
1323 bt_self_message_iterator *self_msg_it,
1324 bt_self_component_source *self_comp_src,
1325 bt_self_component_port_output *self_port)
7cdc2bab 1326{
14f28187
FD
1327 bt_self_message_iterator_status ret =
1328 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1329 bt_self_component *self_comp =
1330 bt_self_component_source_as_self_component(self_comp_src);
1331 struct lttng_live_component *lttng_live;
1332 struct lttng_live_msg_iter *lttng_live_msg_iter;
c01594de 1333 bt_logging_level log_level;
14f28187
FD
1334
1335 BT_ASSERT(self_msg_it);
1336
1337 lttng_live = bt_self_component_get_data(self_comp);
c01594de 1338 log_level = lttng_live->log_level;
14f28187
FD
1339
1340 /* There can be only one downstream iterator at the same time. */
1341 BT_ASSERT(!lttng_live->has_msg_iter);
1342 lttng_live->has_msg_iter = true;
1343
1344 lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
1345 if (!lttng_live_msg_iter) {
1346 ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1347 goto end;
7cdc2bab 1348 }
14f28187 1349
c01594de 1350 lttng_live_msg_iter->log_level = lttng_live->log_level;
14f28187
FD
1351 lttng_live_msg_iter->lttng_live_comp = lttng_live;
1352 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1353
1354 lttng_live_msg_iter->active_stream_iter = 0;
1355 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1356 lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func(
1357 (GDestroyNotify) lttng_live_destroy_session);
1358 BT_ASSERT(lttng_live_msg_iter->sessions);
1359
1360 lttng_live_msg_iter->viewer_connection =
1361 live_viewer_connection_create(lttng_live->params.url->str, false,
1362 lttng_live_msg_iter);
1363 if (!lttng_live_msg_iter->viewer_connection) {
1364 goto error;
1365 }
1366
1367 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1368 goto error;
1369 }
1370 if (lttng_live_msg_iter->sessions->len == 0) {
1371 switch (lttng_live->params.sess_not_found_act) {
1372 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1373 BT_LOGI("Unable to connect to the requested live viewer "
1374 "session. Keep trying to connect because of "
1375 "%s=\"%s\" component parameter: url=\"%s\"",
1376 SESS_NOT_FOUND_ACTION_PARAM,
1377 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1378 lttng_live->params.url->str);
1379 break;
1380 case SESSION_NOT_FOUND_ACTION_FAIL:
1381 BT_LOGE("Unable to connect to the requested live viewer "
1382 "session. Fail the message iterator"
1383 "initialization because of %s=\"%s\" "
1384 "component parameter: url =\"%s\"",
1385 SESS_NOT_FOUND_ACTION_PARAM,
1386 SESS_NOT_FOUND_ACTION_FAIL_STR,
1387 lttng_live->params.url->str);
7cdc2bab 1388 goto error;
14f28187
FD
1389 case SESSION_NOT_FOUND_ACTION_END:
1390 BT_LOGI("Unable to connect to the requested live viewer "
1391 "session. End gracefully at the first _next() "
1392 "call because of %s=\"%s\" component parameter: "
1393 "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
1394 SESS_NOT_FOUND_ACTION_END_STR,
1395 lttng_live->params.url->str);
1396 break;
7cdc2bab 1397 }
7cdc2bab
MD
1398 }
1399
14f28187
FD
1400 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1401
1402 goto end;
1403error:
1404 ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1405 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
7cdc2bab 1406end:
41a2b7ae 1407 return ret;
7cdc2bab
MD
1408}
1409
1410static
14f28187 1411bt_query_status lttng_live_query_list_sessions(const bt_value *params,
c01594de 1412 const bt_value **result, bt_logging_level log_level)
7cdc2bab 1413{
14f28187
FD
1414 bt_query_status status = BT_QUERY_STATUS_OK;
1415 const bt_value *url_value = NULL;
7cdc2bab 1416 const char *url;
14f28187 1417 struct live_viewer_connection *viewer_connection = NULL;
7cdc2bab 1418
14f28187
FD
1419 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1420 if (!url_value) {
1421 BT_LOGW("Mandatory `%s` parameter missing", URL_PARAM);
1422 status = BT_QUERY_STATUS_INVALID_PARAMS;
7cdc2bab
MD
1423 goto error;
1424 }
1425
14f28187
FD
1426 if (!bt_value_is_string(url_value)) {
1427 BT_LOGW("`%s` parameter is required to be a string value",
1428 URL_PARAM);
1429 status = BT_QUERY_STATUS_INVALID_PARAMS;
7cdc2bab
MD
1430 goto error;
1431 }
1432
14f28187
FD
1433 url = bt_value_string_get(url_value);
1434
1435 viewer_connection = live_viewer_connection_create(url, true, NULL);
7cdc2bab 1436 if (!viewer_connection) {
7cdc2bab
MD
1437 goto error;
1438 }
1439
14f28187
FD
1440 status = live_viewer_connection_list_sessions(viewer_connection,
1441 result);
1442 if (status != BT_QUERY_STATUS_OK) {
c7eee084
PP
1443 goto error;
1444 }
1445
7cdc2bab 1446 goto end;
c7eee084 1447
7cdc2bab 1448error:
14f28187 1449 BT_VALUE_PUT_REF_AND_RESET(*result);
c7eee084 1450
14f28187
FD
1451 if (status >= 0) {
1452 status = BT_QUERY_STATUS_ERROR;
c7eee084
PP
1453 }
1454
7cdc2bab
MD
1455end:
1456 if (viewer_connection) {
14f28187 1457 live_viewer_connection_destroy(viewer_connection);
7cdc2bab 1458 }
14f28187 1459 return status;
7cdc2bab
MD
1460}
1461
1462BT_HIDDEN
14f28187 1463bt_query_status lttng_live_query(bt_self_component_class_source *comp_class,
b19ff26f 1464 const bt_query_executor *query_exec,
14f28187 1465 const char *object, const bt_value *params,
c01594de 1466 bt_logging_level log_level, const bt_value **result)
7cdc2bab 1467{
14f28187 1468 bt_query_status status = BT_QUERY_STATUS_OK;
c7eee084 1469
7cdc2bab 1470 if (strcmp(object, "sessions") == 0) {
c01594de
PP
1471 status = lttng_live_query_list_sessions(params, result,
1472 log_level);
14f28187
FD
1473 } else {
1474 BT_LOGW("Unknown query object `%s`", object);
1475 status = BT_QUERY_STATUS_INVALID_OBJECT;
1476 goto end;
7cdc2bab 1477 }
14f28187
FD
1478
1479end:
1480 return status;
7cdc2bab
MD
1481}
1482
1483static
1484void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1485{
19bdff20
FD
1486 if (!lttng_live) {
1487 return;
1488 }
14f28187
FD
1489 if (lttng_live->params.url) {
1490 g_string_free(lttng_live->params.url, TRUE);
7cdc2bab
MD
1491 }
1492 g_free(lttng_live);
1493}
1494
1495BT_HIDDEN
14f28187 1496void lttng_live_component_finalize(bt_self_component_source *component)
7cdc2bab 1497{
14f28187
FD
1498 void *data = bt_self_component_get_data(
1499 bt_self_component_source_as_self_component(component));
7cdc2bab
MD
1500
1501 if (!data) {
1502 return;
1503 }
1504 lttng_live_component_destroy_data(data);
1505}
1506
1507static
14f28187
FD
1508enum session_not_found_action parse_session_not_found_action_param(
1509 const bt_value *no_session_param)
1510{
1511 enum session_not_found_action action;
1512 const char *no_session_act_str;
1513 no_session_act_str = bt_value_string_get(no_session_param);
1514 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1515 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1516 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1517 action = SESSION_NOT_FOUND_ACTION_FAIL;
1518 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0) {
1519 action = SESSION_NOT_FOUND_ACTION_END;
1520 } else {
1521 action = -1;
1522 }
1523
1524 return action;
1525}
1526
c01594de
PP
1527struct lttng_live_component *lttng_live_component_create(const bt_value *params,
1528 bt_logging_level log_level)
7cdc2bab
MD
1529{
1530 struct lttng_live_component *lttng_live;
14f28187 1531 const bt_value *value = NULL;
7cdc2bab 1532 const char *url;
7cdc2bab
MD
1533
1534 lttng_live = g_new0(struct lttng_live_component, 1);
1535 if (!lttng_live) {
1536 goto end;
1537 }
c01594de 1538 lttng_live->log_level = log_level;
7cdc2bab 1539 lttng_live->max_query_size = MAX_QUERY_SIZE;
14f28187
FD
1540 lttng_live->has_msg_iter = false;
1541
1542 value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1543 if (!value || !bt_value_is_string(value)) {
1544 BT_LOGW("Mandatory `%s` parameter missing or not a string",
1545 URL_PARAM);
7cdc2bab
MD
1546 goto error;
1547 }
601b0d3c 1548 url = bt_value_string_get(value);
14f28187
FD
1549 lttng_live->params.url = g_string_new(url);
1550 if (!lttng_live->params.url) {
7cdc2bab
MD
1551 goto error;
1552 }
14f28187
FD
1553
1554 value = bt_value_map_borrow_entry_value_const(params,
1555 SESS_NOT_FOUND_ACTION_PARAM);
1556
1557 if (value && bt_value_is_string(value)) {
1558 lttng_live->params.sess_not_found_act =
1559 parse_session_not_found_action_param(value);
1560 if (lttng_live->params.sess_not_found_act == -1) {
1561 BT_LOGE("Unexpected value for `%s` parameter: "
1562 "value=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
1563 bt_value_string_get(value));
1564 goto error;
1565 }
1566 } else {
1567 BT_LOGW("Optional `%s` parameter is missing or "
1568 "not a string value. Defaulting to %s=\"%s\".",
1569 SESS_NOT_FOUND_ACTION_PARAM,
1570 SESS_NOT_FOUND_ACTION_PARAM,
1571 SESS_NOT_FOUND_ACTION_CONTINUE_STR);
1572 lttng_live->params.sess_not_found_act =
1573 SESSION_NOT_FOUND_ACTION_CONTINUE;
7cdc2bab 1574 }
4c66436f 1575
7cdc2bab
MD
1576 goto end;
1577
1578error:
1579 lttng_live_component_destroy_data(lttng_live);
1580 lttng_live = NULL;
1581end:
1582 return lttng_live;
d3e4dcd8
PP
1583}
1584
f3bc2010 1585BT_HIDDEN
14f28187 1586bt_self_component_status lttng_live_component_init(
c01594de 1587 bt_self_component_source *self_comp_src,
c88dd1cb 1588 const bt_value *params, __attribute__((unused)) void *init_method_data)
f3bc2010 1589{
7cdc2bab 1590 struct lttng_live_component *lttng_live;
14f28187 1591 bt_self_component_status ret = BT_SELF_COMPONENT_STATUS_OK;
c01594de
PP
1592 bt_self_component *self_comp =
1593 bt_self_component_source_as_self_component(self_comp_src);
1594 bt_logging_level log_level = bt_component_get_logging_level(
1595 bt_self_component_as_component(self_comp));
7cdc2bab 1596
c01594de 1597 lttng_live = lttng_live_component_create(params, log_level);
7cdc2bab 1598 if (!lttng_live) {
14f28187 1599 ret = BT_SELF_COMPONENT_STATUS_NOMEM;
19bdff20 1600 goto error;
7cdc2bab 1601 }
c01594de 1602 lttng_live->self_comp = self_comp_src;
7cdc2bab 1603
42521b69 1604 if (lttng_live_graph_is_canceled(lttng_live)) {
19bdff20
FD
1605 ret = BT_SELF_COMPONENT_STATUS_END;
1606 goto error;
4bf0e537 1607 }
14f28187 1608
d94d92ac 1609 ret = bt_self_component_source_add_output_port(
14f28187
FD
1610 lttng_live->self_comp, "out",
1611 NULL, NULL);
1612 if (ret != BT_SELF_COMPONENT_STATUS_OK) {
19bdff20 1613 goto error;
147337a3 1614 }
7cdc2bab 1615
c01594de 1616 bt_self_component_set_data(self_comp, lttng_live);
19bdff20 1617 goto end;
7cdc2bab 1618
19bdff20
FD
1619error:
1620 lttng_live_component_destroy_data(lttng_live);
1621 lttng_live = NULL;
7cdc2bab
MD
1622end:
1623 return ret;
d85ef162 1624}
This page took 0.122687 seconds and 4 git commands to generate.