Re-organize sources
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.c
1 /*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
7 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
8 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
11 *
12 * Permission is hereby granted, free of charge, to any person obtaining a copy
13 * of this software and associated documentation files (the "Software"), to deal
14 * in the Software without restriction, including without limitation the rights
15 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 * copies of the Software, and to permit persons to whom the Software is
17 * furnished to do so, subject to the following conditions:
18 *
19 * The above copyright notice and this permission notice shall be included in
20 * all copies or substantial portions of the Software.
21 *
22 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 * SOFTWARE.
29 */
30
31 #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
32 #include "logging.h"
33
34 #include <glib.h>
35 #include <inttypes.h>
36 #include <unistd.h>
37
38 #include "common/assert.h"
39 #include <babeltrace2/babeltrace.h>
40 #include "compat/compiler.h"
41 #include <babeltrace2/types.h>
42 #include "plugins/plugins-common.h"
43
44 #include "data-stream.h"
45 #include "metadata.h"
46 #include "lttng-live.h"
47
48 #define MAX_QUERY_SIZE (256*1024)
49 #define URL_PARAM "url"
50 #define SESS_NOT_FOUND_ACTION_PARAM "session-not-found-action"
51 #define SESS_NOT_FOUND_ACTION_CONTINUE_STR "continue"
52 #define SESS_NOT_FOUND_ACTION_FAIL_STR "fail"
53 #define SESS_NOT_FOUND_ACTION_END_STR "end"
54
55 #define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
56
57 static
58 const char *print_live_iterator_status(enum lttng_live_iterator_status status)
59 {
60 switch (status) {
61 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
62 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
63 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
64 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
65 case LTTNG_LIVE_ITERATOR_STATUS_END:
66 return "LTTNG_LIVE_ITERATOR_STATUS_END";
67 case LTTNG_LIVE_ITERATOR_STATUS_OK:
68 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
69 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
70 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
71 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
72 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
73 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
74 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
75 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
76 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
77 default:
78 abort();
79 }
80 }
81
82 static
83 const char *print_state(struct lttng_live_stream_iterator *s)
84 {
85 switch (s->state) {
86 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
87 return "ACTIVE_NO_DATA";
88 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
89 return "QUIESCENT_NO_DATA";
90 case LTTNG_LIVE_STREAM_QUIESCENT:
91 return "QUIESCENT";
92 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
93 return "ACTIVE_DATA";
94 case LTTNG_LIVE_STREAM_EOF:
95 return "EOF";
96 default:
97 return "ERROR";
98 }
99 }
100
/*
 * Logs, at debug level, the state name and the last/current inactivity
 * timestamps of the live stream iterator `live_stream_iter`.
 *
 * The expansion is a single statement WITHOUT a trailing semicolon, so
 * that `print_stream_state(x);` behaves like a regular function call,
 * including inside unbraced `if`/`else` bodies.
 *
 * `live_stream_iter` is evaluated more than once: do not pass an
 * expression with side effects.
 */
#define print_stream_state(live_stream_iter)				\
	do {								\
		BT_LOGD("stream state %s last_inact_ts %" PRId64	\
			", curr_inact_ts %" PRId64,			\
			print_state(live_stream_iter),			\
			(live_stream_iter)->last_inactivity_ts,		\
			(live_stream_iter)->current_inactivity_ts);	\
	} while (0)
