589472dcaa5db2a3faa57033118551dd36909e83
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.c
1 /*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
7 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
8 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
11 *
12 * Permission is hereby granted, free of charge, to any person obtaining a copy
13 * of this software and associated documentation files (the "Software"), to deal
14 * in the Software without restriction, including without limitation the rights
15 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 * copies of the Software, and to permit persons to whom the Software is
17 * furnished to do so, subject to the following conditions:
18 *
19 * The above copyright notice and this permission notice shall be included in
20 * all copies or substantial portions of the Software.
21 *
22 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 * SOFTWARE.
29 */
30
31 #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
32 #include "logging.h"
33
34 #include <glib.h>
35 #include <inttypes.h>
36 #include <unistd.h>
37
38 #include "common/assert.h"
39 #include <babeltrace2/babeltrace.h>
40 #include "compat/compiler.h"
41 #include <babeltrace2/types.h>
42
43 #include "data-stream.h"
44 #include "metadata.h"
45 #include "lttng-live.h"
46
/* Maximum query size: 256 * 1024 (unit presumably bytes; confirm at use site). */
#define MAX_QUERY_SIZE				(256 * 1024)

/* Parameter key strings (presumably component parameter names; confirm at use site). */
#define URL_PARAM				"url"
#define SESS_NOT_FOUND_ACTION_PARAM		"session-not-found-action"

/* String values associated with the "session-not-found-action" key. */
#define SESS_NOT_FOUND_ACTION_CONTINUE_STR	"continue"
#define SESS_NOT_FOUND_ACTION_FAIL_STR		"fail"
#define SESS_NOT_FOUND_ACTION_END_STR		"end"

/* Thin alias forwarding its arguments to BT_LOGD() (debug-level logging). */
#define print_dbg(fmt, ...)	BT_LOGD(fmt, ## __VA_ARGS__)
55
56 static
57 const char *print_live_iterator_status(enum lttng_live_iterator_status status)
58 {
59 switch (status) {
60 case LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
61 return "LTTNG_LIVE_ITERATOR_STATUS_CONTINUE";
62 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
63 return "LTTNG_LIVE_ITERATOR_STATUS_AGAIN";
64 case LTTNG_LIVE_ITERATOR_STATUS_END:
65 return "LTTNG_LIVE_ITERATOR_STATUS_END";
66 case LTTNG_LIVE_ITERATOR_STATUS_OK:
67 return "LTTNG_LIVE_ITERATOR_STATUS_OK";
68 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
69 return "LTTNG_LIVE_ITERATOR_STATUS_INVAL";
70 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
71 return "LTTNG_LIVE_ITERATOR_STATUS_ERROR";
72 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
73 return "LTTNG_LIVE_ITERATOR_STATUS_NOMEM";
74 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
75 return "LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED";
76 default:
77 abort();
78 }
79 }
80
81 static
82 const char *print_state(struct lttng_live_stream_iterator *s)
83 {
84 switch (s->state) {
85 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
86 return "ACTIVE_NO_DATA";
87 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
88 return "QUIESCENT_NO_DATA";
89 case LTTNG_LIVE_STREAM_QUIESCENT:
90 return "QUIESCENT";
91 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
92 return "ACTIVE_DATA";
93 case LTTNG_LIVE_STREAM_EOF:
94 return "EOF";
95 default:
96 return "ERROR";
97 }
98 }
99
/*
 * Logs, at debug level, the state name and the last/current inactivity
 * timestamps of the stream iterator `live_stream_iter`.
 *
 * The expansion is a single `do { ... } while (0)` statement WITHOUT a
 * trailing semicolon: the caller supplies it, so that
 * `if (cond) print_stream_state(s); else ...` parses as intended. The
 * argument is parenthesized where it is dereferenced so that any
 * pointer expression can be passed.
 */
#define print_stream_state(live_stream_iter)				\
	do {								\
		BT_LOGD("stream state %s last_inact_ts %" PRId64	\
			", curr_inact_ts %" PRId64,			\
			print_state(live_stream_iter),			\
			(live_stream_iter)->last_inactivity_ts,		\
			(live_stream_iter)->current_inactivity_ts);	\
	} while (0)
