src.ctf.lttng-live: honor component's initial log level and query LL
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.c
1 /*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
7 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
8 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
11 *
12 * Permission is hereby granted, free of charge, to any person obtaining a copy
13 * of this software and associated documentation files (the "Software"), to deal
14 * in the Software without restriction, including without limitation the rights
15 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 * copies of the Software, and to permit persons to whom the Software is
17 * furnished to do so, subject to the following conditions:
18 *
19 * The above copyright notice and this permission notice shall be included in
20 * all copies or substantial portions of the Software.
21 *
22 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 * SOFTWARE.
29 */
30
31 #define BT_LOG_OUTPUT_LEVEL log_level
32 #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE"
33 #include "logging/log.h"
34
35 #include <glib.h>
36 #include <inttypes.h>
37 #include <unistd.h>
38
39 #include "common/assert.h"
40 #include <babeltrace2/babeltrace.h>
41 #include "compat/compiler.h"
42 #include <babeltrace2/types.h>
43
44 #include "data-stream.h"
45 #include "metadata.h"
46 #include "lttng-live.h"
47
48 #define MAX_QUERY_SIZE (256*1024)
49 #define URL_PARAM "url"
50 #define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
51 #define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
52 #define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
53 #define SESS_NOT_FOUND_ACTION_END_STR "end"
54
55 #define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
56
57 static
58 const char *print_live_iterator_status(enum lttng_live_iterator_status status)
59 {
60 switch (status) {
61 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
62 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
63 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
64 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
65 case LTTNG_LIVE_ITERATOR_STATUS_END:
66 return "LTTNG_LIVE_ITERATOR_STATUS_END";
67 case LTTNG_LIVE_ITERATOR_STATUS_OK:
68 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
69 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
70 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
71 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
72 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
73 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
74 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
75 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
76 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
77 default:
78 abort();
79 }
80 }
81
82 static
83 const char *print_state(struct lttng_live_stream_iterator *s)
84 {
85 switch (s->state) {
86 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
87 return "ACTIVE_NO_DATA";
88 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
89 return "QUIESCENT_NO_DATA";
90 case LTTNG_LIVE_STREAM_QUIESCENT:
91 return "QUIESCENT";
92 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
93 return "ACTIVE_DATA";
94 case LTTNG_LIVE_STREAM_EOF:
95 return "EOF";
96 default:
97 return "ERROR";
98 }
99 }
100
101 #define print_stream_state(live_stream_iter) \
102 do { \
103 BT_LOGD("stream state %s last_inact_ts %" PRId64 \
104 ", curr_inact_ts %" PRId64, \
105 print_state(live_stream_iter), \
106 live_stream_iter->last_inactivity_ts, \
107 live_stream_iter->current_inactivity_ts); \
108 } while (0);
109
110 BT_HIDDEN
111 bool lttng_live_graph_is_canceled(struct lttng_live_component *lttng_live)
112 {
113 const bt_component *component;
114 bool ret;
115
116 if (!lttng_live) {
117 ret = false;
118 goto end;
119 }
120
121 component = bt_component_source_as_component_const(
122 bt_self_component_source_as_component_source(
123 lttng_live->self_comp));
124
125 ret = bt_component_graph_is_canceled(component);
126
127 end:
128 return ret;
129 }
130
131 static
132 struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
133 uint64_t trace_id)
134 {
135 uint64_t trace_idx;
136 struct lttng_live_trace *ret_trace = NULL;
137
138 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
139 struct lttng_live_trace *trace =
140 g_ptr_array_index(session->traces, trace_idx);
141 if (trace->id == trace_id) {
142 ret_trace = trace;
143 goto end;
144 }
145 }
146
147 end:
148 return ret_trace;
149 }
150
151 static
152 void lttng_live_destroy_trace(struct lttng_live_trace *trace)
153 {
154 bt_logging_level log_level = trace->log_level;
155
156 BT_LOGD("Destroy lttng_live_trace");
157
158 BT_ASSERT(trace->stream_iterators);
159 g_ptr_array_free(trace->stream_iterators, TRUE);
160
161 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
162 BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
163
164 lttng_live_metadata_fini(trace);
165 g_free(trace);
166 }
167
168 static
169 struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
170 uint64_t trace_id)
171 {
172 struct lttng_live_trace *trace = NULL;
173 bt_logging_level log_level = session->log_level;
174
175 trace = g_new0(struct lttng_live_trace, 1);
176 if (!trace) {
177 goto error;
178 }
179 trace->log_level = session->log_level;
180 trace->session = session;
181 trace->id = trace_id;
182 trace->trace_class = NULL;
183 trace->trace = NULL;
184 trace->stream_iterators = g_ptr_array_new_with_free_func(
185 (GDestroyNotify) lttng_live_stream_iterator_destroy);
186 BT_ASSERT(trace->stream_iterators);
187 trace->new_metadata_needed = true;
188 g_ptr_array_add(session->traces, trace);
189
190 BT_LOGI("Create trace");
191 goto end;
192 error:
193 g_free(trace);
194 trace = NULL;
195 end:
196 return trace;
197 }
198
199 BT_HIDDEN
200 struct lttng_live_trace *lttng_live_borrow_trace(
201 struct lttng_live_session *session, uint64_t trace_id)
202 {
203 struct lttng_live_trace *trace;
204
205 trace = lttng_live_find_trace(session, trace_id);
206 if (trace) {
207 goto end;
208 }
209
210 /* The session is the owner of the newly created trace. */
211 trace = lttng_live_create_trace(session, trace_id);
212
213 end:
214 return trace;
215 }
216
217 BT_HIDDEN
218 int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
219 uint64_t session_id, const char *hostname,
220 const char *session_name)
221 {
222 int ret = 0;
223 struct lttng_live_session *session;
224 bt_logging_level log_level = lttng_live_msg_iter->log_level;
225
226 session = g_new0(struct lttng_live_session, 1);
227 if (!session) {
228 goto error;
229 }
230
231 session->log_level = lttng_live_msg_iter->log_level;
232 session->id = session_id;
233 session->traces = g_ptr_array_new_with_free_func(
234 (GDestroyNotify) lttng_live_destroy_trace);
235 BT_ASSERT(session->traces);
236 session->lttng_live_msg_iter = lttng_live_msg_iter;
237 session->new_streams_needed = true;
238 session->hostname = g_string_new(hostname);
239 BT_ASSERT(session->hostname);
240
241 session->session_name = g_string_new(session_name);
242 BT_ASSERT(session->session_name);
243
244 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
245 session->id, hostname, session_name);
246 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
247 goto end;
248 error:
249 BT_LOGE("Error adding session");
250 g_free(session);
251 ret = -1;
252 end:
253 return ret;
254 }
255
256 static
257 void lttng_live_destroy_session(struct lttng_live_session *session)
258 {
259 struct lttng_live_component *live_comp;
260 bt_logging_level log_level;
261
262 if (!session) {
263 goto end;
264 }
265
266 log_level = session->log_level;
267 BT_LOGD("Destroy lttng live session");
268 if (session->id != -1ULL) {
269 if (lttng_live_detach_session(session)) {
270 live_comp = session->lttng_live_msg_iter->lttng_live_comp;
271 if (session->lttng_live_msg_iter &&
272 !lttng_live_graph_is_canceled(live_comp)) {
273 /* Old relayd cannot detach sessions. */
274 BT_LOGD("Unable to detach lttng live session %" PRIu64,
275 session->id);
276 }
277 }
278 session->id = -1ULL;
279 }
280
281 if (session->traces) {
282 g_ptr_array_free(session->traces, TRUE);
283 }
284
285 if (session->hostname) {
286 g_string_free(session->hostname, TRUE);
287 }
288 if (session->session_name) {
289 g_string_free(session->session_name, TRUE);
290 }
291 g_free(session);
292
293 end:
294 return;
295 }
296
297 static
298 void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
299 {
300 if (!lttng_live_msg_iter) {
301 goto end;
302 }
303
304 if (lttng_live_msg_iter->sessions) {
305 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
306 }
307
308 BT_OBJECT_PUT_REF_AND_RESET(lttng_live_msg_iter->viewer_connection);
309 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
310 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
311
312 /* All stream iterators must be destroyed at this point. */
313 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
314 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
315
316 g_free(lttng_live_msg_iter);
317
318 end:
319 return;
320 }
321
322 BT_HIDDEN
323 void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
324 {
325 struct lttng_live_msg_iter *lttng_live_msg_iter;
326
327 BT_ASSERT(self_msg_iter);
328
329 lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_iter);
330 BT_ASSERT(lttng_live_msg_iter);
331 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
332 }
333
334 static
335 enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
336 struct lttng_live_stream_iterator *lttng_live_stream)
337 {
338 bt_logging_level log_level = lttng_live_stream->log_level;
339
340 switch (lttng_live_stream->state) {
341 case LTTNG_LIVE_STREAM_QUIESCENT:
342 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
343 break;
344 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
345 /* Invalid state. */
346 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
347 abort();
348 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
349 /* Invalid state. */
350 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
351 abort();
352 case LTTNG_LIVE_STREAM_EOF:
353 break;
354 }
355 return LTTNG_LIVE_ITERATOR_STATUS_OK;
356 }
357
358 /*
359 * For active no data stream, fetch next data. It can be either:
360 * - quiescent: need to put it in the prio heap at quiescent end
361 * timestamp,
362 * - have data: need to wire up first event into the prio heap,
363 * - have no data on this stream at this point: need to retry (AGAIN) or
364 * return EOF.
365 */
366 static
367 enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
368 struct lttng_live_msg_iter *lttng_live_msg_iter,
369 struct lttng_live_stream_iterator *lttng_live_stream)
370 {
371 bt_logging_level log_level = lttng_live_msg_iter->log_level;
372 enum lttng_live_iterator_status ret =
373 LTTNG_LIVE_ITERATOR_STATUS_OK;
374 struct packet_index index;
375 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
376
377 if (lttng_live_stream->trace->new_metadata_needed) {
378 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
379 goto end;
380 }
381 if (lttng_live_stream->trace->session->new_streams_needed) {
382 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
383 goto end;
384 }
385 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
386 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
387 goto end;
388 }
389 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream,
390 &index);
391 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
392 goto end;
393 }
394 BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
395 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
396 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts,
397 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
398
399 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA &&
400 last_inact_ts == curr_inact_ts) {
401 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
402 print_stream_state(lttng_live_stream);
403 } else {
404 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
405 }
406 goto end;
407 }
408 lttng_live_stream->base_offset = index.offset;
409 lttng_live_stream->offset = index.offset;
410 lttng_live_stream->len = index.packet_size / CHAR_BIT;
411 end:
412 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
413 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
414 }
415 return ret;
416 }
417
418 /*
419 * Creation of the message requires the ctf trace class to be created
420 * beforehand, but the live protocol gives us all streams (including metadata)
421 * at once. So we split it in three steps: getting streams, getting metadata
422 * (which creates the ctf trace class), and then creating the per-stream
423 * messages.
424 */
425 static
426 enum lttng_live_iterator_status lttng_live_get_session(
427 struct lttng_live_msg_iter *lttng_live_msg_iter,
428 struct lttng_live_session *session)
429 {
430 enum lttng_live_iterator_status status;
431 uint64_t trace_idx;
432 int ret = 0;
433
434 if (!session->attached) {
435 ret = lttng_live_attach_session(session);
436 if (ret) {
437 if (lttng_live_msg_iter && lttng_live_graph_is_canceled(
438 lttng_live_msg_iter->lttng_live_comp)) {
439 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
440 } else {
441 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
442 }
443 goto end;
444 }
445 }
446
447 status = lttng_live_get_new_streams(session);
448 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
449 status != LTTNG_LIVE_ITERATOR_STATUS_END) {
450 goto end;
451 }
452 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
453 struct lttng_live_trace *trace =
454 g_ptr_array_index(session->traces, trace_idx);
455
456 status = lttng_live_metadata_update(trace);
457 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
458 status != LTTNG_LIVE_ITERATOR_STATUS_END) {
459 goto end;
460 }
461 }
462 status = lttng_live_lazy_msg_init(session);
463
464 end:
465 return status;
466 }
467
468 BT_HIDDEN
469 void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
470 {
471 uint64_t session_idx;
472
473 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
474 session_idx++) {
475 struct lttng_live_session *session =
476 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
477 session->new_streams_needed = true;
478 }
479 }
480
481 static
482 void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
483 {
484 uint64_t session_idx, trace_idx;
485
486 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
487 session_idx++) {
488 struct lttng_live_session *session =
489 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
490 session->new_streams_needed = true;
491 for (trace_idx = 0; trace_idx < session->traces->len;
492 trace_idx++) {
493 struct lttng_live_trace *trace =
494 g_ptr_array_index(session->traces, trace_idx);
495 trace->new_metadata_needed = true;
496 }
497 }
498 }
499
500 static
501 enum lttng_live_iterator_status
502 lttng_live_iterator_handle_new_streams_and_metadata(
503 struct lttng_live_msg_iter *lttng_live_msg_iter)
504 {
505 enum lttng_live_iterator_status ret =
506 LTTNG_LIVE_ITERATOR_STATUS_OK;
507 uint64_t session_idx = 0, nr_sessions_opened = 0;
508 struct lttng_live_session *session;
509 enum session_not_found_action sess_not_found_act =
510 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
511
512 /*
513 * In a remotely distant future, we could add a "new
514 * session" flag to the protocol, which would tell us that we
515 * need to query for new sessions even though we have sessions
516 * currently ongoing.
517 */
518 if (lttng_live_msg_iter->sessions->len == 0) {
519 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
520 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
521 goto end;
522 } else {
523 /*
524 * Retry to create a viewer session for the requested
525 * session name.
526 */
527 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
528 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
529 goto end;
530 }
531 }
532 }
533
534 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
535 session_idx++) {
536 session = g_ptr_array_index(lttng_live_msg_iter->sessions,
537 session_idx);
538 ret = lttng_live_get_session(lttng_live_msg_iter, session);
539 switch (ret) {
540 case LTTNG_LIVE_ITERATOR_STATUS_OK:
541 break;
542 case LTTNG_LIVE_ITERATOR_STATUS_END:
543 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
544 break;
545 default:
546 goto end;
547 }
548 if (!session->closed) {
549 nr_sessions_opened++;
550 }
551 }
552 end:
553 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK &&
554 sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
555 nr_sessions_opened == 0) {
556 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
557 }
558 return ret;
559 }
560
561 static
562 enum lttng_live_iterator_status emit_inactivity_message(
563 struct lttng_live_msg_iter *lttng_live_msg_iter,
564 struct lttng_live_stream_iterator *stream_iter,
565 bt_message **message, uint64_t timestamp)
566 {
567 enum lttng_live_iterator_status ret =
568 LTTNG_LIVE_ITERATOR_STATUS_OK;
569 bt_message *msg = NULL;
570
571 BT_ASSERT(stream_iter->trace->clock_class);
572
573 msg = bt_message_message_iterator_inactivity_create(
574 lttng_live_msg_iter->self_msg_iter,
575 stream_iter->trace->clock_class,
576 timestamp);
577 if (!msg) {
578 goto error;
579 }
580
581 *message = msg;
582 end:
583 return ret;
584
585 error:
586 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
587 bt_message_put_ref(msg);
588 goto end;
589 }
590
591 static
592 enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
593 struct lttng_live_msg_iter *lttng_live_msg_iter,
594 struct lttng_live_stream_iterator *lttng_live_stream,
595 bt_message **message)
596 {
597 enum lttng_live_iterator_status ret =
598 LTTNG_LIVE_ITERATOR_STATUS_OK;
599
600 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
601 return LTTNG_LIVE_ITERATOR_STATUS_OK;
602 }
603
604 if (lttng_live_stream->current_inactivity_ts ==
605 lttng_live_stream->last_inactivity_ts) {
606 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
607 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
608 goto end;
609 }
610
611 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream,
612 message, lttng_live_stream->current_inactivity_ts);
613
614 lttng_live_stream->last_inactivity_ts =
615 lttng_live_stream->current_inactivity_ts;
616 end:
617 return ret;
618 }
619
620 static
621 int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
622 struct lttng_live_msg_iter *lttng_live_msg_iter,
623 const bt_message *msg, int64_t last_msg_ts_ns,
624 int64_t *ts_ns)
625 {
626 const bt_clock_class *clock_class = NULL;
627 const bt_clock_snapshot *clock_snapshot = NULL;
628 int ret = 0;
629 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
630 bt_logging_level log_level = lttng_live_msg_iter->log_level;
631
632 BT_ASSERT(msg);
633 BT_ASSERT(ts_ns);
634
635 BT_LOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
636 "last-msg-ts=%" PRId64, lttng_live_msg_iter, msg,
637 last_msg_ts_ns);
638
639 switch (bt_message_get_type(msg)) {
640 case BT_MESSAGE_TYPE_EVENT:
641 clock_class =
642 bt_message_event_borrow_stream_class_default_clock_class_const(
643 msg);
644 BT_ASSERT(clock_class);
645
646 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
647 msg);
648 break;
649 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
650 clock_class =
651 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
652 msg);
653 BT_ASSERT(clock_class);
654
655 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
656 msg);
657 break;
658 case BT_MESSAGE_TYPE_PACKET_END:
659 clock_class =
660 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
661 msg);
662 BT_ASSERT(clock_class);
663
664 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
665 msg);
666 break;
667 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
668 clock_class =
669 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
670 msg);
671 BT_ASSERT(clock_class);
672
673 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
674 msg);
675 break;
676 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
677 clock_class =
678 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
679 msg);
680 BT_ASSERT(clock_class);
681
682 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
683 msg);
684 break;
685 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
686 clock_class =
687 bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
688 msg);
689 BT_ASSERT(clock_class);
690
691 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
692 msg, &clock_snapshot);
693 if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
694 goto no_clock_snapshot;
695 }
696
697 break;
698 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
699 clock_class =
700 bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
701 msg);
702 BT_ASSERT(clock_class);
703
704 sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
705 msg, &clock_snapshot);
706 if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
707 goto no_clock_snapshot;
708 }
709
710 break;
711 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
712 clock_snapshot =
713 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
714 msg);
715 break;
716 default:
717 /* All the other messages have a higher priority */
718 BT_LOGD_STR("Message has no timestamp: using the last message timestamp.");
719 *ts_ns = last_msg_ts_ns;
720 goto end;
721 }
722
723 clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
724 BT_ASSERT(clock_class);
725
726 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
727 if (ret) {
728 BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
729 "clock-snapshot-addr=%p", clock_snapshot);
730 goto error;
731 }
732
733 goto end;
734
735 no_clock_snapshot:
736 BT_LOGD_STR("Message's default clock snapshot is missing: "
737 "using the last message timestamp.");
738 *ts_ns = last_msg_ts_ns;
739 goto end;
740
741 error:
742 ret = -1;
743
744 end:
745 if (ret == 0) {
746 BT_LOGD("Found message's timestamp: "
747 "iter-data-addr=%p, msg-addr=%p, "
748 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
749 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
750 }
751
752 return ret;
753 }
754
755 static
756 enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
757 struct lttng_live_msg_iter *lttng_live_msg_iter,
758 struct lttng_live_stream_iterator *lttng_live_stream,
759 bt_message **message)
760 {
761 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
762 enum bt_msg_iter_status status;
763 uint64_t session_idx, trace_idx;
764 bt_logging_level log_level = lttng_live_msg_iter->log_level;
765
766 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
767 session_idx++) {
768 struct lttng_live_session *session =
769 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
770
771 if (session->new_streams_needed) {
772 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
773 goto end;
774 }
775 for (trace_idx = 0; trace_idx < session->traces->len;
776 trace_idx++) {
777 struct lttng_live_trace *trace =
778 g_ptr_array_index(session->traces, trace_idx);
779 if (trace->new_metadata_needed) {
780 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
781 goto end;
782 }
783 }
784 }
785
786 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
787 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
788 goto end;
789 }
790
791 status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter,
792 lttng_live_msg_iter->self_msg_iter, message);
793 switch (status) {
794 case BT_MSG_ITER_STATUS_EOF:
795 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
796 break;
797 case BT_MSG_ITER_STATUS_OK:
798 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
799 break;
800 case BT_MSG_ITER_STATUS_AGAIN:
801 /*
802 * Continue immediately (end of packet). The next
803 * get_index may return AGAIN to delay the following
804 * attempt.
805 */
806 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
807 break;
808 case BT_MSG_ITER_STATUS_INVAL:
809 /* No argument provided by the user, so don't return INVAL. */
810 case BT_MSG_ITER_STATUS_ERROR:
811 default:
812 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
813 BT_LOGW("CTF msg iterator return an error or failed msg_iter=%p",
814 lttng_live_stream->msg_iter);
815 break;
816 }
817
818 end:
819 return ret;
820 }
821
822 /*
823 * helper function:
824 * handle_no_data_streams()
825 * retry:
826 * - for each ACTIVE_NO_DATA stream:
827 * - query relayd for stream data, or quiescence info.
828 * - if need metadata, get metadata, goto retry.
829 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
830 * - if quiescent, move to QUIESCENT streams
831 * - if fetched data, move to ACTIVE_DATA streams
832 * (at this point each stream either has data, or is quiescent)
833 *
834 *
835 * iterator_next:
836 * handle_new_streams_and_metadata()
837 * - query relayd for known streams, add them as ACTIVE_NO_DATA
838 * - query relayd for metadata
839 *
840 * call handle_active_no_data_streams()
841 *
842 * handle_quiescent_streams()
843 * - if at least one stream is ACTIVE_DATA:
844 * - peek stream event with lowest timestamp -> next_ts
845 * - for each quiescent stream
846 * - if next_ts >= quiescent end
847 * - set state to ACTIVE_NO_DATA
848 * - else
849 * - for each quiescent stream
850 * - set state to ACTIVE_NO_DATA
851 *
852 * call handle_active_no_data_streams()
853 *
854 * handle_active_data_streams()
855 * - if at least one stream is ACTIVE_DATA:
856 * - get stream event with lowest timestamp from heap
857 * - make that stream event the current message.
858 * - move this stream heap position to its next event
859 * - if we need to fetch data from relayd, move
860 * stream to ACTIVE_NO_DATA.
861 * - return OK
862 * - return AGAIN
863 *
864 * end criterion: ctrl-c on client. If relayd exits or the session
865 * closes on the relay daemon side, we keep on waiting for streams.
866 * Eventually handle --end timestamp (also an end criterion).
867 *
868 * When disconnected from relayd: try to re-connect endlessly.
869 */
870 static
871 enum lttng_live_iterator_status lttng_live_iterator_next_on_stream(
872 struct lttng_live_msg_iter *lttng_live_msg_iter,
873 struct lttng_live_stream_iterator *stream_iter,
874 bt_message **curr_msg)
875 {
876 bt_logging_level log_level = lttng_live_msg_iter->log_level;
877 enum lttng_live_iterator_status live_status;
878
879 retry:
880 print_stream_state(stream_iter);
881 live_status = lttng_live_iterator_handle_new_streams_and_metadata(
882 lttng_live_msg_iter);
883 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
884 goto end;
885 }
886 live_status = lttng_live_iterator_next_handle_one_no_data_stream(
887 lttng_live_msg_iter, stream_iter);
888 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
889 goto end;
890 }
891 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(
892 lttng_live_msg_iter, stream_iter, curr_msg);
893 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
894 BT_ASSERT(*curr_msg == NULL);
895 goto end;
896 }
897 if (*curr_msg) {
898 goto end;
899 }
900 live_status = lttng_live_iterator_next_handle_one_active_data_stream(
901 lttng_live_msg_iter, stream_iter, curr_msg);
902 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
903 BT_ASSERT(*curr_msg == NULL);
904 }
905
906 end:
907 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
908 goto retry;
909 }
910
911 return live_status;
912 }
913
914 static
915 enum lttng_live_iterator_status next_stream_iterator_for_trace(
916 struct lttng_live_msg_iter *lttng_live_msg_iter,
917 struct lttng_live_trace *live_trace,
918 struct lttng_live_stream_iterator **candidate_stream_iter)
919 {
920 struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
921 enum lttng_live_iterator_status stream_iter_status;;
922 int64_t curr_candidate_msg_ts = INT64_MAX;
923 uint64_t stream_iter_idx;
924 bt_logging_level log_level = lttng_live_msg_iter->log_level;
925
926 BT_ASSERT(live_trace);
927 BT_ASSERT(live_trace->stream_iterators);
928 /*
929 * Update the current message of every stream iterators of this trace.
930 * The current msg of every stream must have a timestamp equal or
931 * larger than the last message returned by this iterator. We must
932 * ensure monotonicity.
933 */
934 stream_iter_idx = 0;
935 while (stream_iter_idx < live_trace->stream_iterators->len) {
936 bool stream_iter_is_ended = false;
937 struct lttng_live_stream_iterator *stream_iter =
938 g_ptr_array_index(live_trace->stream_iterators,
939 stream_iter_idx);
940
941 /*
942 * Find if there is are now current message for this stream
943 * iterator get it.
944 */
945 while (!stream_iter->current_msg) {
946 bt_message *msg = NULL;
947 int64_t curr_msg_ts_ns = INT64_MAX;
948 stream_iter_status = lttng_live_iterator_next_on_stream(
949 lttng_live_msg_iter, stream_iter, &msg);
950
951 BT_LOGD("live stream iterator returned status :%s",
952 print_live_iterator_status(stream_iter_status));
953 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
954 stream_iter_is_ended = true;
955 break;
956 }
957
958 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
959 goto end;
960 }
961
962 BT_ASSERT(msg);
963
964 /*
965 * Get the timestamp in nanoseconds from origin of this
966 * messsage.
967 */
968 live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter,
969 msg, lttng_live_msg_iter->last_msg_ts_ns,
970 &curr_msg_ts_ns);
971
972 /*
973 * Check if the message of the current live stream
974 * iterator occured at the exact same time or after the
975 * last message returned by this component's message
976 * iterator. If not, we return an error.
977 */
978 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
979 stream_iter->current_msg = msg;
980 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
981 } else {
982 /*
983 * We received a message in the past. To ensure
984 * monotonicity, we can't send it forward.
985 */
986 BT_LOGE("Message's timestamp is less than "
987 "lttng-live's message iterator's last "
988 "returned timestamp: "
989 "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", "
990 "last-msg-ts=%" PRId64,
991 lttng_live_msg_iter, curr_msg_ts_ns,
992 lttng_live_msg_iter->last_msg_ts_ns);
993 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
994 goto end;
995 }
996 }
997
998 if (!stream_iter_is_ended &&
999 stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
1000 /*
1001 * Update the current best candidate message for the
1002 * stream iterator of thise live trace to be forwarded
1003 * downstream.
1004 */
1005 curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1006 curr_candidate_stream_iter = stream_iter;
1007 }
1008
1009 if (stream_iter_is_ended) {
1010 /*
1011 * The live stream iterator is ENDed. We remove that
1012 * iterator from the list and we restart the iteration
1013 * at the beginning of the live stream iterator array
1014 * to because the removal will shuffle the array.
1015 */
1016 g_ptr_array_remove_index_fast(live_trace->stream_iterators,
1017 stream_iter_idx);
1018 stream_iter_idx = 0;
1019 } else {
1020 stream_iter_idx++;
1021 }
1022 }
1023
1024 if (curr_candidate_stream_iter) {
1025 *candidate_stream_iter = curr_candidate_stream_iter;
1026 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1027 } else {
1028 /*
1029 * The only case where we don't have a candidate for this trace
1030 * is if we reached the end of all the iterators.
1031 */
1032 BT_ASSERT(live_trace->stream_iterators->len == 0);
1033 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1034 }
1035
1036 end:
1037 return stream_iter_status;
1038 }
1039
1040 static
1041 enum lttng_live_iterator_status next_stream_iterator_for_session(
1042 struct lttng_live_msg_iter *lttng_live_msg_iter,
1043 struct lttng_live_session *session,
1044 struct lttng_live_stream_iterator **candidate_session_stream_iter)
1045 {
1046 enum lttng_live_iterator_status stream_iter_status;
1047 uint64_t trace_idx = 0;
1048 int64_t curr_candidate_msg_ts = INT64_MAX;
1049 struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
1050
1051 /*
1052 * Make sure we are attached to the session and look for new streams
1053 * and metadata.
1054 */
1055 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1056 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1057 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1058 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1059 goto end;
1060 }
1061
1062 BT_ASSERT(session->traces);
1063
1064 /*
1065 * Use while loops here rather then for loops so we can restart the
1066 * iteration if an element is removed from the array during the
1067 * looping.
1068 */
1069 while (trace_idx < session->traces->len) {
1070 bool trace_is_ended = false;
1071 struct lttng_live_stream_iterator *stream_iter;
1072 struct lttng_live_trace *trace =
1073 g_ptr_array_index(session->traces, trace_idx);
1074
1075 stream_iter_status = next_stream_iterator_for_trace(
1076 lttng_live_msg_iter, trace, &stream_iter);
1077 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1078 /*
1079 * All the live stream iterators for this trace are
1080 * ENDed. Remove the trace from this session.
1081 */
1082 trace_is_ended = true;
1083 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1084 goto end;
1085 }
1086
1087 if (!trace_is_ended) {
1088 BT_ASSERT(stream_iter);
1089
1090 if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
1091 curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1092 curr_candidate_stream_iter = stream_iter;
1093 }
1094 trace_idx++;
1095 } else {
1096 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1097 trace_idx = 0;
1098 }
1099 }
1100 if (curr_candidate_stream_iter) {
1101 *candidate_session_stream_iter = curr_candidate_stream_iter;
1102 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1103 } else {
1104 /*
1105 * The only cases where we don't have a candidate for this
1106 * trace is:
1107 * 1. if we reached the end of all the iterators of all the
1108 * traces of this session,
1109 * 2. if we never had live stream iterator in the first place.
1110 *
1111 * In either cases, we return END.
1112 */
1113 BT_ASSERT(session->traces->len == 0);
1114 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1115 }
1116 end:
1117 return stream_iter_status;
1118 }
1119
1120 static inline
1121 void put_messages(bt_message_array_const msgs, uint64_t count)
1122 {
1123 uint64_t i;
1124
1125 for (i = 0; i < count; i++) {
1126 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1127 }
1128 }
1129
1130 BT_HIDDEN
1131 bt_self_message_iterator_status lttng_live_msg_iter_next(
1132 bt_self_message_iterator *self_msg_it,
1133 bt_message_array_const msgs, uint64_t capacity,
1134 uint64_t *count)
1135 {
1136 bt_self_message_iterator_status status;
1137 struct lttng_live_msg_iter *lttng_live_msg_iter =
1138 bt_self_message_iterator_get_data(self_msg_it);
1139 struct lttng_live_component *lttng_live =
1140 lttng_live_msg_iter->lttng_live_comp;
1141 enum lttng_live_iterator_status stream_iter_status;
1142 uint64_t session_idx;
1143
1144 *count = 0;
1145
1146 BT_ASSERT(lttng_live_msg_iter);
1147
1148 /*
1149 * Clear all the invalid message reference that might be left over in
1150 * the output array.
1151 */
1152 memset(msgs, 0, capacity * sizeof(*msgs));
1153
1154 /*
1155 * If no session are exposed on the relay found at the url provided by
1156 * the user, session count will be 0. In this case, we return status
1157 * end to return gracefully.
1158 */
1159 if (lttng_live_msg_iter->sessions->len == 0) {
1160 if (lttng_live->params.sess_not_found_act !=
1161 SESSION_NOT_FOUND_ACTION_CONTINUE) {
1162 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1163 goto no_session;
1164 } else {
1165 /*
1166 * The are no more active session for this session
1167 * name. Retry to create a viewer session for the
1168 * requested session name.
1169 */
1170 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1171 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1172 goto no_session;
1173 }
1174 }
1175 }
1176
1177 if (lttng_live_msg_iter->active_stream_iter == 0) {
1178 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1179 }
1180
1181 /*
1182 * Here the muxing of message is done.
1183 *
1184 * We need to iterate over all the streams of all the traces of all the
1185 * viewer sessions in order to get the message with the smallest
1186 * timestamp. In this case, a session is a viewer session and there is
1187 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1188 * kernel). Each viewer session can have multiple traces, for example,
1189 * 64bit UST viewer sessions could have multiple per-pid traces.
1190 *
1191 * We iterate over the streams of each traces to update and see what is
1192 * their next message's timestamp. From those timestamps, we select the
1193 * message with the smallest timestamp as the best candidate message
1194 * for that trace and do the same thing across all the sessions.
1195 *
1196 * We then compare the timestamp of best candidate message of all the
1197 * sessions to pick the message with the smallest timestamp and we
1198 * return it.
1199 */
1200 while (*count < capacity) {
1201 struct lttng_live_stream_iterator *next_stream_iter = NULL,
1202 *candidate_stream_iter = NULL;
1203 int64_t next_msg_ts_ns = INT64_MAX;
1204
1205 BT_ASSERT(lttng_live_msg_iter->sessions);
1206 session_idx = 0;
1207 /*
1208 * Use a while loop instead of a for loop so we can restart the
1209 * iteration if we remove an element. We can safely call
1210 * next_stream_iterator_for_session() multiple times on the
1211 * same session as we only fetch a new message if there is no
1212 * current next message for each live stream iterator.
1213 * If all live stream iterator of that session already have a
1214 * current next message, the function will simply exit return
1215 * the same candidate live stream iterator every time.
1216 */
1217 while (session_idx < lttng_live_msg_iter->sessions->len) {
1218 struct lttng_live_session *session =
1219 g_ptr_array_index(lttng_live_msg_iter->sessions,
1220 session_idx);
1221
1222 /* Find the best candidate message to send downstream. */
1223 stream_iter_status = next_stream_iterator_for_session(
1224 lttng_live_msg_iter, session,
1225 &candidate_stream_iter);
1226
1227 /* If we receive an END status, it means that either:
1228 * - Those traces never had active streams (UST with no
1229 * data produced yet),
1230 * - All live stream iterators have ENDed.*/
1231 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1232 if (session->closed && session->traces->len == 0) {
1233 /*
1234 * Remove the session from the list and restart the
1235 * iteration at the beginning of the array since the
1236 * removal shuffle the elements of the array.
1237 */
1238 g_ptr_array_remove_index_fast(
1239 lttng_live_msg_iter->sessions,
1240 session_idx);
1241 session_idx = 0;
1242 } else {
1243 session_idx++;
1244 }
1245 continue;
1246 }
1247
1248 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1249 goto end;
1250 }
1251
1252 if (candidate_stream_iter->current_msg_ts_ns <= next_msg_ts_ns) {
1253 next_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1254 next_stream_iter = candidate_stream_iter;
1255 }
1256
1257 session_idx++;
1258 }
1259
1260 if (!next_stream_iter) {
1261 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1262 goto end;
1263 }
1264
1265 BT_ASSERT(next_stream_iter->current_msg);
1266 /* Ensure monotonicity. */
1267 BT_ASSERT(lttng_live_msg_iter->last_msg_ts_ns <=
1268 next_stream_iter->current_msg_ts_ns);
1269
1270 /*
1271 * Insert the next message to the message batch. This will set
1272 * stream iterator current messsage to NULL so that next time
1273 * we fetch the next message of that stream iterator
1274 */
1275 BT_MESSAGE_MOVE_REF(msgs[*count], next_stream_iter->current_msg);
1276 (*count)++;
1277
1278 /* Update the last timestamp in nanoseconds sent downstream. */
1279 lttng_live_msg_iter->last_msg_ts_ns = next_msg_ts_ns;
1280 next_stream_iter->current_msg_ts_ns = INT64_MAX;
1281
1282 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1283 }
1284 end:
1285 switch (stream_iter_status) {
1286 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1287 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1288 if (*count > 0) {
1289 /*
1290 * We received a again status but we have some messages
1291 * to send downstream. We send them and return OK for
1292 * now. On the next call we return again if there are
1293 * still no new message to send.
1294 */
1295 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1296 } else {
1297 status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN;
1298 }
1299 break;
1300 case LTTNG_LIVE_ITERATOR_STATUS_END:
1301 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1302 break;
1303 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1304 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1305 break;
1306 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1307 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1308 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1309 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1310 /* Put all existing messages on error. */
1311 put_messages(msgs, *count);
1312 break;
1313 default:
1314 abort();
1315 }
1316
1317 no_session:
1318 return status;
1319 }
1320
1321 BT_HIDDEN
1322 bt_self_message_iterator_status lttng_live_msg_iter_init(
1323 bt_self_message_iterator *self_msg_it,
1324 bt_self_component_source *self_comp_src,
1325 bt_self_component_port_output *self_port)
1326 {
1327 bt_self_message_iterator_status ret =
1328 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1329 bt_self_component *self_comp =
1330 bt_self_component_source_as_self_component(self_comp_src);
1331 struct lttng_live_component *lttng_live;
1332 struct lttng_live_msg_iter *lttng_live_msg_iter;
1333 bt_logging_level log_level;
1334
1335 BT_ASSERT(self_msg_it);
1336
1337 lttng_live = bt_self_component_get_data(self_comp);
1338 log_level = lttng_live->log_level;
1339
1340 /* There can be only one downstream iterator at the same time. */
1341 BT_ASSERT(!lttng_live->has_msg_iter);
1342 lttng_live->has_msg_iter = true;
1343
1344 lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
1345 if (!lttng_live_msg_iter) {
1346 ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1347 goto end;
1348 }
1349
1350 lttng_live_msg_iter->log_level = lttng_live->log_level;
1351 lttng_live_msg_iter->lttng_live_comp = lttng_live;
1352 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1353
1354 lttng_live_msg_iter->active_stream_iter = 0;
1355 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1356 lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func(
1357 (GDestroyNotify) lttng_live_destroy_session);
1358 BT_ASSERT(lttng_live_msg_iter->sessions);
1359
1360 lttng_live_msg_iter->viewer_connection =
1361 live_viewer_connection_create(lttng_live->params.url->str, false,
1362 lttng_live_msg_iter);
1363 if (!lttng_live_msg_iter->viewer_connection) {
1364 goto error;
1365 }
1366
1367 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1368 goto error;
1369 }
1370 if (lttng_live_msg_iter->sessions->len == 0) {
1371 switch (lttng_live->params.sess_not_found_act) {
1372 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1373 BT_LOGI("Unable to connect to the requested live viewer "
1374 "session. Keep trying to connect because of "
1375 "%s=\"%s\" component parameter: url=\"%s\"",
1376 SESS_NOT_FOUND_ACTION_PARAM,
1377 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1378 lttng_live->params.url->str);
1379 break;
1380 case SESSION_NOT_FOUND_ACTION_FAIL:
1381 BT_LOGE("Unable to connect to the requested live viewer "
1382 "session. Fail the message iterator"
1383 "initialization because of %s=\"%s\" "
1384 "component parameter: url =\"%s\"",
1385 SESS_NOT_FOUND_ACTION_PARAM,
1386 SESS_NOT_FOUND_ACTION_FAIL_STR,
1387 lttng_live->params.url->str);
1388 goto error;
1389 case SESSION_NOT_FOUND_ACTION_END:
1390 BT_LOGI("Unable to connect to the requested live viewer "
1391 "session. End gracefully at the first _next() "
1392 "call because of %s=\"%s\" component parameter: "
1393 "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
1394 SESS_NOT_FOUND_ACTION_END_STR,
1395 lttng_live->params.url->str);
1396 break;
1397 }
1398 }
1399
1400 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1401
1402 goto end;
1403 error:
1404 ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1405 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
1406 end:
1407 return ret;
1408 }
1409
1410 static
1411 bt_query_status lttng_live_query_list_sessions(const bt_value *params,
1412 const bt_value **result, bt_logging_level log_level)
1413 {
1414 bt_query_status status = BT_QUERY_STATUS_OK;
1415 const bt_value *url_value = NULL;
1416 const char *url;
1417 struct live_viewer_connection *viewer_connection = NULL;
1418
1419 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1420 if (!url_value) {
1421 BT_LOGW("Mandatory `%s` parameter missing", URL_PARAM);
1422 status = BT_QUERY_STATUS_INVALID_PARAMS;
1423 goto error;
1424 }
1425
1426 if (!bt_value_is_string(url_value)) {
1427 BT_LOGW("`%s` parameter is required to be a string value",
1428 URL_PARAM);
1429 status = BT_QUERY_STATUS_INVALID_PARAMS;
1430 goto error;
1431 }
1432
1433 url = bt_value_string_get(url_value);
1434
1435 viewer_connection = live_viewer_connection_create(url, true, NULL);
1436 if (!viewer_connection) {
1437 goto error;
1438 }
1439
1440 status = live_viewer_connection_list_sessions(viewer_connection,
1441 result);
1442 if (status != BT_QUERY_STATUS_OK) {
1443 goto error;
1444 }
1445
1446 goto end;
1447
1448 error:
1449 BT_VALUE_PUT_REF_AND_RESET(*result);
1450
1451 if (status >= 0) {
1452 status = BT_QUERY_STATUS_ERROR;
1453 }
1454
1455 end:
1456 if (viewer_connection) {
1457 live_viewer_connection_destroy(viewer_connection);
1458 }
1459 return status;
1460 }
1461
1462 BT_HIDDEN
1463 bt_query_status lttng_live_query(bt_self_component_class_source *comp_class,
1464 const bt_query_executor *query_exec,
1465 const char *object, const bt_value *params,
1466 bt_logging_level log_level, const bt_value **result)
1467 {
1468 bt_query_status status = BT_QUERY_STATUS_OK;
1469
1470 if (strcmp(object, "sessions") == 0) {
1471 status = lttng_live_query_list_sessions(params, result,
1472 log_level);
1473 } else {
1474 BT_LOGW("Unknown query object `%s`", object);
1475 status = BT_QUERY_STATUS_INVALID_OBJECT;
1476 goto end;
1477 }
1478
1479 end:
1480 return status;
1481 }
1482
1483 static
1484 void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1485 {
1486 if (!lttng_live) {
1487 return;
1488 }
1489 if (lttng_live->params.url) {
1490 g_string_free(lttng_live->params.url, TRUE);
1491 }
1492 g_free(lttng_live);
1493 }
1494
1495 BT_HIDDEN
1496 void lttng_live_component_finalize(bt_self_component_source *component)
1497 {
1498 void *data = bt_self_component_get_data(
1499 bt_self_component_source_as_self_component(component));
1500
1501 if (!data) {
1502 return;
1503 }
1504 lttng_live_component_destroy_data(data);
1505 }
1506
1507 static
1508 enum session_not_found_action parse_session_not_found_action_param(
1509 const bt_value *no_session_param)
1510 {
1511 enum session_not_found_action action;
1512 const char *no_session_act_str;
1513 no_session_act_str = bt_value_string_get(no_session_param);
1514 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1515 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1516 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1517 action = SESSION_NOT_FOUND_ACTION_FAIL;
1518 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0) {
1519 action = SESSION_NOT_FOUND_ACTION_END;
1520 } else {
1521 action = -1;
1522 }
1523
1524 return action;
1525 }
1526
1527 struct lttng_live_component *lttng_live_component_create(const bt_value *params,
1528 bt_logging_level log_level)
1529 {
1530 struct lttng_live_component *lttng_live;
1531 const bt_value *value = NULL;
1532 const char *url;
1533
1534 lttng_live = g_new0(struct lttng_live_component, 1);
1535 if (!lttng_live) {
1536 goto end;
1537 }
1538 lttng_live->log_level = log_level;
1539 lttng_live->max_query_size = MAX_QUERY_SIZE;
1540 lttng_live->has_msg_iter = false;
1541
1542 value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1543 if (!value || !bt_value_is_string(value)) {
1544 BT_LOGW("Mandatory `%s` parameter missing or not a string",
1545 URL_PARAM);
1546 goto error;
1547 }
1548 url = bt_value_string_get(value);
1549 lttng_live->params.url = g_string_new(url);
1550 if (!lttng_live->params.url) {
1551 goto error;
1552 }
1553
1554 value = bt_value_map_borrow_entry_value_const(params,
1555 SESS_NOT_FOUND_ACTION_PARAM);
1556
1557 if (value && bt_value_is_string(value)) {
1558 lttng_live->params.sess_not_found_act =
1559 parse_session_not_found_action_param(value);
1560 if (lttng_live->params.sess_not_found_act == -1) {
1561 BT_LOGE("Unexpected value for `%s` parameter: "
1562 "value=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
1563 bt_value_string_get(value));
1564 goto error;
1565 }
1566 } else {
1567 BT_LOGW("Optional `%s` parameter is missing or "
1568 "not a string value. Defaulting to %s=\"%s\".",
1569 SESS_NOT_FOUND_ACTION_PARAM,
1570 SESS_NOT_FOUND_ACTION_PARAM,
1571 SESS_NOT_FOUND_ACTION_CONTINUE_STR);
1572 lttng_live->params.sess_not_found_act =
1573 SESSION_NOT_FOUND_ACTION_CONTINUE;
1574 }
1575
1576 goto end;
1577
1578 error:
1579 lttng_live_component_destroy_data(lttng_live);
1580 lttng_live = NULL;
1581 end:
1582 return lttng_live;
1583 }
1584
1585 BT_HIDDEN
1586 bt_self_component_status lttng_live_component_init(
1587 bt_self_component_source *self_comp_src,
1588 const bt_value *params, __attribute__((unused)) void *init_method_data)
1589 {
1590 struct lttng_live_component *lttng_live;
1591 bt_self_component_status ret = BT_SELF_COMPONENT_STATUS_OK;
1592 bt_self_component *self_comp =
1593 bt_self_component_source_as_self_component(self_comp_src);
1594 bt_logging_level log_level = bt_component_get_logging_level(
1595 bt_self_component_as_component(self_comp));
1596
1597 lttng_live = lttng_live_component_create(params, log_level);
1598 if (!lttng_live) {
1599 ret = BT_SELF_COMPONENT_STATUS_NOMEM;
1600 goto error;
1601 }
1602 lttng_live->self_comp = self_comp_src;
1603
1604 if (lttng_live_graph_is_canceled(lttng_live)) {
1605 ret = BT_SELF_COMPONENT_STATUS_END;
1606 goto error;
1607 }
1608
1609 ret = bt_self_component_source_add_output_port(
1610 lttng_live->self_comp, "out",
1611 NULL, NULL);
1612 if (ret != BT_SELF_COMPONENT_STATUS_OK) {
1613 goto error;
1614 }
1615
1616 bt_self_component_set_data(self_comp, lttng_live);
1617 goto end;
1618
1619 error:
1620 lttng_live_component_destroy_data(lttng_live);
1621 lttng_live = NULL;
1622 end:
1623 return ret;
1624 }
This page took 0.100886 seconds and 5 git commands to generate.