109
110 BT_HIDDEN
111 bool lttng_live_graph_is_canceled(struct lttng_live_component *lttng_live)
112 {
113 const bt_component *component;
114 bool ret;
115
116 if (!lttng_live) {
117 ret = false;
118 goto end;
119 }
120
121 component = bt_component_source_as_component_const(
122 bt_self_component_source_as_component_source(
123 lttng_live->self_comp));
124
125 ret = bt_component_graph_is_canceled(component);
126
127 end:
128 return ret;
129 }
130
131 static
132 struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
133 uint64_t trace_id)
134 {
135 uint64_t trace_idx;
136 struct lttng_live_trace *ret_trace = NULL;
137
138 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
139 struct lttng_live_trace *trace =
140 g_ptr_array_index(session->traces, trace_idx);
141 if (trace->id == trace_id) {
142 ret_trace = trace;
143 goto end;
144 }
145 }
146
147 end:
148 return ret_trace;
149 }
150
151 static
152 void lttng_live_destroy_trace(struct lttng_live_trace *trace)
153 {
154 BT_LOGD("Destroy lttng_live_trace");
155
156 BT_ASSERT(trace->stream_iterators);
157 g_ptr_array_free(trace->stream_iterators, TRUE);
158
159 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
160 BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
161
162 lttng_live_metadata_fini(trace);
163 g_free(trace);
164 }
165
166 static
167 struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
168 uint64_t trace_id)
169 {
170 struct lttng_live_trace *trace = NULL;
171
172 trace = g_new0(struct lttng_live_trace, 1);
173 if (!trace) {
174 goto error;
175 }
176 trace->session = session;
177 trace->id = trace_id;
178 trace->trace_class = NULL;
179 trace->trace = NULL;
180 trace->stream_iterators = g_ptr_array_new_with_free_func(
181 (GDestroyNotify) lttng_live_stream_iterator_destroy);
182 BT_ASSERT(trace->stream_iterators);
183 trace->new_metadata_needed = true;
184 g_ptr_array_add(session->traces, trace);
185
186 BT_LOGI("Create trace");
187 goto end;
188 error:
189 g_free(trace);
190 trace = NULL;
191 end:
192 return trace;
193 }
194
195 BT_HIDDEN
196 struct lttng_live_trace *lttng_live_borrow_trace(
197 struct lttng_live_session *session, uint64_t trace_id)
198 {
199 struct lttng_live_trace *trace;
200
201 trace = lttng_live_find_trace(session, trace_id);
202 if (trace) {
203 goto end;
204 }
205
206 /* The session is the owner of the newly created trace. */
207 trace = lttng_live_create_trace(session, trace_id);
208
209 end:
210 return trace;
211 }
212
213 BT_HIDDEN
214 int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
215 uint64_t session_id, const char *hostname,
216 const char *session_name)
217 {
218 int ret = 0;
219 struct lttng_live_session *session;
220
221 session = g_new0(struct lttng_live_session, 1);
222 if (!session) {
223 goto error;
224 }
225
226 session->id = session_id;
227 session->traces = g_ptr_array_new_with_free_func(
228 (GDestroyNotify) lttng_live_destroy_trace);
229 BT_ASSERT(session->traces);
230 session->lttng_live_msg_iter = lttng_live_msg_iter;
231 session->new_streams_needed = true;
232 session->hostname = g_string_new(hostname);
233 BT_ASSERT(session->hostname);
234
235 session->session_name = g_string_new(session_name);
236 BT_ASSERT(session->session_name);
237
238 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
239 session->id, hostname, session_name);
240 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
241 goto end;
242 error:
243 BT_LOGE("Error adding session");
244 g_free(session);
245 ret = -1;
246 end:
247 return ret;
248 }
249
250 static
251 void lttng_live_destroy_session(struct lttng_live_session *session)
252 {
253 struct lttng_live_component *live_comp;
254
255 if (!session) {
256 goto end;
257 }
258
259 BT_LOGD("Destroy lttng live session");
260 if (session->id != -1ULL) {
261 if (lttng_live_detach_session(session)) {
262 live_comp = session->lttng_live_msg_iter->lttng_live_comp;
263 if (session->lttng_live_msg_iter &&
264 !lttng_live_graph_is_canceled(live_comp)) {
265 /* Old relayd cannot detach sessions. */
266 BT_LOGD("Unable to detach lttng live session %" PRIu64,
267 session->id);
268 }
269 }
270 session->id = -1ULL;
271 }
272
273 if (session->traces) {
274 g_ptr_array_free(session->traces, TRUE);
275 }
276
277 if (session->hostname) {
278 g_string_free(session->hostname, TRUE);
279 }
280 if (session->session_name) {
281 g_string_free(session->session_name, TRUE);
282 }
283 g_free(session);
284
285 end:
286 return;
287 }
288
289 static
290 void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
291 {
292 if (!lttng_live_msg_iter) {
293 goto end;
294 }
295
296 if (lttng_live_msg_iter->sessions) {
297 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
298 }
299
300 BT_OBJECT_PUT_REF_AND_RESET(lttng_live_msg_iter->viewer_connection);
301 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
302 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
303
304 /* All stream iterators must be destroyed at this point. */
305 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
306 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
307
308 g_free(lttng_live_msg_iter);
309
310 end:
311 return;
312 }
313
314 BT_HIDDEN
315 void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
316 {
317 struct lttng_live_msg_iter *lttng_live_msg_iter;
318
319 BT_ASSERT(self_msg_iter);
320
321 lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_iter);
322 BT_ASSERT(lttng_live_msg_iter);
323 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
324 }
325
326 static
327 enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
328 struct lttng_live_stream_iterator *lttng_live_stream)
329 {
330 switch (lttng_live_stream->state) {
331 case LTTNG_LIVE_STREAM_QUIESCENT:
332 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
333 break;
334 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
335 /* Invalid state. */
336 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
337 abort();
338 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
339 /* Invalid state. */
340 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
341 abort();
342 case LTTNG_LIVE_STREAM_EOF:
343 break;
344 }
345 return LTTNG_LIVE_ITERATOR_STATUS_OK;
346 }
347
348 /*
349 * For active no data stream, fetch next data. It can be either:
350 * - quiescent: need to put it in the prio heap at quiescent end
351 * timestamp,
352 * - have data: need to wire up first event into the prio heap,
353 * - have no data on this stream at this point: need to retry (AGAIN) or
354 * return EOF.
355 */
356 static
357 enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
358 struct lttng_live_msg_iter *lttng_live_msg_iter,
359 struct lttng_live_stream_iterator *lttng_live_stream)
360 {
361 enum lttng_live_iterator_status ret =
362 LTTNG_LIVE_ITERATOR_STATUS_OK;
363 struct packet_index index;
364 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
365
366 if (lttng_live_stream->trace->new_metadata_needed) {
367 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
368 goto end;
369 }
370 if (lttng_live_stream->trace->session->new_streams_needed) {
371 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
372 goto end;
373 }
374 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
375 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
376 goto end;
377 }
378 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream,
379 &index);
380 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
381 goto end;
382 }
383 BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
384 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
385 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts,
386 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
387
388 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA &&
389 last_inact_ts == curr_inact_ts) {
390 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
391 print_stream_state(lttng_live_stream);
392 } else {
393 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
394 }
395 goto end;
396 }
397 lttng_live_stream->base_offset = index.offset;
398 lttng_live_stream->offset = index.offset;
399 lttng_live_stream->len = index.packet_size / CHAR_BIT;
400 end:
401 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
402 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
403 }
404 return ret;
405 }
406
407 /*
408 * Creation of the message requires the ctf trace class to be created
409 * beforehand, but the live protocol gives us all streams (including metadata)
410 * at once. So we split it in three steps: getting streams, getting metadata
411 * (which creates the ctf trace class), and then creating the per-stream
412 * messages.
413 */
414 static
415 enum lttng_live_iterator_status lttng_live_get_session(
416 struct lttng_live_msg_iter *lttng_live_msg_iter,
417 struct lttng_live_session *session)
418 {
419 enum lttng_live_iterator_status status;
420 uint64_t trace_idx;
421 int ret = 0;
422
423 if (!session->attached) {
424 ret = lttng_live_attach_session(session);
425 if (ret) {
426 if (lttng_live_msg_iter && lttng_live_graph_is_canceled(
427 lttng_live_msg_iter->lttng_live_comp)) {
428 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
429 } else {
430 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
431 }
432 goto end;
433 }
434 }
435
436 status = lttng_live_get_new_streams(session);
437 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
438 status != LTTNG_LIVE_ITERATOR_STATUS_END) {
439 goto end;
440 }
441 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
442 struct lttng_live_trace *trace =
443 g_ptr_array_index(session->traces, trace_idx);
444
445 status = lttng_live_metadata_update(trace);
446 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
447 status != LTTNG_LIVE_ITERATOR_STATUS_END) {
448 goto end;
449 }
450 }
451 status = lttng_live_lazy_msg_init(session);
452
453 end:
454 return status;
455 }
456
457 BT_HIDDEN
458 void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
459 {
460 uint64_t session_idx;
461
462 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
463 session_idx++) {
464 struct lttng_live_session *session =
465 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
466 session->new_streams_needed = true;
467 }
468 }
469
470 static
471 void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
472 {
473 uint64_t session_idx, trace_idx;
474
475 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
476 session_idx++) {
477 struct lttng_live_session *session =
478 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
479 session->new_streams_needed = true;
480 for (trace_idx = 0; trace_idx < session->traces->len;
481 trace_idx++) {
482 struct lttng_live_trace *trace =
483 g_ptr_array_index(session->traces, trace_idx);
484 trace->new_metadata_needed = true;
485 }
486 }
487 }
488
489 static
490 enum lttng_live_iterator_status
491 lttng_live_iterator_handle_new_streams_and_metadata(
492 struct lttng_live_msg_iter *lttng_live_msg_iter)
493 {
494 enum lttng_live_iterator_status ret =
495 LTTNG_LIVE_ITERATOR_STATUS_OK;
496 uint64_t session_idx = 0, nr_sessions_opened = 0;
497 struct lttng_live_session *session;
498 enum session_not_found_action sess_not_found_act =
499 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
500
501 /*
502 * In a remotely distant future, we could add a "new
503 * session" flag to the protocol, which would tell us that we
504 * need to query for new sessions even though we have sessions
505 * currently ongoing.
506 */
507 if (lttng_live_msg_iter->sessions->len == 0) {
508 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
509 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
510 goto end;
511 } else {
512 /*
513 * Retry to create a viewer session for the requested
514 * session name.
515 */
516 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
517 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
518 goto end;
519 }
520 }
521 }
522
523 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
524 session_idx++) {
525 session = g_ptr_array_index(lttng_live_msg_iter->sessions,
526 session_idx);
527 ret = lttng_live_get_session(lttng_live_msg_iter, session);
528 switch (ret) {
529 case LTTNG_LIVE_ITERATOR_STATUS_OK:
530 break;
531 case LTTNG_LIVE_ITERATOR_STATUS_END:
532 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
533 break;
534 default:
535 goto end;
536 }
537 if (!session->closed) {
538 nr_sessions_opened++;
539 }
540 }
541 end:
542 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK &&
543 sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
544 nr_sessions_opened == 0) {
545 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
546 }
547 return ret;
548 }
549
550 static
551 enum lttng_live_iterator_status emit_inactivity_message(
552 struct lttng_live_msg_iter *lttng_live_msg_iter,
553 struct lttng_live_stream_iterator *stream_iter,
554 bt_message **message, uint64_t timestamp)
555 {
556 enum lttng_live_iterator_status ret =
557 LTTNG_LIVE_ITERATOR_STATUS_OK;
558 bt_message *msg = NULL;
559
560 BT_ASSERT(stream_iter->trace->clock_class);
561
562 msg = bt_message_message_iterator_inactivity_create(
563 lttng_live_msg_iter->self_msg_iter,
564 stream_iter->trace->clock_class,
565 timestamp);
566 if (!msg) {
567 goto error;
568 }
569
570 *message = msg;
571 end:
572 return ret;
573
574 error:
575 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
576 bt_message_put_ref(msg);
577 goto end;
578 }
579
580 static
581 enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
582 struct lttng_live_msg_iter *lttng_live_msg_iter,
583 struct lttng_live_stream_iterator *lttng_live_stream,
584 bt_message **message)
585 {
586 enum lttng_live_iterator_status ret =
587 LTTNG_LIVE_ITERATOR_STATUS_OK;
588
589 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
590 return LTTNG_LIVE_ITERATOR_STATUS_OK;
591 }
592
593 if (lttng_live_stream->current_inactivity_ts ==
594 lttng_live_stream->last_inactivity_ts) {
595 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
596 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
597 goto end;
598 }
599
600 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream,
601 message, lttng_live_stream->current_inactivity_ts);
602
603 lttng_live_stream->last_inactivity_ts =
604 lttng_live_stream->current_inactivity_ts;
605 end:
606 return ret;
607 }
608
609 static
610 int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
611 struct lttng_live_msg_iter *lttng_live_msg_iter,
612 const bt_message *msg, int64_t last_msg_ts_ns,
613 int64_t *ts_ns)
614 {
615 const bt_clock_class *clock_class = NULL;
616 const bt_clock_snapshot *clock_snapshot = NULL;
617 int ret = 0;
618 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
619
620 BT_ASSERT(msg);
621 BT_ASSERT(ts_ns);
622
623 BT_LOGV("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
624 "last-msg-ts=%" PRId64, lttng_live_msg_iter, msg,
625 last_msg_ts_ns);
626
627 switch (bt_message_get_type(msg)) {
628 case BT_MESSAGE_TYPE_EVENT:
629 clock_class =
630 bt_message_event_borrow_stream_class_default_clock_class_const(
631 msg);
632 BT_ASSERT(clock_class);
633
634 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
635 msg);
636 break;
637 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
638 clock_class =
639 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
640 msg);
641 BT_ASSERT(clock_class);
642
643 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
644 msg);
645 break;
646 case BT_MESSAGE_TYPE_PACKET_END:
647 clock_class =
648 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
649 msg);
650 BT_ASSERT(clock_class);
651
652 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
653 msg);
654 break;
655 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
656 clock_class =
657 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
658 msg);
659 BT_ASSERT(clock_class);
660
661 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
662 msg);
663 break;
664 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
665 clock_class =
666 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
667 msg);
668 BT_ASSERT(clock_class);
669
670 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
671 msg);
672 break;
673 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
674 clock_class =
675 bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
676 msg);
677 BT_ASSERT(clock_class);
678
679 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
680 msg, &clock_snapshot);
681 if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
682 goto no_clock_snapshot;
683 }
684
685 break;
686 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
687 clock_class =
688 bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
689 msg);
690 BT_ASSERT(clock_class);
691
692 sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
693 msg, &clock_snapshot);
694 if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
695 goto no_clock_snapshot;
696 }
697
698 break;
699 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
700 clock_snapshot =
701 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
702 msg);
703 break;
704 default:
705 /* All the other messages have a higher priority */
706 BT_LOGV_STR("Message has no timestamp: using the last message timestamp.");
707 *ts_ns = last_msg_ts_ns;
708 goto end;
709 }
710
711 clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
712 BT_ASSERT(clock_class);
713
714 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
715 if (ret) {
716 BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
717 "clock-snapshot-addr=%p", clock_snapshot);
718 goto error;
719 }
720
721 goto end;
722
723 no_clock_snapshot:
724 BT_LOGV_STR("Message's default clock snapshot is missing: "
725 "using the last message timestamp.");
726 *ts_ns = last_msg_ts_ns;
727 goto end;
728
729 error:
730 ret = -1;
731
732 end:
733 if (ret == 0) {
734 BT_LOGV("Found message's timestamp: "
735 "iter-data-addr=%p, msg-addr=%p, "
736 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
737 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
738 }
739
740 return ret;
741 }
742
743 static
744 enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
745 struct lttng_live_msg_iter *lttng_live_msg_iter,
746 struct lttng_live_stream_iterator *lttng_live_stream,
747 bt_message **message)
748 {
749 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
750 enum bt_msg_iter_status status;
751 uint64_t session_idx, trace_idx;
752
753 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
754 session_idx++) {
755 struct lttng_live_session *session =
756 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
757
758 if (session->new_streams_needed) {
759 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
760 goto end;
761 }
762 for (trace_idx = 0; trace_idx < session->traces->len;
763 trace_idx++) {
764 struct lttng_live_trace *trace =
765 g_ptr_array_index(session->traces, trace_idx);
766 if (trace->new_metadata_needed) {
767 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
768 goto end;
769 }
770 }
771 }
772
773 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
774 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
775 goto end;
776 }
777
778 status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter,
779 lttng_live_msg_iter->self_msg_iter, message);
780 switch (status) {
781 case BT_MSG_ITER_STATUS_EOF:
782 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
783 break;
784 case BT_MSG_ITER_STATUS_OK:
785 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
786 break;
787 case BT_MSG_ITER_STATUS_AGAIN:
788 /*
789 * Continue immediately (end of packet). The next
790 * get_index may return AGAIN to delay the following
791 * attempt.
792 */
793 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
794 break;
795 case BT_MSG_ITER_STATUS_INVAL:
796 /* No argument provided by the user, so don't return INVAL. */
797 case BT_MSG_ITER_STATUS_ERROR:
798 default:
799 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
800 BT_LOGW("CTF msg iterator return an error or failed msg_iter=%p",
801 lttng_live_stream->msg_iter);
802 break;
803 }
804
805 end:
806 return ret;
807 }
808
809 /*
810 * helper function:
811 * handle_no_data_streams()
812 * retry:
813 * - for each ACTIVE_NO_DATA stream:
814 * - query relayd for stream data, or quiescence info.
815 * - if need metadata, get metadata, goto retry.
816 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
817 * - if quiescent, move to QUIESCENT streams
818 * - if fetched data, move to ACTIVE_DATA streams
819 * (at this point each stream either has data, or is quiescent)
820 *
821 *
822 * iterator_next:
823 * handle_new_streams_and_metadata()
824 * - query relayd for known streams, add them as ACTIVE_NO_DATA
825 * - query relayd for metadata
826 *
827 * call handle_active_no_data_streams()
828 *
829 * handle_quiescent_streams()
830 * - if at least one stream is ACTIVE_DATA:
831 * - peek stream event with lowest timestamp -> next_ts
832 * - for each quiescent stream
833 * - if next_ts >= quiescent end
834 * - set state to ACTIVE_NO_DATA
835 * - else
836 * - for each quiescent stream
837 * - set state to ACTIVE_NO_DATA
838 *
839 * call handle_active_no_data_streams()
840 *
841 * handle_active_data_streams()
842 * - if at least one stream is ACTIVE_DATA:
843 * - get stream event with lowest timestamp from heap
844 * - make that stream event the current message.
845 * - move this stream heap position to its next event
846 * - if we need to fetch data from relayd, move
847 * stream to ACTIVE_NO_DATA.
848 * - return OK
849 * - return AGAIN
850 *
851 * end criterion: ctrl-c on client. If relayd exits or the session
852 * closes on the relay daemon side, we keep on waiting for streams.
853 * Eventually handle --end timestamp (also an end criterion).
854 *
855 * When disconnected from relayd: try to re-connect endlessly.
856 */
857 static
858 enum lttng_live_iterator_status lttng_live_iterator_next_on_stream(
859 struct lttng_live_msg_iter *lttng_live_msg_iter,
860 struct lttng_live_stream_iterator *stream_iter,
861 bt_message **curr_msg)
862 {
863 enum lttng_live_iterator_status live_status;
864
865 retry:
866 print_stream_state(stream_iter);
867 live_status = lttng_live_iterator_handle_new_streams_and_metadata(
868 lttng_live_msg_iter);
869 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
870 goto end;
871 }
872 live_status = lttng_live_iterator_next_handle_one_no_data_stream(
873 lttng_live_msg_iter, stream_iter);
874 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
875 goto end;
876 }
877 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(
878 lttng_live_msg_iter, stream_iter, curr_msg);
879 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
880 BT_ASSERT(*curr_msg == NULL);
881 goto end;
882 }
883 if (*curr_msg) {
884 goto end;
885 }
886 live_status = lttng_live_iterator_next_handle_one_active_data_stream(
887 lttng_live_msg_iter, stream_iter, curr_msg);
888 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
889 BT_ASSERT(*curr_msg == NULL);
890 }
891
892 end:
893 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
894 goto retry;
895 }
896
897 return live_status;
898 }
899
900 static
901 enum lttng_live_iterator_status next_stream_iterator_for_trace(
902 struct lttng_live_msg_iter *lttng_live_msg_iter,
903 struct lttng_live_trace *live_trace,
904 struct lttng_live_stream_iterator **candidate_stream_iter)
905 {
906 struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
907 enum lttng_live_iterator_status stream_iter_status;;
908 int64_t curr_candidate_msg_ts = INT64_MAX;
909 uint64_t stream_iter_idx;
910
911 BT_ASSERT(live_trace);
912 BT_ASSERT(live_trace->stream_iterators);
913 /*
914 * Update the current message of every stream iterators of this trace.
915 * The current msg of every stream must have a timestamp equal or
916 * larger than the last message returned by this iterator. We must
917 * ensure monotonicity.
918 */
919 stream_iter_idx = 0;
920 while (stream_iter_idx < live_trace->stream_iterators->len) {
921 bool stream_iter_is_ended = false;
922 struct lttng_live_stream_iterator *stream_iter =
923 g_ptr_array_index(live_trace->stream_iterators,
924 stream_iter_idx);
925
926 /*
927 * Find if there is are now current message for this stream
928 * iterator get it.
929 */
930 while (!stream_iter->current_msg) {
931 bt_message *msg = NULL;
932 int64_t curr_msg_ts_ns = INT64_MAX;
933 stream_iter_status = lttng_live_iterator_next_on_stream(
934 lttng_live_msg_iter, stream_iter, &msg);
935
936 BT_LOGD("live stream iterator returned status :%s",
937 print_live_iterator_status(stream_iter_status));
938 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
939 stream_iter_is_ended = true;
940 break;
941 }
942
943 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
944 goto end;
945 }
946
947 BT_ASSERT(msg);
948
949 /*
950 * Get the timestamp in nanoseconds from origin of this
951 * messsage.
952 */
953 live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter,
954 msg, lttng_live_msg_iter->last_msg_ts_ns,
955 &curr_msg_ts_ns);
956
957 /*
958 * Check if the message of the current live stream
959 * iterator occured at the exact same time or after the
960 * last message returned by this component's message
961 * iterator. If not, we return an error.
962 */
963 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
964 stream_iter->current_msg = msg;
965 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
966 } else {
967 /*
968 * We received a message in the past. To ensure
969 * monotonicity, we can't send it forward.
970 */
971 BT_LOGE("Message's timestamp is less than "
972 "lttng-live's message iterator's last "
973 "returned timestamp: "
974 "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", "
975 "last-msg-ts=%" PRId64,
976 lttng_live_msg_iter, curr_msg_ts_ns,
977 lttng_live_msg_iter->last_msg_ts_ns);
978 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
979 goto end;
980 }
981 }
982
983 if (!stream_iter_is_ended &&
984 stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
985 /*
986 * Update the current best candidate message for the
987 * stream iterator of thise live trace to be forwarded
988 * downstream.
989 */
990 curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
991 curr_candidate_stream_iter = stream_iter;
992 }
993
994 if (stream_iter_is_ended) {
995 /*
996 * The live stream iterator is ENDed. We remove that
997 * iterator from the list and we restart the iteration
998 * at the beginning of the live stream iterator array
999 * to because the removal will shuffle the array.
1000 */
1001 g_ptr_array_remove_index_fast(live_trace->stream_iterators,
1002 stream_iter_idx);
1003 stream_iter_idx = 0;
1004 } else {
1005 stream_iter_idx++;
1006 }
1007 }
1008
1009 if (curr_candidate_stream_iter) {
1010 *candidate_stream_iter = curr_candidate_stream_iter;
1011 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1012 } else {
1013 /*
1014 * The only case where we don't have a candidate for this trace
1015 * is if we reached the end of all the iterators.
1016 */
1017 BT_ASSERT(live_trace->stream_iterators->len == 0);
1018 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1019 }
1020
1021 end:
1022 return stream_iter_status;
1023 }
1024
1025 static
1026 enum lttng_live_iterator_status next_stream_iterator_for_session(
1027 struct lttng_live_msg_iter *lttng_live_msg_iter,
1028 struct lttng_live_session *session,
1029 struct lttng_live_stream_iterator **candidate_session_stream_iter)
1030 {
1031 enum lttng_live_iterator_status stream_iter_status;
1032 uint64_t trace_idx = 0;
1033 int64_t curr_candidate_msg_ts = INT64_MAX;
1034 struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
1035
1036 /*
1037 * Make sure we are attached to the session and look for new streams
1038 * and metadata.
1039 */
1040 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1041 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1042 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1043 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1044 goto end;
1045 }
1046
1047 BT_ASSERT(session->traces);
1048
1049 /*
1050 * Use while loops here rather then for loops so we can restart the
1051 * iteration if an element is removed from the array during the
1052 * looping.
1053 */
1054 while (trace_idx < session->traces->len) {
1055 bool trace_is_ended = false;
1056 struct lttng_live_stream_iterator *stream_iter;
1057 struct lttng_live_trace *trace =
1058 g_ptr_array_index(session->traces, trace_idx);
1059
1060 stream_iter_status = next_stream_iterator_for_trace(
1061 lttng_live_msg_iter, trace, &stream_iter);
1062 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1063 /*
1064 * All the live stream iterators for this trace are
1065 * ENDed. Remove the trace from this session.
1066 */
1067 trace_is_ended = true;
1068 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1069 goto end;
1070 }
1071
1072 if (!trace_is_ended) {
1073 BT_ASSERT(stream_iter);
1074
1075 if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
1076 curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1077 curr_candidate_stream_iter = stream_iter;
1078 }
1079 trace_idx++;
1080 } else {
1081 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1082 trace_idx = 0;
1083 }
1084 }
1085 if (curr_candidate_stream_iter) {
1086 *candidate_session_stream_iter = curr_candidate_stream_iter;
1087 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1088 } else {
1089 /*
1090 * The only cases where we don't have a candidate for this
1091 * trace is:
1092 * 1. if we reached the end of all the iterators of all the
1093 * traces of this session,
1094 * 2. if we never had live stream iterator in the first place.
1095 *
1096 * In either cases, we return END.
1097 */
1098 BT_ASSERT(session->traces->len == 0);
1099 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1100 }
1101 end:
1102 return stream_iter_status;
1103 }
1104
1105 static inline
1106 void put_messages(bt_message_array_const msgs, uint64_t count)
1107 {
1108 uint64_t i;
1109
1110 for (i = 0; i < count; i++) {
1111 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1112 }
1113 }
1114
1115 BT_HIDDEN
1116 bt_self_message_iterator_status lttng_live_msg_iter_next(
1117 bt_self_message_iterator *self_msg_it,
1118 bt_message_array_const msgs, uint64_t capacity,
1119 uint64_t *count)
1120 {
1121 bt_self_message_iterator_status status;
1122 struct lttng_live_msg_iter *lttng_live_msg_iter =
1123 bt_self_message_iterator_get_data(self_msg_it);
1124 struct lttng_live_component *lttng_live =
1125 lttng_live_msg_iter->lttng_live_comp;
1126 enum lttng_live_iterator_status stream_iter_status;
1127 uint64_t session_idx;
1128
1129 *count = 0;
1130
1131 BT_ASSERT(lttng_live_msg_iter);
1132
1133 /*
1134 * Clear all the invalid message reference that might be left over in
1135 * the output array.
1136 */
1137 memset(msgs, 0, capacity * sizeof(*msgs));
1138
1139 /*
1140 * If no session are exposed on the relay found at the url provided by
1141 * the user, session count will be 0. In this case, we return status
1142 * end to return gracefully.
1143 */
1144 if (lttng_live_msg_iter->sessions->len == 0) {
1145 if (lttng_live->params.sess_not_found_act !=
1146 SESSION_NOT_FOUND_ACTION_CONTINUE) {
1147 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1148 goto no_session;
1149 } else {
1150 /*
1151 * The are no more active session for this session
1152 * name. Retry to create a viewer session for the
1153 * requested session name.
1154 */
1155 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1156 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1157 goto no_session;
1158 }
1159 }
1160 }
1161
1162 if (lttng_live_msg_iter->active_stream_iter == 0) {
1163 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1164 }
1165
1166 /*
1167 * Here the muxing of message is done.
1168 *
1169 * We need to iterate over all the streams of all the traces of all the
1170 * viewer sessions in order to get the message with the smallest
1171 * timestamp. In this case, a session is a viewer session and there is
1172 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1173 * kernel). Each viewer session can have multiple traces, for example,
1174 * 64bit UST viewer sessions could have multiple per-pid traces.
1175 *
1176 * We iterate over the streams of each traces to update and see what is
1177 * their next message's timestamp. From those timestamps, we select the
1178 * message with the smallest timestamp as the best candidate message
1179 * for that trace and do the same thing across all the sessions.
1180 *
1181 * We then compare the timestamp of best candidate message of all the
1182 * sessions to pick the message with the smallest timestamp and we
1183 * return it.
1184 */
1185 while (*count < capacity) {
1186 struct lttng_live_stream_iterator *next_stream_iter = NULL,
1187 *candidate_stream_iter = NULL;
1188 int64_t next_msg_ts_ns = INT64_MAX;
1189
1190 BT_ASSERT(lttng_live_msg_iter->sessions);
1191 session_idx = 0;
1192 /*
1193 * Use a while loop instead of a for loop so we can restart the
1194 * iteration if we remove an element. We can safely call
1195 * next_stream_iterator_for_session() multiple times on the
1196 * same session as we only fetch a new message if there is no
1197 * current next message for each live stream iterator.
1198 * If all live stream iterator of that session already have a
1199 * current next message, the function will simply exit return
1200 * the same candidate live stream iterator every time.
1201 */
1202 while (session_idx < lttng_live_msg_iter->sessions->len) {
1203 struct lttng_live_session *session =
1204 g_ptr_array_index(lttng_live_msg_iter->sessions,
1205 session_idx);
1206
1207 /* Find the best candidate message to send downstream. */
1208 stream_iter_status = next_stream_iterator_for_session(
1209 lttng_live_msg_iter, session,
1210 &candidate_stream_iter);
1211
1212 /* If we receive an END status, it means that either:
1213 * - Those traces never had active streams (UST with no
1214 * data produced yet),
1215 * - All live stream iterators have ENDed.*/
1216 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1217 if (session->closed && session->traces->len == 0) {
1218 /*
1219 * Remove the session from the list and restart the
1220 * iteration at the beginning of the array since the
1221 * removal shuffle the elements of the array.
1222 */
1223 g_ptr_array_remove_index_fast(
1224 lttng_live_msg_iter->sessions,
1225 session_idx);
1226 session_idx = 0;
1227 } else {
1228 session_idx++;
1229 }
1230 continue;
1231 }
1232
1233 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1234 goto end;
1235 }
1236
1237 if (candidate_stream_iter->current_msg_ts_ns <= next_msg_ts_ns) {
1238 next_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1239 next_stream_iter = candidate_stream_iter;
1240 }
1241
1242 session_idx++;
1243 }
1244
1245 if (!next_stream_iter) {
1246 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1247 goto end;
1248 }
1249
1250 BT_ASSERT(next_stream_iter->current_msg);
1251 /* Ensure monotonicity. */
1252 BT_ASSERT(lttng_live_msg_iter->last_msg_ts_ns <=
1253 next_stream_iter->current_msg_ts_ns);
1254
1255 /*
1256 * Insert the next message to the message batch. This will set
1257 * stream iterator current messsage to NULL so that next time
1258 * we fetch the next message of that stream iterator
1259 */
1260 BT_MESSAGE_MOVE_REF(msgs[*count], next_stream_iter->current_msg);
1261 (*count)++;
1262
1263 /* Update the last timestamp in nanoseconds sent downstream. */
1264 lttng_live_msg_iter->last_msg_ts_ns = next_msg_ts_ns;
1265 next_stream_iter->current_msg_ts_ns = INT64_MAX;
1266
1267 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1268 }
1269 end:
1270 switch (stream_iter_status) {
1271 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1272 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1273 if (*count > 0) {
1274 /*
1275 * We received a again status but we have some messages
1276 * to send downstream. We send them and return OK for
1277 * now. On the next call we return again if there are
1278 * still no new message to send.
1279 */
1280 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1281 } else {
1282 status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN;
1283 }
1284 break;
1285 case LTTNG_LIVE_ITERATOR_STATUS_END:
1286 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1287 break;
1288 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1289 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1290 break;
1291 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1292 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1293 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1294 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1295 /* Put all existing messages on error. */
1296 put_messages(msgs, *count);
1297 break;
1298 default:
1299 abort();
1300 }
1301
1302 no_session:
1303 return status;
1304 }
1305
1306 BT_HIDDEN
1307 bt_self_message_iterator_status lttng_live_msg_iter_init(
1308 bt_self_message_iterator *self_msg_it,
1309 bt_self_component_source *self_comp_src,
1310 bt_self_component_port_output *self_port)
1311 {
1312 bt_self_message_iterator_status ret =
1313 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1314 bt_self_component *self_comp =
1315 bt_self_component_source_as_self_component(self_comp_src);
1316 struct lttng_live_component *lttng_live;
1317 struct lttng_live_msg_iter *lttng_live_msg_iter;
1318
1319 BT_ASSERT(self_msg_it);
1320
1321 lttng_live = bt_self_component_get_data(self_comp);
1322
1323 /* There can be only one downstream iterator at the same time. */
1324 BT_ASSERT(!lttng_live->has_msg_iter);
1325 lttng_live->has_msg_iter = true;
1326
1327 lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
1328 if (!lttng_live_msg_iter) {
1329 ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1330 goto end;
1331 }
1332
1333 lttng_live_msg_iter->lttng_live_comp = lttng_live;
1334 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1335
1336 lttng_live_msg_iter->active_stream_iter = 0;
1337 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1338 lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func(
1339 (GDestroyNotify) lttng_live_destroy_session);
1340 BT_ASSERT(lttng_live_msg_iter->sessions);
1341
1342 lttng_live_msg_iter->viewer_connection =
1343 live_viewer_connection_create(lttng_live->params.url->str, false,
1344 lttng_live_msg_iter);
1345 if (!lttng_live_msg_iter->viewer_connection) {
1346 goto error;
1347 }
1348
1349 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1350 goto error;
1351 }
1352 if (lttng_live_msg_iter->sessions->len == 0) {
1353 switch (lttng_live->params.sess_not_found_act) {
1354 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1355 BT_LOGI("Unable to connect to the requested live viewer "
1356 "session. Keep trying to connect because of "
1357 "%s=\"%s\" component parameter: url=\"%s\"",
1358 SESS_NOT_FOUND_ACTION_PARAM,
1359 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1360 lttng_live->params.url->str);
1361 break;
1362 case SESSION_NOT_FOUND_ACTION_FAIL:
1363 BT_LOGE("Unable to connect to the requested live viewer "
1364 "session. Fail the message iterator"
1365 "initialization because of %s=\"%s\" "
1366 "component parameter: url =\"%s\"",
1367 SESS_NOT_FOUND_ACTION_PARAM,
1368 SESS_NOT_FOUND_ACTION_FAIL_STR,
1369 lttng_live->params.url->str);
1370 goto error;
1371 case SESSION_NOT_FOUND_ACTION_END:
1372 BT_LOGI("Unable to connect to the requested live viewer "
1373 "session. End gracefully at the first _next() "
1374 "call because of %s=\"%s\" component parameter: "
1375 "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
1376 SESS_NOT_FOUND_ACTION_END_STR,
1377 lttng_live->params.url->str);
1378 break;
1379 }
1380 }
1381
1382 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1383
1384 goto end;
1385 error:
1386 ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1387 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
1388 end:
1389 return ret;
1390 }
1391
1392 static
1393 bt_query_status lttng_live_query_list_sessions(const bt_value *params,
1394 const bt_value **result)
1395 {
1396 bt_query_status status = BT_QUERY_STATUS_OK;
1397 const bt_value *url_value = NULL;
1398 const char *url;
1399 struct live_viewer_connection *viewer_connection = NULL;
1400
1401 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1402 if (!url_value) {
1403 BT_LOGW("Mandatory `%s` parameter missing", URL_PARAM);
1404 status = BT_QUERY_STATUS_INVALID_PARAMS;
1405 goto error;
1406 }
1407
1408 if (!bt_value_is_string(url_value)) {
1409 BT_LOGW("`%s` parameter is required to be a string value",
1410 URL_PARAM);
1411 status = BT_QUERY_STATUS_INVALID_PARAMS;
1412 goto error;
1413 }
1414
1415 url = bt_value_string_get(url_value);
1416
1417 viewer_connection = live_viewer_connection_create(url, true, NULL);
1418 if (!viewer_connection) {
1419 goto error;
1420 }
1421
1422 status = live_viewer_connection_list_sessions(viewer_connection,
1423 result);
1424 if (status != BT_QUERY_STATUS_OK) {
1425 goto error;
1426 }
1427
1428 goto end;
1429
1430 error:
1431 BT_VALUE_PUT_REF_AND_RESET(*result);
1432
1433 if (status >= 0) {
1434 status = BT_QUERY_STATUS_ERROR;
1435 }
1436
1437 end:
1438 if (viewer_connection) {
1439 live_viewer_connection_destroy(viewer_connection);
1440 }
1441 return status;
1442 }
1443
1444 BT_HIDDEN
1445 bt_query_status lttng_live_query(bt_self_component_class_source *comp_class,
1446 const bt_query_executor *query_exec,
1447 const char *object, const bt_value *params,
1448 const bt_value **result)
1449 {
1450 bt_query_status status = BT_QUERY_STATUS_OK;
1451
1452 if (strcmp(object, "sessions") == 0) {
1453 status = lttng_live_query_list_sessions(params, result);
1454 } else {
1455 BT_LOGW("Unknown query object `%s`", object);
1456 status = BT_QUERY_STATUS_INVALID_OBJECT;
1457 goto end;
1458 }
1459
1460 end:
1461 return status;
1462 }
1463
1464 static
1465 void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1466 {
1467 if (!lttng_live) {
1468 return;
1469 }
1470 if (lttng_live->params.url) {
1471 g_string_free(lttng_live->params.url, TRUE);
1472 }
1473 g_free(lttng_live);
1474 }
1475
1476 BT_HIDDEN
1477 void lttng_live_component_finalize(bt_self_component_source *component)
1478 {
1479 void *data = bt_self_component_get_data(
1480 bt_self_component_source_as_self_component(component));
1481
1482 if (!data) {
1483 return;
1484 }
1485 lttng_live_component_destroy_data(data);
1486 }
1487
1488 static
1489 enum session_not_found_action parse_session_not_found_action_param(
1490 const bt_value *no_session_param)
1491 {
1492 enum session_not_found_action action;
1493 const char *no_session_act_str;
1494 no_session_act_str = bt_value_string_get(no_session_param);
1495 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1496 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1497 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1498 action = SESSION_NOT_FOUND_ACTION_FAIL;
1499 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0) {
1500 action = SESSION_NOT_FOUND_ACTION_END;
1501 } else {
1502 action = -1;
1503 }
1504
1505 return action;
1506 }
1507
1508 struct lttng_live_component *lttng_live_component_create(const bt_value *params)
1509 {
1510 struct lttng_live_component *lttng_live;
1511 const bt_value *value = NULL;
1512 const char *url;
1513
1514 lttng_live = g_new0(struct lttng_live_component, 1);
1515 if (!lttng_live) {
1516 goto end;
1517 }
1518 lttng_live->max_query_size = MAX_QUERY_SIZE;
1519 lttng_live->has_msg_iter = false;
1520
1521 value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1522 if (!value || !bt_value_is_string(value)) {
1523 BT_LOGW("Mandatory `%s` parameter missing or not a string",
1524 URL_PARAM);
1525 goto error;
1526 }
1527 url = bt_value_string_get(value);
1528 lttng_live->params.url = g_string_new(url);
1529 if (!lttng_live->params.url) {
1530 goto error;
1531 }
1532
1533 value = bt_value_map_borrow_entry_value_const(params,
1534 SESS_NOT_FOUND_ACTION_PARAM);
1535
1536 if (value && bt_value_is_string(value)) {
1537 lttng_live->params.sess_not_found_act =
1538 parse_session_not_found_action_param(value);
1539 if (lttng_live->params.sess_not_found_act == -1) {
1540 BT_LOGE("Unexpected value for `%s` parameter: "
1541 "value=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
1542 bt_value_string_get(value));
1543 goto error;
1544 }
1545 } else {
1546 BT_LOGW("Optional `%s` parameter is missing or "
1547 "not a string value. Defaulting to %s=\"%s\".",
1548 SESS_NOT_FOUND_ACTION_PARAM,
1549 SESS_NOT_FOUND_ACTION_PARAM,
1550 SESS_NOT_FOUND_ACTION_CONTINUE_STR);
1551 lttng_live->params.sess_not_found_act =
1552 SESSION_NOT_FOUND_ACTION_CONTINUE;
1553 }
1554
1555 goto end;
1556
1557 error:
1558 lttng_live_component_destroy_data(lttng_live);
1559 lttng_live = NULL;
1560 end:
1561 return lttng_live;
1562 }
1563
1564 BT_HIDDEN
1565 bt_self_component_status lttng_live_component_init(
1566 bt_self_component_source *self_comp,
1567 const bt_value *params, UNUSED_VAR void *init_method_data)
1568 {
1569 struct lttng_live_component *lttng_live;
1570 bt_self_component_status ret = BT_SELF_COMPONENT_STATUS_OK;
1571
1572 lttng_live = lttng_live_component_create(params);
1573 if (!lttng_live) {
1574 ret = BT_SELF_COMPONENT_STATUS_NOMEM;
1575 goto error;
1576 }
1577 lttng_live->self_comp = self_comp;
1578
1579 if (lttng_live_graph_is_canceled(lttng_live)) {
1580 ret = BT_SELF_COMPONENT_STATUS_END;
1581 goto error;
1582 }
1583
1584 ret = bt_self_component_source_add_output_port(
1585 lttng_live->self_comp, "out",
1586 NULL, NULL);
1587 if (ret != BT_SELF_COMPONENT_STATUS_OK) {
1588 goto error;
1589 }
1590
1591 bt_self_component_set_data(
1592 bt_self_component_source_as_self_component(self_comp),
1593 lttng_live);
1594 goto end;
1595
1596 error:
1597 lttng_live_component_destroy_data(lttng_live);
1598 lttng_live = NULL;
1599 end:
1600 return ret;
1601 }
This page took 0.102305 seconds and 4 git commands to generate.