108
109 BT_HIDDEN
110 bool lttng_live_graph_is_canceled(struct lttng_live_component *lttng_live)
111 {
112 const bt_component *component;
113 bool ret;
114
115 if (!lttng_live) {
116 ret = false;
117 goto end;
118 }
119
120 component = bt_component_source_as_component_const(
121 bt_self_component_source_as_component_source(
122 lttng_live->self_comp));
123
124 ret = bt_component_graph_is_canceled(component);
125
126 end:
127 return ret;
128 }
129
130 static
131 struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
132 uint64_t trace_id)
133 {
134 uint64_t trace_idx;
135 struct lttng_live_trace *ret_trace = NULL;
136
137 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
138 struct lttng_live_trace *trace =
139 g_ptr_array_index(session->traces, trace_idx);
140 if (trace->id == trace_id) {
141 ret_trace = trace;
142 goto end;
143 }
144 }
145
146 end:
147 return ret_trace;
148 }
149
150 static
151 void lttng_live_destroy_trace(struct lttng_live_trace *trace)
152 {
153 BT_LOGD("Destroy lttng_live_trace");
154
155 BT_ASSERT(trace->stream_iterators);
156 g_ptr_array_free(trace->stream_iterators, TRUE);
157
158 BT_TRACE_PUT_REF_AND_RESET(trace->trace);
159 BT_TRACE_CLASS_PUT_REF_AND_RESET(trace->trace_class);
160
161 lttng_live_metadata_fini(trace);
162 g_free(trace);
163 }
164
165 static
166 struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
167 uint64_t trace_id)
168 {
169 struct lttng_live_trace *trace = NULL;
170
171 trace = g_new0(struct lttng_live_trace, 1);
172 if (!trace) {
173 goto error;
174 }
175 trace->session = session;
176 trace->id = trace_id;
177 trace->trace_class = NULL;
178 trace->trace = NULL;
179 trace->stream_iterators = g_ptr_array_new_with_free_func(
180 (GDestroyNotify) lttng_live_stream_iterator_destroy);
181 BT_ASSERT(trace->stream_iterators);
182 trace->new_metadata_needed = true;
183 g_ptr_array_add(session->traces, trace);
184
185 BT_LOGI("Create trace");
186 goto end;
187 error:
188 g_free(trace);
189 trace = NULL;
190 end:
191 return trace;
192 }
193
194 BT_HIDDEN
195 struct lttng_live_trace *lttng_live_borrow_trace(
196 struct lttng_live_session *session, uint64_t trace_id)
197 {
198 struct lttng_live_trace *trace;
199
200 trace = lttng_live_find_trace(session, trace_id);
201 if (trace) {
202 goto end;
203 }
204
205 /* The session is the owner of the newly created trace. */
206 trace = lttng_live_create_trace(session, trace_id);
207
208 end:
209 return trace;
210 }
211
212 BT_HIDDEN
213 int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
214 uint64_t session_id, const char *hostname,
215 const char *session_name)
216 {
217 int ret = 0;
218 struct lttng_live_session *session;
219
220 session = g_new0(struct lttng_live_session, 1);
221 if (!session) {
222 goto error;
223 }
224
225 session->id = session_id;
226 session->traces = g_ptr_array_new_with_free_func(
227 (GDestroyNotify) lttng_live_destroy_trace);
228 BT_ASSERT(session->traces);
229 session->lttng_live_msg_iter = lttng_live_msg_iter;
230 session->new_streams_needed = true;
231 session->hostname = g_string_new(hostname);
232 BT_ASSERT(session->hostname);
233
234 session->session_name = g_string_new(session_name);
235 BT_ASSERT(session->session_name);
236
237 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
238 session->id, hostname, session_name);
239 g_ptr_array_add(lttng_live_msg_iter->sessions, session);
240 goto end;
241 error:
242 BT_LOGE("Error adding session");
243 g_free(session);
244 ret = -1;
245 end:
246 return ret;
247 }
248
249 static
250 void lttng_live_destroy_session(struct lttng_live_session *session)
251 {
252 struct lttng_live_component *live_comp;
253
254 if (!session) {
255 goto end;
256 }
257
258 BT_LOGD("Destroy lttng live session");
259 if (session->id != -1ULL) {
260 if (lttng_live_detach_session(session)) {
261 live_comp = session->lttng_live_msg_iter->lttng_live_comp;
262 if (session->lttng_live_msg_iter &&
263 !lttng_live_graph_is_canceled(live_comp)) {
264 /* Old relayd cannot detach sessions. */
265 BT_LOGD("Unable to detach lttng live session %" PRIu64,
266 session->id);
267 }
268 }
269 session->id = -1ULL;
270 }
271
272 if (session->traces) {
273 g_ptr_array_free(session->traces, TRUE);
274 }
275
276 if (session->hostname) {
277 g_string_free(session->hostname, TRUE);
278 }
279 if (session->session_name) {
280 g_string_free(session->session_name, TRUE);
281 }
282 g_free(session);
283
284 end:
285 return;
286 }
287
288 static
289 void lttng_live_msg_iter_destroy(struct lttng_live_msg_iter *lttng_live_msg_iter)
290 {
291 if (!lttng_live_msg_iter) {
292 goto end;
293 }
294
295 if (lttng_live_msg_iter->sessions) {
296 g_ptr_array_free(lttng_live_msg_iter->sessions, TRUE);
297 }
298
299 BT_OBJECT_PUT_REF_AND_RESET(lttng_live_msg_iter->viewer_connection);
300 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp);
301 BT_ASSERT(lttng_live_msg_iter->lttng_live_comp->has_msg_iter);
302
303 /* All stream iterators must be destroyed at this point. */
304 BT_ASSERT(lttng_live_msg_iter->active_stream_iter == 0);
305 lttng_live_msg_iter->lttng_live_comp->has_msg_iter = false;
306
307 g_free(lttng_live_msg_iter);
308
309 end:
310 return;
311 }
312
313 BT_HIDDEN
314 void lttng_live_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
315 {
316 struct lttng_live_msg_iter *lttng_live_msg_iter;
317
318 BT_ASSERT(self_msg_iter);
319
320 lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_iter);
321 BT_ASSERT(lttng_live_msg_iter);
322 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
323 }
324
325 static
326 enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
327 struct lttng_live_stream_iterator *lttng_live_stream)
328 {
329 switch (lttng_live_stream->state) {
330 case LTTNG_LIVE_STREAM_QUIESCENT:
331 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
332 break;
333 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
334 /* Invalid state. */
335 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
336 abort();
337 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
338 /* Invalid state. */
339 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
340 abort();
341 case LTTNG_LIVE_STREAM_EOF:
342 break;
343 }
344 return LTTNG_LIVE_ITERATOR_STATUS_OK;
345 }
346
347 /*
348 * For active no data stream, fetch next data. It can be either:
349 * - quiescent: need to put it in the prio heap at quiescent end
350 * timestamp,
351 * - have data: need to wire up first event into the prio heap,
352 * - have no data on this stream at this point: need to retry (AGAIN) or
353 * return EOF.
354 */
355 static
356 enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
357 struct lttng_live_msg_iter *lttng_live_msg_iter,
358 struct lttng_live_stream_iterator *lttng_live_stream)
359 {
360 enum lttng_live_iterator_status ret =
361 LTTNG_LIVE_ITERATOR_STATUS_OK;
362 struct packet_index index;
363 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
364
365 if (lttng_live_stream->trace->new_metadata_needed) {
366 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
367 goto end;
368 }
369 if (lttng_live_stream->trace->session->new_streams_needed) {
370 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
371 goto end;
372 }
373 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA &&
374 lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
375 goto end;
376 }
377 ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream,
378 &index);
379 if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) {
380 goto end;
381 }
382 BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
383 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
384 uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts,
385 curr_inact_ts = lttng_live_stream->current_inactivity_ts;
386
387 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA &&
388 last_inact_ts == curr_inact_ts) {
389 ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
390 print_stream_state(lttng_live_stream);
391 } else {
392 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
393 }
394 goto end;
395 }
396 lttng_live_stream->base_offset = index.offset;
397 lttng_live_stream->offset = index.offset;
398 lttng_live_stream->len = index.packet_size / CHAR_BIT;
399 end:
400 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) {
401 ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream);
402 }
403 return ret;
404 }
405
406 /*
407 * Creation of the message requires the ctf trace class to be created
408 * beforehand, but the live protocol gives us all streams (including metadata)
409 * at once. So we split it in three steps: getting streams, getting metadata
410 * (which creates the ctf trace class), and then creating the per-stream
411 * messages.
412 */
413 static
414 enum lttng_live_iterator_status lttng_live_get_session(
415 struct lttng_live_msg_iter *lttng_live_msg_iter,
416 struct lttng_live_session *session)
417 {
418 enum lttng_live_iterator_status status;
419 uint64_t trace_idx;
420 int ret = 0;
421
422 if (!session->attached) {
423 ret = lttng_live_attach_session(session);
424 if (ret) {
425 if (lttng_live_msg_iter && lttng_live_graph_is_canceled(
426 lttng_live_msg_iter->lttng_live_comp)) {
427 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
428 } else {
429 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
430 }
431 goto end;
432 }
433 }
434
435 status = lttng_live_get_new_streams(session);
436 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
437 status != LTTNG_LIVE_ITERATOR_STATUS_END) {
438 goto end;
439 }
440 for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) {
441 struct lttng_live_trace *trace =
442 g_ptr_array_index(session->traces, trace_idx);
443
444 status = lttng_live_metadata_update(trace);
445 if (status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
446 status != LTTNG_LIVE_ITERATOR_STATUS_END) {
447 goto end;
448 }
449 }
450 status = lttng_live_lazy_msg_init(session);
451
452 end:
453 return status;
454 }
455
456 BT_HIDDEN
457 void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
458 {
459 uint64_t session_idx;
460
461 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
462 session_idx++) {
463 struct lttng_live_session *session =
464 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
465 session->new_streams_needed = true;
466 }
467 }
468
469 static
470 void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter)
471 {
472 uint64_t session_idx, trace_idx;
473
474 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
475 session_idx++) {
476 struct lttng_live_session *session =
477 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
478 session->new_streams_needed = true;
479 for (trace_idx = 0; trace_idx < session->traces->len;
480 trace_idx++) {
481 struct lttng_live_trace *trace =
482 g_ptr_array_index(session->traces, trace_idx);
483 trace->new_metadata_needed = true;
484 }
485 }
486 }
487
488 static
489 enum lttng_live_iterator_status
490 lttng_live_iterator_handle_new_streams_and_metadata(
491 struct lttng_live_msg_iter *lttng_live_msg_iter)
492 {
493 enum lttng_live_iterator_status ret =
494 LTTNG_LIVE_ITERATOR_STATUS_OK;
495 uint64_t session_idx = 0, nr_sessions_opened = 0;
496 struct lttng_live_session *session;
497 enum session_not_found_action sess_not_found_act =
498 lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act;
499
500 /*
501 * In a remotely distant future, we could add a "new
502 * session" flag to the protocol, which would tell us that we
503 * need to query for new sessions even though we have sessions
504 * currently ongoing.
505 */
506 if (lttng_live_msg_iter->sessions->len == 0) {
507 if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
508 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
509 goto end;
510 } else {
511 /*
512 * Retry to create a viewer session for the requested
513 * session name.
514 */
515 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
516 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
517 goto end;
518 }
519 }
520 }
521
522 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
523 session_idx++) {
524 session = g_ptr_array_index(lttng_live_msg_iter->sessions,
525 session_idx);
526 ret = lttng_live_get_session(lttng_live_msg_iter, session);
527 switch (ret) {
528 case LTTNG_LIVE_ITERATOR_STATUS_OK:
529 break;
530 case LTTNG_LIVE_ITERATOR_STATUS_END:
531 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
532 break;
533 default:
534 goto end;
535 }
536 if (!session->closed) {
537 nr_sessions_opened++;
538 }
539 }
540 end:
541 if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK &&
542 sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
543 nr_sessions_opened == 0) {
544 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
545 }
546 return ret;
547 }
548
549 static
550 enum lttng_live_iterator_status emit_inactivity_message(
551 struct lttng_live_msg_iter *lttng_live_msg_iter,
552 struct lttng_live_stream_iterator *stream_iter,
553 bt_message **message, uint64_t timestamp)
554 {
555 enum lttng_live_iterator_status ret =
556 LTTNG_LIVE_ITERATOR_STATUS_OK;
557 bt_message *msg = NULL;
558
559 BT_ASSERT(stream_iter->trace->clock_class);
560
561 msg = bt_message_message_iterator_inactivity_create(
562 lttng_live_msg_iter->self_msg_iter,
563 stream_iter->trace->clock_class,
564 timestamp);
565 if (!msg) {
566 goto error;
567 }
568
569 *message = msg;
570 end:
571 return ret;
572
573 error:
574 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
575 bt_message_put_ref(msg);
576 goto end;
577 }
578
579 static
580 enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
581 struct lttng_live_msg_iter *lttng_live_msg_iter,
582 struct lttng_live_stream_iterator *lttng_live_stream,
583 bt_message **message)
584 {
585 enum lttng_live_iterator_status ret =
586 LTTNG_LIVE_ITERATOR_STATUS_OK;
587
588 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
589 return LTTNG_LIVE_ITERATOR_STATUS_OK;
590 }
591
592 if (lttng_live_stream->current_inactivity_ts ==
593 lttng_live_stream->last_inactivity_ts) {
594 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
595 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
596 goto end;
597 }
598
599 ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream,
600 message, lttng_live_stream->current_inactivity_ts);
601
602 lttng_live_stream->last_inactivity_ts =
603 lttng_live_stream->current_inactivity_ts;
604 end:
605 return ret;
606 }
607
608 static
609 int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter,
610 struct lttng_live_msg_iter *lttng_live_msg_iter,
611 const bt_message *msg, int64_t last_msg_ts_ns,
612 int64_t *ts_ns)
613 {
614 const bt_clock_class *clock_class = NULL;
615 const bt_clock_snapshot *clock_snapshot = NULL;
616 int ret = 0;
617 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
618
619 BT_ASSERT(msg);
620 BT_ASSERT(ts_ns);
621
622 BT_LOGD("Getting message's timestamp: iter-data-addr=%p, msg-addr=%p, "
623 "last-msg-ts=%" PRId64, lttng_live_msg_iter, msg,
624 last_msg_ts_ns);
625
626 switch (bt_message_get_type(msg)) {
627 case BT_MESSAGE_TYPE_EVENT:
628 clock_class =
629 bt_message_event_borrow_stream_class_default_clock_class_const(
630 msg);
631 BT_ASSERT(clock_class);
632
633 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
634 msg);
635 break;
636 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
637 clock_class =
638 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
639 msg);
640 BT_ASSERT(clock_class);
641
642 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
643 msg);
644 break;
645 case BT_MESSAGE_TYPE_PACKET_END:
646 clock_class =
647 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
648 msg);
649 BT_ASSERT(clock_class);
650
651 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
652 msg);
653 break;
654 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
655 clock_class =
656 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
657 msg);
658 BT_ASSERT(clock_class);
659
660 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
661 msg);
662 break;
663 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
664 clock_class =
665 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
666 msg);
667 BT_ASSERT(clock_class);
668
669 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
670 msg);
671 break;
672 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
673 clock_class =
674 bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
675 msg);
676 BT_ASSERT(clock_class);
677
678 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
679 msg, &clock_snapshot);
680 if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
681 goto no_clock_snapshot;
682 }
683
684 break;
685 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
686 clock_class =
687 bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
688 msg);
689 BT_ASSERT(clock_class);
690
691 sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
692 msg, &clock_snapshot);
693 if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
694 goto no_clock_snapshot;
695 }
696
697 break;
698 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
699 clock_snapshot =
700 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
701 msg);
702 break;
703 default:
704 /* All the other messages have a higher priority */
705 BT_LOGD_STR("Message has no timestamp: using the last message timestamp.");
706 *ts_ns = last_msg_ts_ns;
707 goto end;
708 }
709
710 clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
711 BT_ASSERT(clock_class);
712
713 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
714 if (ret) {
715 BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
716 "clock-snapshot-addr=%p", clock_snapshot);
717 goto error;
718 }
719
720 goto end;
721
722 no_clock_snapshot:
723 BT_LOGD_STR("Message's default clock snapshot is missing: "
724 "using the last message timestamp.");
725 *ts_ns = last_msg_ts_ns;
726 goto end;
727
728 error:
729 ret = -1;
730
731 end:
732 if (ret == 0) {
733 BT_LOGD("Found message's timestamp: "
734 "iter-data-addr=%p, msg-addr=%p, "
735 "last-msg-ts=%" PRId64 ", ts=%" PRId64,
736 lttng_live_msg_iter, msg, last_msg_ts_ns, *ts_ns);
737 }
738
739 return ret;
740 }
741
742 static
743 enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
744 struct lttng_live_msg_iter *lttng_live_msg_iter,
745 struct lttng_live_stream_iterator *lttng_live_stream,
746 bt_message **message)
747 {
748 enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
749 enum bt_msg_iter_status status;
750 uint64_t session_idx, trace_idx;
751
752 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
753 session_idx++) {
754 struct lttng_live_session *session =
755 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
756
757 if (session->new_streams_needed) {
758 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
759 goto end;
760 }
761 for (trace_idx = 0; trace_idx < session->traces->len;
762 trace_idx++) {
763 struct lttng_live_trace *trace =
764 g_ptr_array_index(session->traces, trace_idx);
765 if (trace->new_metadata_needed) {
766 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
767 goto end;
768 }
769 }
770 }
771
772 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
773 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
774 goto end;
775 }
776
777 status = bt_msg_iter_get_next_message(lttng_live_stream->msg_iter,
778 lttng_live_msg_iter->self_msg_iter, message);
779 switch (status) {
780 case BT_MSG_ITER_STATUS_EOF:
781 ret = LTTNG_LIVE_ITERATOR_STATUS_END;
782 break;
783 case BT_MSG_ITER_STATUS_OK:
784 ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
785 break;
786 case BT_MSG_ITER_STATUS_AGAIN:
787 /*
788 * Continue immediately (end of packet). The next
789 * get_index may return AGAIN to delay the following
790 * attempt.
791 */
792 ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
793 break;
794 case BT_MSG_ITER_STATUS_INVAL:
795 /* No argument provided by the user, so don't return INVAL. */
796 case BT_MSG_ITER_STATUS_ERROR:
797 default:
798 ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
799 BT_LOGW("CTF msg iterator return an error or failed msg_iter=%p",
800 lttng_live_stream->msg_iter);
801 break;
802 }
803
804 end:
805 return ret;
806 }
807
808 /*
809 * helper function:
810 * handle_no_data_streams()
811 * retry:
812 * - for each ACTIVE_NO_DATA stream:
813 * - query relayd for stream data, or quiescence info.
814 * - if need metadata, get metadata, goto retry.
815 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
816 * - if quiescent, move to QUIESCENT streams
817 * - if fetched data, move to ACTIVE_DATA streams
818 * (at this point each stream either has data, or is quiescent)
819 *
820 *
821 * iterator_next:
822 * handle_new_streams_and_metadata()
823 * - query relayd for known streams, add them as ACTIVE_NO_DATA
824 * - query relayd for metadata
825 *
826 * call handle_active_no_data_streams()
827 *
828 * handle_quiescent_streams()
829 * - if at least one stream is ACTIVE_DATA:
830 * - peek stream event with lowest timestamp -> next_ts
831 * - for each quiescent stream
832 * - if next_ts >= quiescent end
833 * - set state to ACTIVE_NO_DATA
834 * - else
835 * - for each quiescent stream
836 * - set state to ACTIVE_NO_DATA
837 *
838 * call handle_active_no_data_streams()
839 *
840 * handle_active_data_streams()
841 * - if at least one stream is ACTIVE_DATA:
842 * - get stream event with lowest timestamp from heap
843 * - make that stream event the current message.
844 * - move this stream heap position to its next event
845 * - if we need to fetch data from relayd, move
846 * stream to ACTIVE_NO_DATA.
847 * - return OK
848 * - return AGAIN
849 *
850 * end criterion: ctrl-c on client. If relayd exits or the session
851 * closes on the relay daemon side, we keep on waiting for streams.
852 * Eventually handle --end timestamp (also an end criterion).
853 *
854 * When disconnected from relayd: try to re-connect endlessly.
855 */
856 static
857 enum lttng_live_iterator_status lttng_live_iterator_next_on_stream(
858 struct lttng_live_msg_iter *lttng_live_msg_iter,
859 struct lttng_live_stream_iterator *stream_iter,
860 bt_message **curr_msg)
861 {
862 enum lttng_live_iterator_status live_status;
863
864 retry:
865 print_stream_state(stream_iter);
866 live_status = lttng_live_iterator_handle_new_streams_and_metadata(
867 lttng_live_msg_iter);
868 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
869 goto end;
870 }
871 live_status = lttng_live_iterator_next_handle_one_no_data_stream(
872 lttng_live_msg_iter, stream_iter);
873 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
874 goto end;
875 }
876 live_status = lttng_live_iterator_next_handle_one_quiescent_stream(
877 lttng_live_msg_iter, stream_iter, curr_msg);
878 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
879 BT_ASSERT(*curr_msg == NULL);
880 goto end;
881 }
882 if (*curr_msg) {
883 goto end;
884 }
885 live_status = lttng_live_iterator_next_handle_one_active_data_stream(
886 lttng_live_msg_iter, stream_iter, curr_msg);
887 if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
888 BT_ASSERT(*curr_msg == NULL);
889 }
890
891 end:
892 if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) {
893 goto retry;
894 }
895
896 return live_status;
897 }
898
899 static
900 enum lttng_live_iterator_status next_stream_iterator_for_trace(
901 struct lttng_live_msg_iter *lttng_live_msg_iter,
902 struct lttng_live_trace *live_trace,
903 struct lttng_live_stream_iterator **candidate_stream_iter)
904 {
905 struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
906 enum lttng_live_iterator_status stream_iter_status;;
907 int64_t curr_candidate_msg_ts = INT64_MAX;
908 uint64_t stream_iter_idx;
909
910 BT_ASSERT(live_trace);
911 BT_ASSERT(live_trace->stream_iterators);
912 /*
913 * Update the current message of every stream iterators of this trace.
914 * The current msg of every stream must have a timestamp equal or
915 * larger than the last message returned by this iterator. We must
916 * ensure monotonicity.
917 */
918 stream_iter_idx = 0;
919 while (stream_iter_idx < live_trace->stream_iterators->len) {
920 bool stream_iter_is_ended = false;
921 struct lttng_live_stream_iterator *stream_iter =
922 g_ptr_array_index(live_trace->stream_iterators,
923 stream_iter_idx);
924
925 /*
926 * Find if there is are now current message for this stream
927 * iterator get it.
928 */
929 while (!stream_iter->current_msg) {
930 bt_message *msg = NULL;
931 int64_t curr_msg_ts_ns = INT64_MAX;
932 stream_iter_status = lttng_live_iterator_next_on_stream(
933 lttng_live_msg_iter, stream_iter, &msg);
934
935 BT_LOGD("live stream iterator returned status :%s",
936 print_live_iterator_status(stream_iter_status));
937 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
938 stream_iter_is_ended = true;
939 break;
940 }
941
942 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
943 goto end;
944 }
945
946 BT_ASSERT(msg);
947
948 /*
949 * Get the timestamp in nanoseconds from origin of this
950 * messsage.
951 */
952 live_get_msg_ts_ns(stream_iter, lttng_live_msg_iter,
953 msg, lttng_live_msg_iter->last_msg_ts_ns,
954 &curr_msg_ts_ns);
955
956 /*
957 * Check if the message of the current live stream
958 * iterator occured at the exact same time or after the
959 * last message returned by this component's message
960 * iterator. If not, we return an error.
961 */
962 if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) {
963 stream_iter->current_msg = msg;
964 stream_iter->current_msg_ts_ns = curr_msg_ts_ns;
965 } else {
966 /*
967 * We received a message in the past. To ensure
968 * monotonicity, we can't send it forward.
969 */
970 BT_LOGE("Message's timestamp is less than "
971 "lttng-live's message iterator's last "
972 "returned timestamp: "
973 "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", "
974 "last-msg-ts=%" PRId64,
975 lttng_live_msg_iter, curr_msg_ts_ns,
976 lttng_live_msg_iter->last_msg_ts_ns);
977 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
978 goto end;
979 }
980 }
981
982 if (!stream_iter_is_ended &&
983 stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
984 /*
985 * Update the current best candidate message for the
986 * stream iterator of thise live trace to be forwarded
987 * downstream.
988 */
989 curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
990 curr_candidate_stream_iter = stream_iter;
991 }
992
993 if (stream_iter_is_ended) {
994 /*
995 * The live stream iterator is ENDed. We remove that
996 * iterator from the list and we restart the iteration
997 * at the beginning of the live stream iterator array
998 * to because the removal will shuffle the array.
999 */
1000 g_ptr_array_remove_index_fast(live_trace->stream_iterators,
1001 stream_iter_idx);
1002 stream_iter_idx = 0;
1003 } else {
1004 stream_iter_idx++;
1005 }
1006 }
1007
1008 if (curr_candidate_stream_iter) {
1009 *candidate_stream_iter = curr_candidate_stream_iter;
1010 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1011 } else {
1012 /*
1013 * The only case where we don't have a candidate for this trace
1014 * is if we reached the end of all the iterators.
1015 */
1016 BT_ASSERT(live_trace->stream_iterators->len == 0);
1017 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1018 }
1019
1020 end:
1021 return stream_iter_status;
1022 }
1023
1024 static
1025 enum lttng_live_iterator_status next_stream_iterator_for_session(
1026 struct lttng_live_msg_iter *lttng_live_msg_iter,
1027 struct lttng_live_session *session,
1028 struct lttng_live_stream_iterator **candidate_session_stream_iter)
1029 {
1030 enum lttng_live_iterator_status stream_iter_status;
1031 uint64_t trace_idx = 0;
1032 int64_t curr_candidate_msg_ts = INT64_MAX;
1033 struct lttng_live_stream_iterator *curr_candidate_stream_iter = NULL;
1034
1035 /*
1036 * Make sure we are attached to the session and look for new streams
1037 * and metadata.
1038 */
1039 stream_iter_status = lttng_live_get_session(lttng_live_msg_iter, session);
1040 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK &&
1041 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_CONTINUE &&
1042 stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_END) {
1043 goto end;
1044 }
1045
1046 BT_ASSERT(session->traces);
1047
1048 /*
1049 * Use while loops here rather then for loops so we can restart the
1050 * iteration if an element is removed from the array during the
1051 * looping.
1052 */
1053 while (trace_idx < session->traces->len) {
1054 bool trace_is_ended = false;
1055 struct lttng_live_stream_iterator *stream_iter;
1056 struct lttng_live_trace *trace =
1057 g_ptr_array_index(session->traces, trace_idx);
1058
1059 stream_iter_status = next_stream_iterator_for_trace(
1060 lttng_live_msg_iter, trace, &stream_iter);
1061 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1062 /*
1063 * All the live stream iterators for this trace are
1064 * ENDed. Remove the trace from this session.
1065 */
1066 trace_is_ended = true;
1067 } else if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1068 goto end;
1069 }
1070
1071 if (!trace_is_ended) {
1072 BT_ASSERT(stream_iter);
1073
1074 if (stream_iter->current_msg_ts_ns <= curr_candidate_msg_ts) {
1075 curr_candidate_msg_ts = stream_iter->current_msg_ts_ns;
1076 curr_candidate_stream_iter = stream_iter;
1077 }
1078 trace_idx++;
1079 } else {
1080 g_ptr_array_remove_index_fast(session->traces, trace_idx);
1081 trace_idx = 0;
1082 }
1083 }
1084 if (curr_candidate_stream_iter) {
1085 *candidate_session_stream_iter = curr_candidate_stream_iter;
1086 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1087 } else {
1088 /*
1089 * The only cases where we don't have a candidate for this
1090 * trace is:
1091 * 1. if we reached the end of all the iterators of all the
1092 * traces of this session,
1093 * 2. if we never had live stream iterator in the first place.
1094 *
1095 * In either cases, we return END.
1096 */
1097 BT_ASSERT(session->traces->len == 0);
1098 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_END;
1099 }
1100 end:
1101 return stream_iter_status;
1102 }
1103
1104 static inline
1105 void put_messages(bt_message_array_const msgs, uint64_t count)
1106 {
1107 uint64_t i;
1108
1109 for (i = 0; i < count; i++) {
1110 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
1111 }
1112 }
1113
1114 BT_HIDDEN
1115 bt_self_message_iterator_status lttng_live_msg_iter_next(
1116 bt_self_message_iterator *self_msg_it,
1117 bt_message_array_const msgs, uint64_t capacity,
1118 uint64_t *count)
1119 {
1120 bt_self_message_iterator_status status;
1121 struct lttng_live_msg_iter *lttng_live_msg_iter =
1122 bt_self_message_iterator_get_data(self_msg_it);
1123 struct lttng_live_component *lttng_live =
1124 lttng_live_msg_iter->lttng_live_comp;
1125 enum lttng_live_iterator_status stream_iter_status;
1126 uint64_t session_idx;
1127
1128 *count = 0;
1129
1130 BT_ASSERT(lttng_live_msg_iter);
1131
1132 /*
1133 * Clear all the invalid message reference that might be left over in
1134 * the output array.
1135 */
1136 memset(msgs, 0, capacity * sizeof(*msgs));
1137
1138 /*
1139 * If no session are exposed on the relay found at the url provided by
1140 * the user, session count will be 0. In this case, we return status
1141 * end to return gracefully.
1142 */
1143 if (lttng_live_msg_iter->sessions->len == 0) {
1144 if (lttng_live->params.sess_not_found_act !=
1145 SESSION_NOT_FOUND_ACTION_CONTINUE) {
1146 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1147 goto no_session;
1148 } else {
1149 /*
1150 * The are no more active session for this session
1151 * name. Retry to create a viewer session for the
1152 * requested session name.
1153 */
1154 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1155 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1156 goto no_session;
1157 }
1158 }
1159 }
1160
1161 if (lttng_live_msg_iter->active_stream_iter == 0) {
1162 lttng_live_force_new_streams_and_metadata(lttng_live_msg_iter);
1163 }
1164
1165 /*
1166 * Here the muxing of message is done.
1167 *
1168 * We need to iterate over all the streams of all the traces of all the
1169 * viewer sessions in order to get the message with the smallest
1170 * timestamp. In this case, a session is a viewer session and there is
1171 * one viewer session per consumer daemon. (UST 32bit, UST 64bit and/or
1172 * kernel). Each viewer session can have multiple traces, for example,
1173 * 64bit UST viewer sessions could have multiple per-pid traces.
1174 *
1175 * We iterate over the streams of each traces to update and see what is
1176 * their next message's timestamp. From those timestamps, we select the
1177 * message with the smallest timestamp as the best candidate message
1178 * for that trace and do the same thing across all the sessions.
1179 *
1180 * We then compare the timestamp of best candidate message of all the
1181 * sessions to pick the message with the smallest timestamp and we
1182 * return it.
1183 */
1184 while (*count < capacity) {
1185 struct lttng_live_stream_iterator *next_stream_iter = NULL,
1186 *candidate_stream_iter = NULL;
1187 int64_t next_msg_ts_ns = INT64_MAX;
1188
1189 BT_ASSERT(lttng_live_msg_iter->sessions);
1190 session_idx = 0;
1191 /*
1192 * Use a while loop instead of a for loop so we can restart the
1193 * iteration if we remove an element. We can safely call
1194 * next_stream_iterator_for_session() multiple times on the
1195 * same session as we only fetch a new message if there is no
1196 * current next message for each live stream iterator.
1197 * If all live stream iterator of that session already have a
1198 * current next message, the function will simply exit return
1199 * the same candidate live stream iterator every time.
1200 */
1201 while (session_idx < lttng_live_msg_iter->sessions->len) {
1202 struct lttng_live_session *session =
1203 g_ptr_array_index(lttng_live_msg_iter->sessions,
1204 session_idx);
1205
1206 /* Find the best candidate message to send downstream. */
1207 stream_iter_status = next_stream_iterator_for_session(
1208 lttng_live_msg_iter, session,
1209 &candidate_stream_iter);
1210
1211 /* If we receive an END status, it means that either:
1212 * - Those traces never had active streams (UST with no
1213 * data produced yet),
1214 * - All live stream iterators have ENDed.*/
1215 if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) {
1216 if (session->closed && session->traces->len == 0) {
1217 /*
1218 * Remove the session from the list and restart the
1219 * iteration at the beginning of the array since the
1220 * removal shuffle the elements of the array.
1221 */
1222 g_ptr_array_remove_index_fast(
1223 lttng_live_msg_iter->sessions,
1224 session_idx);
1225 session_idx = 0;
1226 } else {
1227 session_idx++;
1228 }
1229 continue;
1230 }
1231
1232 if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
1233 goto end;
1234 }
1235
1236 if (candidate_stream_iter->current_msg_ts_ns <= next_msg_ts_ns) {
1237 next_msg_ts_ns = candidate_stream_iter->current_msg_ts_ns;
1238 next_stream_iter = candidate_stream_iter;
1239 }
1240
1241 session_idx++;
1242 }
1243
1244 if (!next_stream_iter) {
1245 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1246 goto end;
1247 }
1248
1249 BT_ASSERT(next_stream_iter->current_msg);
1250 /* Ensure monotonicity. */
1251 BT_ASSERT(lttng_live_msg_iter->last_msg_ts_ns <=
1252 next_stream_iter->current_msg_ts_ns);
1253
1254 /*
1255 * Insert the next message to the message batch. This will set
1256 * stream iterator current messsage to NULL so that next time
1257 * we fetch the next message of that stream iterator
1258 */
1259 BT_MESSAGE_MOVE_REF(msgs[*count], next_stream_iter->current_msg);
1260 (*count)++;
1261
1262 /* Update the last timestamp in nanoseconds sent downstream. */
1263 lttng_live_msg_iter->last_msg_ts_ns = next_msg_ts_ns;
1264 next_stream_iter->current_msg_ts_ns = INT64_MAX;
1265
1266 stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1267 }
1268 end:
1269 switch (stream_iter_status) {
1270 case LTTNG_LIVE_ITERATOR_STATUS_OK:
1271 case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
1272 if (*count > 0) {
1273 /*
1274 * We received a again status but we have some messages
1275 * to send downstream. We send them and return OK for
1276 * now. On the next call we return again if there are
1277 * still no new message to send.
1278 */
1279 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1280 } else {
1281 status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN;
1282 }
1283 break;
1284 case LTTNG_LIVE_ITERATOR_STATUS_END:
1285 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1286 break;
1287 case LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
1288 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1289 break;
1290 case LTTNG_LIVE_ITERATOR_STATUS_ERROR:
1291 case LTTNG_LIVE_ITERATOR_STATUS_INVAL:
1292 case LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
1293 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1294 /* Put all existing messages on error. */
1295 put_messages(msgs, *count);
1296 break;
1297 default:
1298 abort();
1299 }
1300
1301 no_session:
1302 return status;
1303 }
1304
1305 BT_HIDDEN
1306 bt_self_message_iterator_status lttng_live_msg_iter_init(
1307 bt_self_message_iterator *self_msg_it,
1308 bt_self_component_source *self_comp_src,
1309 bt_self_component_port_output *self_port)
1310 {
1311 bt_self_message_iterator_status ret =
1312 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1313 bt_self_component *self_comp =
1314 bt_self_component_source_as_self_component(self_comp_src);
1315 struct lttng_live_component *lttng_live;
1316 struct lttng_live_msg_iter *lttng_live_msg_iter;
1317
1318 BT_ASSERT(self_msg_it);
1319
1320 lttng_live = bt_self_component_get_data(self_comp);
1321
1322 /* There can be only one downstream iterator at the same time. */
1323 BT_ASSERT(!lttng_live->has_msg_iter);
1324 lttng_live->has_msg_iter = true;
1325
1326 lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
1327 if (!lttng_live_msg_iter) {
1328 ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1329 goto end;
1330 }
1331
1332 lttng_live_msg_iter->lttng_live_comp = lttng_live;
1333 lttng_live_msg_iter->self_msg_iter = self_msg_it;
1334
1335 lttng_live_msg_iter->active_stream_iter = 0;
1336 lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
1337 lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func(
1338 (GDestroyNotify) lttng_live_destroy_session);
1339 BT_ASSERT(lttng_live_msg_iter->sessions);
1340
1341 lttng_live_msg_iter->viewer_connection =
1342 live_viewer_connection_create(lttng_live->params.url->str, false,
1343 lttng_live_msg_iter);
1344 if (!lttng_live_msg_iter->viewer_connection) {
1345 goto error;
1346 }
1347
1348 if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
1349 goto error;
1350 }
1351 if (lttng_live_msg_iter->sessions->len == 0) {
1352 switch (lttng_live->params.sess_not_found_act) {
1353 case SESSION_NOT_FOUND_ACTION_CONTINUE:
1354 BT_LOGI("Unable to connect to the requested live viewer "
1355 "session. Keep trying to connect because of "
1356 "%s=\"%s\" component parameter: url=\"%s\"",
1357 SESS_NOT_FOUND_ACTION_PARAM,
1358 SESS_NOT_FOUND_ACTION_CONTINUE_STR,
1359 lttng_live->params.url->str);
1360 break;
1361 case SESSION_NOT_FOUND_ACTION_FAIL:
1362 BT_LOGE("Unable to connect to the requested live viewer "
1363 "session. Fail the message iterator"
1364 "initialization because of %s=\"%s\" "
1365 "component parameter: url =\"%s\"",
1366 SESS_NOT_FOUND_ACTION_PARAM,
1367 SESS_NOT_FOUND_ACTION_FAIL_STR,
1368 lttng_live->params.url->str);
1369 goto error;
1370 case SESSION_NOT_FOUND_ACTION_END:
1371 BT_LOGI("Unable to connect to the requested live viewer "
1372 "session. End gracefully at the first _next() "
1373 "call because of %s=\"%s\" component parameter: "
1374 "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
1375 SESS_NOT_FOUND_ACTION_END_STR,
1376 lttng_live->params.url->str);
1377 break;
1378 }
1379 }
1380
1381 bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
1382
1383 goto end;
1384 error:
1385 ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1386 lttng_live_msg_iter_destroy(lttng_live_msg_iter);
1387 end:
1388 return ret;
1389 }
1390
1391 static
1392 bt_query_status lttng_live_query_list_sessions(const bt_value *params,
1393 const bt_value **result)
1394 {
1395 bt_query_status status = BT_QUERY_STATUS_OK;
1396 const bt_value *url_value = NULL;
1397 const char *url;
1398 struct live_viewer_connection *viewer_connection = NULL;
1399
1400 url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1401 if (!url_value) {
1402 BT_LOGW("Mandatory `%s` parameter missing", URL_PARAM);
1403 status = BT_QUERY_STATUS_INVALID_PARAMS;
1404 goto error;
1405 }
1406
1407 if (!bt_value_is_string(url_value)) {
1408 BT_LOGW("`%s` parameter is required to be a string value",
1409 URL_PARAM);
1410 status = BT_QUERY_STATUS_INVALID_PARAMS;
1411 goto error;
1412 }
1413
1414 url = bt_value_string_get(url_value);
1415
1416 viewer_connection = live_viewer_connection_create(url, true, NULL);
1417 if (!viewer_connection) {
1418 goto error;
1419 }
1420
1421 status = live_viewer_connection_list_sessions(viewer_connection,
1422 result);
1423 if (status != BT_QUERY_STATUS_OK) {
1424 goto error;
1425 }
1426
1427 goto end;
1428
1429 error:
1430 BT_VALUE_PUT_REF_AND_RESET(*result);
1431
1432 if (status >= 0) {
1433 status = BT_QUERY_STATUS_ERROR;
1434 }
1435
1436 end:
1437 if (viewer_connection) {
1438 live_viewer_connection_destroy(viewer_connection);
1439 }
1440 return status;
1441 }
1442
1443 BT_HIDDEN
1444 bt_query_status lttng_live_query(bt_self_component_class_source *comp_class,
1445 const bt_query_executor *query_exec,
1446 const char *object, const bt_value *params,
1447 const bt_value **result)
1448 {
1449 bt_query_status status = BT_QUERY_STATUS_OK;
1450
1451 if (strcmp(object, "sessions") == 0) {
1452 status = lttng_live_query_list_sessions(params, result);
1453 } else {
1454 BT_LOGW("Unknown query object `%s`", object);
1455 status = BT_QUERY_STATUS_INVALID_OBJECT;
1456 goto end;
1457 }
1458
1459 end:
1460 return status;
1461 }
1462
1463 static
1464 void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1465 {
1466 if (!lttng_live) {
1467 return;
1468 }
1469 if (lttng_live->params.url) {
1470 g_string_free(lttng_live->params.url, TRUE);
1471 }
1472 g_free(lttng_live);
1473 }
1474
1475 BT_HIDDEN
1476 void lttng_live_component_finalize(bt_self_component_source *component)
1477 {
1478 void *data = bt_self_component_get_data(
1479 bt_self_component_source_as_self_component(component));
1480
1481 if (!data) {
1482 return;
1483 }
1484 lttng_live_component_destroy_data(data);
1485 }
1486
1487 static
1488 enum session_not_found_action parse_session_not_found_action_param(
1489 const bt_value *no_session_param)
1490 {
1491 enum session_not_found_action action;
1492 const char *no_session_act_str;
1493 no_session_act_str = bt_value_string_get(no_session_param);
1494 if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_CONTINUE_STR) == 0) {
1495 action = SESSION_NOT_FOUND_ACTION_CONTINUE;
1496 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_FAIL_STR) == 0) {
1497 action = SESSION_NOT_FOUND_ACTION_FAIL;
1498 } else if (strcmp(no_session_act_str, SESS_NOT_FOUND_ACTION_END_STR) == 0) {
1499 action = SESSION_NOT_FOUND_ACTION_END;
1500 } else {
1501 action = -1;
1502 }
1503
1504 return action;
1505 }
1506
1507 struct lttng_live_component *lttng_live_component_create(const bt_value *params)
1508 {
1509 struct lttng_live_component *lttng_live;
1510 const bt_value *value = NULL;
1511 const char *url;
1512
1513 lttng_live = g_new0(struct lttng_live_component, 1);
1514 if (!lttng_live) {
1515 goto end;
1516 }
1517 lttng_live->max_query_size = MAX_QUERY_SIZE;
1518 lttng_live->has_msg_iter = false;
1519
1520 value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
1521 if (!value || !bt_value_is_string(value)) {
1522 BT_LOGW("Mandatory `%s` parameter missing or not a string",
1523 URL_PARAM);
1524 goto error;
1525 }
1526 url = bt_value_string_get(value);
1527 lttng_live->params.url = g_string_new(url);
1528 if (!lttng_live->params.url) {
1529 goto error;
1530 }
1531
1532 value = bt_value_map_borrow_entry_value_const(params,
1533 SESS_NOT_FOUND_ACTION_PARAM);
1534
1535 if (value && bt_value_is_string(value)) {
1536 lttng_live->params.sess_not_found_act =
1537 parse_session_not_found_action_param(value);
1538 if (lttng_live->params.sess_not_found_act == -1) {
1539 BT_LOGE("Unexpected value for `%s` parameter: "
1540 "value=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
1541 bt_value_string_get(value));
1542 goto error;
1543 }
1544 } else {
1545 BT_LOGW("Optional `%s` parameter is missing or "
1546 "not a string value. Defaulting to %s=\"%s\".",
1547 SESS_NOT_FOUND_ACTION_PARAM,
1548 SESS_NOT_FOUND_ACTION_PARAM,
1549 SESS_NOT_FOUND_ACTION_CONTINUE_STR);
1550 lttng_live->params.sess_not_found_act =
1551 SESSION_NOT_FOUND_ACTION_CONTINUE;
1552 }
1553
1554 goto end;
1555
1556 error:
1557 lttng_live_component_destroy_data(lttng_live);
1558 lttng_live = NULL;
1559 end:
1560 return lttng_live;
1561 }
1562
1563 BT_HIDDEN
1564 bt_self_component_status lttng_live_component_init(
1565 bt_self_component_source *self_comp,
1566 const bt_value *params, __attribute__((unused)) void *init_method_data)
1567 {
1568 struct lttng_live_component *lttng_live;
1569 bt_self_component_status ret = BT_SELF_COMPONENT_STATUS_OK;
1570
1571 lttng_live = lttng_live_component_create(params);
1572 if (!lttng_live) {
1573 ret = BT_SELF_COMPONENT_STATUS_NOMEM;
1574 goto error;
1575 }
1576 lttng_live->self_comp = self_comp;
1577
1578 if (lttng_live_graph_is_canceled(lttng_live)) {
1579 ret = BT_SELF_COMPONENT_STATUS_END;
1580 goto error;
1581 }
1582
1583 ret = bt_self_component_source_add_output_port(
1584 lttng_live->self_comp, "out",
1585 NULL, NULL);
1586 if (ret != BT_SELF_COMPONENT_STATUS_OK) {
1587 goto error;
1588 }
1589
1590 bt_self_component_set_data(
1591 bt_self_component_source_as_self_component(self_comp),
1592 lttng_live);
1593 goto end;
1594
1595 error:
1596 lttng_live_component_destroy_data(lttng_live);
1597 lttng_live = NULL;
1598 end:
1599 return ret;
1600 }
This page took 0.062903 seconds and 3 git commands to generate.