lib: make trace IR API const-correct
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
1 /*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
30 #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
31 #include "logging.h"
32
33 #include <babeltrace/babeltrace.h>
34 #include <babeltrace/compiler-internal.h>
35 #include <babeltrace/types.h>
36 #include <inttypes.h>
37 #include <glib.h>
38 #include <babeltrace/assert-internal.h>
39 #include <unistd.h>
40 #include <plugins-common.h>
41
42 #include "data-stream.h"
43 #include "metadata.h"
44 #include "lttng-live-internal.h"
45
46 #define MAX_QUERY_SIZE (256*1024)
47
48 #define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
49
50 static const char *print_state(struct lttng_live_stream_iterator *s)
51 {
52 switch (s->state) {
53 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
54 return "ACTIVE_NO_DATA";
55 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
56 return "QUIESCENT_NO_DATA";
57 case LTTNG_LIVE_STREAM_QUIESCENT:
58 return "QUIESCENT";
59 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
60 return "ACTIVE_DATA";
61 case LTTNG_LIVE_STREAM_EOF:
62 return "EOF";
63 default:
64 return "ERROR";
65 }
66 }
67
68 static
69 void print_stream_state(struct lttng_live_stream_iterator *stream)
70 {
71 struct bt_port *port;
72
73 port = bt_port_from_private(stream->port);
74 print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
75 bt_port_get_name(port),
76 print_state(stream),
77 stream->last_returned_inactivity_timestamp,
78 stream->current_inactivity_timestamp);
79 bt_object_put_ref(port);
80 }
81
82 BT_HIDDEN
83 bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
84 {
85 struct bt_component *component;
86 struct bt_graph *graph;
87 bt_bool ret;
88
89 if (!lttng_live) {
90 return BT_FALSE;
91 }
92
93 component = bt_component_from_private(lttng_live->private_component);
94 graph = bt_component_get_graph(component);
95 ret = bt_private_graph_is_canceled(graph);
96 bt_object_put_ref(graph);
97 bt_object_put_ref(component);
98 return ret;
99 }
100
101 BT_HIDDEN
102 int lttng_live_add_port(struct lttng_live_component *lttng_live,
103 struct lttng_live_stream_iterator *stream_iter)
104 {
105 int ret;
106 struct bt_private_port *private_port;
107 char name[STREAM_NAME_MAX_LEN];
108 enum bt_component_status status;
109
110 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
111 BT_ASSERT(ret > 0);
112 strcpy(stream_iter->name, name);
113 if (lttng_live_is_canceled(lttng_live)) {
114 return 0;
115 }
116 status = bt_self_component_source_add_output_port(
117 lttng_live->private_component, name, stream_iter,
118 &private_port);
119 switch (status) {
120 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
121 return 0;
122 case BT_COMPONENT_STATUS_OK:
123 break;
124 default:
125 return -1;
126 }
127 bt_object_put_ref(private_port); /* weak */
128 BT_LOGI("Added port %s", name);
129
130 if (lttng_live->no_stream_port) {
131 bt_object_get_ref(lttng_live->no_stream_port);
132 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
133 bt_object_put_ref(lttng_live->no_stream_port);
134 if (ret) {
135 return -1;
136 }
137 lttng_live->no_stream_port = NULL;
138 lttng_live->no_stream_iter->port = NULL;
139 }
140 stream_iter->port = private_port;
141 return 0;
142 }
143
144 BT_HIDDEN
145 int lttng_live_remove_port(struct lttng_live_component *lttng_live,
146 struct bt_private_port *port)
147 {
148 struct bt_component *component;
149 int64_t nr_ports;
150 int ret;
151
152 component = bt_component_from_private(lttng_live->private_component);
153 nr_ports = bt_component_source_get_output_port_count(component);
154 if (nr_ports < 0) {
155 return -1;
156 }
157 BT_OBJECT_PUT_REF_AND_RESET(component);
158 if (nr_ports == 1) {
159 enum bt_component_status status;
160
161 BT_ASSERT(!lttng_live->no_stream_port);
162
163 if (lttng_live_is_canceled(lttng_live)) {
164 return 0;
165 }
166 status = bt_self_component_source_add_output_port(lttng_live->private_component,
167 "no-stream", lttng_live->no_stream_iter,
168 &lttng_live->no_stream_port);
169 switch (status) {
170 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
171 return 0;
172 case BT_COMPONENT_STATUS_OK:
173 break;
174 default:
175 return -1;
176 }
177 bt_object_put_ref(lttng_live->no_stream_port); /* weak */
178 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
179 }
180 bt_object_get_ref(port);
181 ret = bt_private_port_remove_from_component(port);
182 bt_object_put_ref(port);
183 if (ret) {
184 return -1;
185 }
186 return 0;
187 }
188
189 static
190 struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
191 uint64_t trace_id)
192 {
193 struct lttng_live_trace *trace;
194
195 bt_list_for_each_entry(trace, &session->traces, node) {
196 if (trace->id == trace_id) {
197 return trace;
198 }
199 }
200 return NULL;
201 }
202
203 static
204 void lttng_live_destroy_trace(struct bt_object *obj)
205 {
206 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
207
208 BT_LOGI("Destroy trace");
209 BT_ASSERT(bt_list_empty(&trace->streams));
210 bt_list_del(&trace->node);
211
212 if (trace->trace) {
213 int retval;
214
215 retval = bt_trace_set_is_static(trace->trace);
216 BT_ASSERT(!retval);
217 BT_OBJECT_PUT_REF_AND_RESET(trace->trace);
218 }
219 lttng_live_metadata_fini(trace);
220 BT_OBJECT_PUT_REF_AND_RESET(trace->cc_prio_map);
221 g_free(trace);
222 }
223
224 static
225 struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
226 uint64_t trace_id)
227 {
228 struct lttng_live_trace *trace = NULL;
229
230 trace = g_new0(struct lttng_live_trace, 1);
231 if (!trace) {
232 goto error;
233 }
234 trace->session = session;
235 trace->id = trace_id;
236 BT_INIT_LIST_HEAD(&trace->streams);
237 trace->new_metadata_needed = true;
238 bt_list_add(&trace->node, &session->traces);
239 bt_object_init(&trace->obj, lttng_live_destroy_trace);
240 BT_LOGI("Create trace");
241 goto end;
242 error:
243 g_free(trace);
244 trace = NULL;
245 end:
246 return trace;
247 }
248
249 BT_HIDDEN
250 struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
251 uint64_t trace_id)
252 {
253 struct lttng_live_trace *trace;
254
255 trace = lttng_live_find_trace(session, trace_id);
256 if (trace) {
257 bt_object_get_ref(trace);
258 return trace;
259 }
260 return lttng_live_create_trace(session, trace_id);
261 }
262
263 BT_HIDDEN
264 void lttng_live_unref_trace(struct lttng_live_trace *trace)
265 {
266 bt_object_put_ref(trace);
267 }
268
269 static
270 void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
271 {
272 struct lttng_live_stream_iterator *stream, *s;
273
274 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
275 lttng_live_stream_iterator_destroy(stream);
276 }
277 lttng_live_metadata_fini(trace);
278 }
279
280 BT_HIDDEN
281 int lttng_live_add_session(struct lttng_live_component *lttng_live,
282 uint64_t session_id, const char *hostname,
283 const char *session_name)
284 {
285 int ret = 0;
286 struct lttng_live_session *s;
287
288 s = g_new0(struct lttng_live_session, 1);
289 if (!s) {
290 goto error;
291 }
292
293 s->id = session_id;
294 BT_INIT_LIST_HEAD(&s->traces);
295 s->lttng_live = lttng_live;
296 s->new_streams_needed = true;
297 s->hostname = g_string_new(hostname);
298 s->session_name = g_string_new(session_name);
299
300 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
301 s->id, hostname, session_name);
302 bt_list_add(&s->node, &lttng_live->sessions);
303 goto end;
304 error:
305 BT_LOGE("Error adding session");
306 g_free(s);
307 ret = -1;
308 end:
309 return ret;
310 }
311
312 static
313 void lttng_live_destroy_session(struct lttng_live_session *session)
314 {
315 struct lttng_live_trace *trace, *t;
316
317 BT_LOGI("Destroy session");
318 if (session->id != -1ULL) {
319 if (lttng_live_detach_session(session)) {
320 if (!lttng_live_is_canceled(session->lttng_live)) {
321 /* Old relayd cannot detach sessions. */
322 BT_LOGD("Unable to detach session %" PRIu64,
323 session->id);
324 }
325 }
326 session->id = -1ULL;
327 }
328 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
329 lttng_live_close_trace_streams(trace);
330 }
331 bt_list_del(&session->node);
332 if (session->hostname) {
333 g_string_free(session->hostname, TRUE);
334 }
335 if (session->session_name) {
336 g_string_free(session->session_name, TRUE);
337 }
338 g_free(session);
339 }
340
341 BT_HIDDEN
342 void lttng_live_iterator_finalize(struct bt_self_notification_iterator *it)
343 {
344 struct lttng_live_stream_iterator_generic *s =
345 bt_self_notification_iterator_get_user_data(it);
346
347 switch (s->type) {
348 case LIVE_STREAM_TYPE_NO_STREAM:
349 {
350 /* Leave no_stream_iter in place when port is removed. */
351 break;
352 }
353 case LIVE_STREAM_TYPE_STREAM:
354 {
355 struct lttng_live_stream_iterator *stream_iter =
356 container_of(s, struct lttng_live_stream_iterator, p);
357
358 lttng_live_stream_iterator_destroy(stream_iter);
359 break;
360 }
361 }
362 }
363
364 static
365 enum bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
366 struct lttng_live_component *lttng_live,
367 struct lttng_live_stream_iterator *lttng_live_stream)
368 {
369 switch (lttng_live_stream->state) {
370 case LTTNG_LIVE_STREAM_QUIESCENT:
371 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
372 break;
373 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
374 /* Invalid state. */
375 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
376 abort();
377 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
378 /* Invalid state. */
379 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
380 abort();
381 case LTTNG_LIVE_STREAM_EOF:
382 break;
383 }
384 return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
385 }
386
387 /*
388 * For active no data stream, fetch next data. It can be either:
389 * - quiescent: need to put it in the prio heap at quiescent end
390 * timestamp,
391 * - have data: need to wire up first event into the prio heap,
392 * - have no data on this stream at this point: need to retry (AGAIN) or
393 * return EOF.
394 */
395 static
396 enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
397 struct lttng_live_component *lttng_live,
398 struct lttng_live_stream_iterator *lttng_live_stream)
399 {
400 enum bt_lttng_live_iterator_status ret =
401 BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
402 struct packet_index index;
403 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
404
405 if (lttng_live_stream->trace->new_metadata_needed) {
406 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
407 goto end;
408 }
409 if (lttng_live_stream->trace->session->new_streams_needed) {
410 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
411 goto end;
412 }
413 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
414 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
415 goto end;
416 }
417 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
418 if (ret != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
419 goto end;
420 }
421 BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
422 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
423 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
424 && lttng_live_stream->last_returned_inactivity_timestamp ==
425 lttng_live_stream->current_inactivity_timestamp) {
426 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
427 print_stream_state(lttng_live_stream);
428 } else {
429 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
430 }
431 goto end;
432 }
433 lttng_live_stream->base_offset = index.offset;
434 lttng_live_stream->offset = index.offset;
435 lttng_live_stream->len = index.packet_size / CHAR_BIT;
436 end:
437 if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
438 ret = lttng_live_iterator_next_check_stream_state(
439 lttng_live, lttng_live_stream);
440 }
441 return ret;
442 }
443
444 /*
445 * Creation of the notification requires the ctf trace to be created
446 * beforehand, but the live protocol gives us all streams (including
447 * metadata) at once. So we split it in three steps: getting streams,
448 * getting metadata (which creates the ctf trace), and then creating the
449 * per-stream notifications.
450 */
451 static
452 enum bt_lttng_live_iterator_status lttng_live_get_session(
453 struct lttng_live_component *lttng_live,
454 struct lttng_live_session *session)
455 {
456 enum bt_lttng_live_iterator_status status;
457 struct lttng_live_trace *trace, *t;
458
459 if (lttng_live_attach_session(session)) {
460 if (lttng_live_is_canceled(lttng_live)) {
461 return BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
462 } else {
463 return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
464 }
465 }
466 status = lttng_live_get_new_streams(session);
467 if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK &&
468 status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) {
469 return status;
470 }
471 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
472 status = lttng_live_metadata_update(trace);
473 if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK &&
474 status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) {
475 return status;
476 }
477 }
478 return lttng_live_lazy_notif_init(session);
479 }
480
481 BT_HIDDEN
482 void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
483 {
484 struct lttng_live_session *session;
485
486 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
487 session->new_streams_needed = true;
488 }
489 }
490
491 static
492 void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
493 {
494 struct lttng_live_session *session;
495
496 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
497 struct lttng_live_trace *trace;
498
499 session->new_streams_needed = true;
500 bt_list_for_each_entry(trace, &session->traces, node) {
501 trace->new_metadata_needed = true;
502 }
503 }
504 }
505
506 static
507 enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
508 struct lttng_live_component *lttng_live)
509 {
510 enum bt_lttng_live_iterator_status ret =
511 BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
512 unsigned int nr_sessions_opened = 0;
513 struct lttng_live_session *session, *s;
514
515 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
516 if (session->closed && bt_list_empty(&session->traces)) {
517 lttng_live_destroy_session(session);
518 }
519 }
520 /*
521 * Currently, when there are no sessions, we quit immediately.
522 * We may want to add a component parameter to keep trying until
523 * we get data in the future.
524 * Also, in a remotely distant future, we could add a "new
525 * session" flag to the protocol, which would tell us that we
526 * need to query for new sessions even though we have sessions
527 * currently ongoing.
528 */
529 if (bt_list_empty(&lttng_live->sessions)) {
530 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
531 goto end;
532 }
533 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
534 ret = lttng_live_get_session(lttng_live, session);
535 switch (ret) {
536 case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
537 break;
538 case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
539 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
540 break;
541 default:
542 goto end;
543 }
544 if (!session->closed) {
545 nr_sessions_opened++;
546 }
547 }
548 end:
549 if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
550 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
551 }
552 return ret;
553 }
554
555 static
556 enum bt_lttng_live_iterator_status emit_inactivity_notification(
557 struct lttng_live_component *lttng_live,
558 struct lttng_live_stream_iterator *lttng_live_stream,
559 struct bt_notification **notification,
560 uint64_t timestamp)
561 {
562 enum bt_lttng_live_iterator_status ret =
563 BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
564 struct lttng_live_trace *trace;
565 const struct bt_clock_class *clock_class = NULL;
566 struct bt_clock_value *clock_value = NULL;
567 struct bt_notification *notif = NULL;
568 int retval;
569
570 trace = lttng_live_stream->trace;
571 if (!trace) {
572 goto error;
573 }
574 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
575 if (!clock_class) {
576 goto error;
577 }
578 clock_value = bt_clock_value_create(clock_class, timestamp);
579 if (!clock_value) {
580 goto error;
581 }
582 notif = bt_notification_inactivity_create(trace->cc_prio_map);
583 if (!notif) {
584 goto error;
585 }
586 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
587 if (retval) {
588 goto error;
589 }
590 *notification = notif;
591 end:
592 bt_object_put_ref(clock_value);
593 bt_object_put_ref(clock_class);
594 return ret;
595
596 error:
597 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
598 bt_object_put_ref(notif);
599 goto end;
600 }
601
602 static
603 enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
604 struct lttng_live_component *lttng_live,
605 struct lttng_live_stream_iterator *lttng_live_stream,
606 struct bt_notification **notification)
607 {
608 enum bt_lttng_live_iterator_status ret =
609 BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
610 const struct bt_clock_class *clock_class = NULL;
611 struct bt_clock_value *clock_value = NULL;
612
613 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
614 return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
615 }
616
617 if (lttng_live_stream->current_inactivity_timestamp ==
618 lttng_live_stream->last_returned_inactivity_timestamp) {
619 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
620 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
621 goto end;
622 }
623
624 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
625 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
626
627 lttng_live_stream->last_returned_inactivity_timestamp =
628 lttng_live_stream->current_inactivity_timestamp;
629 end:
630 bt_object_put_ref(clock_value);
631 bt_object_put_ref(clock_class);
632 return ret;
633 }
634
635 static
636 enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
637 struct lttng_live_component *lttng_live,
638 struct lttng_live_stream_iterator *lttng_live_stream,
639 struct bt_notification **notification)
640 {
641 enum bt_lttng_live_iterator_status ret =
642 BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
643 enum bt_notif_iter_status status;
644 struct lttng_live_session *session;
645
646 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
647 struct lttng_live_trace *trace;
648
649 if (session->new_streams_needed) {
650 return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
651 }
652 bt_list_for_each_entry(trace, &session->traces, node) {
653 if (trace->new_metadata_needed) {
654 return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
655 }
656 }
657 }
658
659 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
660 return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
661 }
662 if (lttng_live_stream->packet_end_notif_queue) {
663 *notification = lttng_live_stream->packet_end_notif_queue;
664 lttng_live_stream->packet_end_notif_queue = NULL;
665 status = BT_NOTIF_ITER_STATUS_OK;
666 } else {
667 status = bt_notif_iter_get_next_notification(
668 lttng_live_stream->notif_iter,
669 lttng_live_stream->trace->cc_prio_map,
670 notification);
671 if (status == BT_NOTIF_ITER_STATUS_OK) {
672 /*
673 * Consider empty packets as inactivity.
674 */
675 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
676 lttng_live_stream->packet_end_notif_queue = *notification;
677 *notification = NULL;
678 return emit_inactivity_notification(lttng_live,
679 lttng_live_stream, notification,
680 lttng_live_stream->current_packet_end_timestamp);
681 }
682 }
683 }
684 switch (status) {
685 case BT_NOTIF_ITER_STATUS_EOF:
686 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
687 break;
688 case BT_NOTIF_ITER_STATUS_OK:
689 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
690 break;
691 case BT_NOTIF_ITER_STATUS_AGAIN:
692 /*
693 * Continue immediately (end of packet). The next
694 * get_index may return AGAIN to delay the following
695 * attempt.
696 */
697 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
698 break;
699 case BT_NOTIF_ITER_STATUS_INVAL:
700 /* No argument provided by the user, so don't return INVAL. */
701 case BT_NOTIF_ITER_STATUS_ERROR:
702 default:
703 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
704 break;
705 }
706 return ret;
707 }
708
709 /*
710 * helper function:
711 * handle_no_data_streams()
712 * retry:
713 * - for each ACTIVE_NO_DATA stream:
714 * - query relayd for stream data, or quiescence info.
715 * - if need metadata, get metadata, goto retry.
716 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
717 * - if quiescent, move to QUIESCENT streams
718 * - if fetched data, move to ACTIVE_DATA streams
719 * (at this point each stream either has data, or is quiescent)
720 *
721 *
722 * iterator_next:
723 * handle_new_streams_and_metadata()
724 * - query relayd for known streams, add them as ACTIVE_NO_DATA
725 * - query relayd for metadata
726 *
727 * call handle_active_no_data_streams()
728 *
729 * handle_quiescent_streams()
730 * - if at least one stream is ACTIVE_DATA:
731 * - peek stream event with lowest timestamp -> next_ts
732 * - for each quiescent stream
733 * - if next_ts >= quiescent end
734 * - set state to ACTIVE_NO_DATA
735 * - else
736 * - for each quiescent stream
737 * - set state to ACTIVE_NO_DATA
738 *
739 * call handle_active_no_data_streams()
740 *
741 * handle_active_data_streams()
742 * - if at least one stream is ACTIVE_DATA:
743 * - get stream event with lowest timestamp from heap
744 * - make that stream event the current notification.
745 * - move this stream heap position to its next event
746 * - if we need to fetch data from relayd, move
747 * stream to ACTIVE_NO_DATA.
748 * - return OK
749 * - return AGAIN
750 *
751 * end criterion: ctrl-c on client. If relayd exits or the session
752 * closes on the relay daemon side, we keep on waiting for streams.
753 * Eventually handle --end timestamp (also an end criterion).
754 *
755 * When disconnected from relayd: try to re-connect endlessly.
756 */
757 static
758 struct bt_notification_iterator_next_method_return lttng_live_iterator_next_stream(
759 struct bt_self_notification_iterator *iterator,
760 struct lttng_live_stream_iterator *stream_iter)
761 {
762 enum bt_lttng_live_iterator_status status;
763 struct bt_notification_iterator_next_method_return next_return;
764 struct lttng_live_component *lttng_live;
765
766 lttng_live = stream_iter->trace->session->lttng_live;
767 retry:
768 print_stream_state(stream_iter);
769 next_return.notification = NULL;
770 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
771 if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
772 goto end;
773 }
774 status = lttng_live_iterator_next_handle_one_no_data_stream(
775 lttng_live, stream_iter);
776 if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
777 goto end;
778 }
779 status = lttng_live_iterator_next_handle_one_quiescent_stream(
780 lttng_live, stream_iter, &next_return.notification);
781 if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
782 BT_ASSERT(next_return.notification == NULL);
783 goto end;
784 }
785 if (next_return.notification) {
786 goto end;
787 }
788 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
789 stream_iter, &next_return.notification);
790 if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
791 BT_ASSERT(next_return.notification == NULL);
792 }
793
794 end:
795 switch (status) {
796 case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
797 print_dbg("continue");
798 goto retry;
799 case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
800 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
801 print_dbg("again");
802 break;
803 case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
804 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
805 print_dbg("end");
806 break;
807 case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
808 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
809 print_dbg("ok");
810 break;
811 case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
812 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
813 break;
814 case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
815 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
816 break;
817 case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
818 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
819 break;
820 case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
821 default: /* fall-through */
822 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
823 break;
824 }
825 return next_return;
826 }
827
828 static
829 struct bt_notification_iterator_next_method_return lttng_live_iterator_next_no_stream(
830 struct bt_self_notification_iterator *iterator,
831 struct lttng_live_no_stream_iterator *no_stream_iter)
832 {
833 enum bt_lttng_live_iterator_status status;
834 struct bt_notification_iterator_next_method_return next_return;
835 struct lttng_live_component *lttng_live;
836
837 lttng_live = no_stream_iter->lttng_live;
838 retry:
839 lttng_live_force_new_streams_and_metadata(lttng_live);
840 next_return.notification = NULL;
841 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
842 if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
843 goto end;
844 }
845 if (no_stream_iter->port) {
846 status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
847 } else {
848 status = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
849 }
850 end:
851 switch (status) {
852 case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
853 goto retry;
854 case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
855 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
856 break;
857 case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
858 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
859 break;
860 case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
861 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
862 break;
863 case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
864 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
865 break;
866 case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
867 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
868 break;
869 case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
870 default: /* fall-through */
871 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
872 break;
873 }
874 return next_return;
875 }
876
877 BT_HIDDEN
878 struct bt_notification_iterator_next_method_return lttng_live_iterator_next(
879 struct bt_self_notification_iterator *iterator)
880 {
881 struct lttng_live_stream_iterator_generic *s =
882 bt_self_notification_iterator_get_user_data(iterator);
883 struct bt_notification_iterator_next_method_return next_return;
884
885 switch (s->type) {
886 case LIVE_STREAM_TYPE_NO_STREAM:
887 next_return = lttng_live_iterator_next_no_stream(iterator,
888 container_of(s, struct lttng_live_no_stream_iterator, p));
889 break;
890 case LIVE_STREAM_TYPE_STREAM:
891 next_return = lttng_live_iterator_next_stream(iterator,
892 container_of(s, struct lttng_live_stream_iterator, p));
893 break;
894 default:
895 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
896 break;
897 }
898 return next_return;
899 }
900
901 BT_HIDDEN
902 enum bt_notification_iterator_status lttng_live_iterator_init(
903 struct bt_self_notification_iterator *it,
904 struct bt_private_port *port)
905 {
906 enum bt_notification_iterator_status ret =
907 BT_NOTIFICATION_ITERATOR_STATUS_OK;
908 struct lttng_live_stream_iterator_generic *s;
909
910 BT_ASSERT(it);
911
912 s = bt_private_port_get_user_data(port);
913 BT_ASSERT(s);
914 switch (s->type) {
915 case LIVE_STREAM_TYPE_NO_STREAM:
916 {
917 struct lttng_live_no_stream_iterator *no_stream_iter =
918 container_of(s, struct lttng_live_no_stream_iterator, p);
919 ret = bt_self_notification_iterator_set_user_data(it, no_stream_iter);
920 if (ret) {
921 goto error;
922 }
923 break;
924 }
925 case LIVE_STREAM_TYPE_STREAM:
926 {
927 struct lttng_live_stream_iterator *stream_iter =
928 container_of(s, struct lttng_live_stream_iterator, p);
929 ret = bt_self_notification_iterator_set_user_data(it, stream_iter);
930 if (ret) {
931 goto error;
932 }
933 break;
934 }
935 default:
936 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
937 goto end;
938 }
939
940 end:
941 return ret;
942 error:
943 if (bt_self_notification_iterator_set_user_data(it, NULL)
944 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
945 BT_LOGE("Error setting private data to NULL");
946 }
947 goto end;
948 }
949
950 static
951 struct bt_component_class_query_method_return lttng_live_query_list_sessions(
952 struct bt_component_class *comp_class,
953 struct bt_query_executor *query_exec,
954 struct bt_value *params)
955 {
956 struct bt_component_class_query_method_return query_ret = {
957 .result = NULL,
958 .status = BT_QUERY_STATUS_OK,
959 };
960
961 struct bt_value *url_value = NULL;
962 const char *url;
963 struct bt_live_viewer_connection *viewer_connection = NULL;
964
965 url_value = bt_value_map_get(params, "url");
966 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
967 BT_LOGW("Mandatory \"url\" parameter missing");
968 query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
969 goto error;
970 }
971
972 if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
973 BT_LOGW("\"url\" parameter is required to be a string value");
974 query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
975 goto error;
976 }
977
978 viewer_connection = bt_live_viewer_connection_create(url, NULL);
979 if (!viewer_connection) {
980 goto error;
981 }
982
983 query_ret.result =
984 bt_live_viewer_connection_list_sessions(viewer_connection);
985 if (!query_ret.result) {
986 goto error;
987 }
988
989 goto end;
990
991 error:
992 BT_OBJECT_PUT_REF_AND_RESET(query_ret.result);
993
994 if (query_ret.status >= 0) {
995 query_ret.status = BT_QUERY_STATUS_ERROR;
996 }
997
998 end:
999 if (viewer_connection) {
1000 bt_live_viewer_connection_destroy(viewer_connection);
1001 }
1002 BT_OBJECT_PUT_REF_AND_RESET(url_value);
1003 return query_ret;
1004 }
1005
1006 BT_HIDDEN
1007 struct bt_component_class_query_method_return lttng_live_query(
1008 struct bt_component_class *comp_class,
1009 struct bt_query_executor *query_exec,
1010 const char *object, struct bt_value *params)
1011 {
1012 struct bt_component_class_query_method_return ret = {
1013 .result = NULL,
1014 .status = BT_QUERY_STATUS_OK,
1015 };
1016
1017 if (strcmp(object, "sessions") == 0) {
1018 return lttng_live_query_list_sessions(comp_class,
1019 query_exec, params);
1020 }
1021 BT_LOGW("Unknown query object `%s`", object);
1022 ret.status = BT_QUERY_STATUS_INVALID_OBJECT;
1023 return ret;
1024 }
1025
1026 static
1027 void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1028 {
1029 int ret;
1030 struct lttng_live_session *session, *s;
1031
1032 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
1033 lttng_live_destroy_session(session);
1034 }
1035 BT_OBJECT_PUT_REF_AND_RESET(lttng_live->viewer_connection);
1036 if (lttng_live->url) {
1037 g_string_free(lttng_live->url, TRUE);
1038 }
1039 if (lttng_live->no_stream_port) {
1040 bt_object_get_ref(lttng_live->no_stream_port);
1041 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
1042 bt_object_put_ref(lttng_live->no_stream_port);
1043 BT_ASSERT(!ret);
1044 }
1045 if (lttng_live->no_stream_iter) {
1046 g_free(lttng_live->no_stream_iter);
1047 }
1048 g_free(lttng_live);
1049 }
1050
1051 BT_HIDDEN
1052 void lttng_live_component_finalize(struct bt_self_component *component)
1053 {
1054 void *data = bt_self_component_get_user_data(component);
1055
1056 if (!data) {
1057 return;
1058 }
1059 lttng_live_component_destroy_data(data);
1060 }
1061
1062 static
1063 struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
1064 struct bt_self_component *private_component)
1065 {
1066 struct lttng_live_component *lttng_live;
1067 struct bt_value *value = NULL;
1068 const char *url;
1069 enum bt_value_status ret;
1070
1071 lttng_live = g_new0(struct lttng_live_component, 1);
1072 if (!lttng_live) {
1073 goto end;
1074 }
1075 /* TODO: make this an overridable parameter. */
1076 lttng_live->max_query_size = MAX_QUERY_SIZE;
1077 BT_INIT_LIST_HEAD(&lttng_live->sessions);
1078 value = bt_value_map_get(params, "url");
1079 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
1080 BT_LOGW("Mandatory \"url\" parameter missing");
1081 goto error;
1082 }
1083 url = bt_value_string_get(value);
1084 lttng_live->url = g_string_new(url);
1085 if (!lttng_live->url) {
1086 goto error;
1087 }
1088 BT_OBJECT_PUT_REF_AND_RESET(value);
1089 lttng_live->viewer_connection =
1090 bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
1091 if (!lttng_live->viewer_connection) {
1092 goto error;
1093 }
1094 if (lttng_live_create_viewer_session(lttng_live)) {
1095 goto error;
1096 }
1097 lttng_live->private_component = private_component;
1098
1099 goto end;
1100
1101 error:
1102 lttng_live_component_destroy_data(lttng_live);
1103 lttng_live = NULL;
1104 end:
1105 return lttng_live;
1106 }
1107
1108 BT_HIDDEN
1109 enum bt_component_status lttng_live_component_init(
1110 struct bt_self_component *private_component,
1111 struct bt_value *params, void *init_method_data)
1112 {
1113 struct lttng_live_component *lttng_live;
1114 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
1115
1116 /* Passes ownership of iter ref to lttng_live_component_create. */
1117 lttng_live = lttng_live_component_create(params, private_component);
1118 if (!lttng_live) {
1119 //TODO : we need access to the application cancel state
1120 //because we are not part of a graph yet.
1121 ret = BT_COMPONENT_STATUS_NOMEM;
1122 goto end;
1123 }
1124
1125 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1126 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1127 lttng_live->no_stream_iter->lttng_live = lttng_live;
1128 if (lttng_live_is_canceled(lttng_live)) {
1129 goto end;
1130 }
1131 ret = bt_self_component_source_add_output_port(
1132 lttng_live->private_component, "no-stream",
1133 lttng_live->no_stream_iter,
1134 &lttng_live->no_stream_port);
1135 if (ret != BT_COMPONENT_STATUS_OK) {
1136 goto end;
1137 }
1138 bt_object_put_ref(lttng_live->no_stream_port); /* weak */
1139 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1140
1141 ret = bt_self_component_set_user_data(private_component, lttng_live);
1142 if (ret != BT_COMPONENT_STATUS_OK) {
1143 goto error;
1144 }
1145
1146 end:
1147 return ret;
1148 error:
1149 (void) bt_self_component_set_user_data(private_component, NULL);
1150 lttng_live_component_destroy_data(lttng_live);
1151 return ret;
1152 }
1153
1154 BT_HIDDEN
1155 enum bt_component_status lttng_live_accept_port_connection(
1156 struct bt_self_component *private_component,
1157 struct bt_private_port *self_private_port,
1158 struct bt_port *other_port)
1159 {
1160 struct lttng_live_component *lttng_live =
1161 bt_self_component_get_user_data(private_component);
1162 struct bt_component *other_component;
1163 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
1164 struct bt_port *self_port = bt_port_from_private(self_private_port);
1165
1166 other_component = bt_port_get_component(other_port);
1167 bt_object_put_ref(other_component); /* weak */
1168
1169 if (!lttng_live->downstream_component) {
1170 lttng_live->downstream_component = other_component;
1171 goto end;
1172 }
1173
1174 /*
1175 * Compare prior component to ensure we are connected to the
1176 * same downstream component as prior ports.
1177 */
1178 if (lttng_live->downstream_component != other_component) {
1179 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1180 bt_port_get_name(self_port),
1181 bt_component_get_name(other_component),
1182 bt_component_get_name(lttng_live->downstream_component));
1183 status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
1184 goto end;
1185 }
1186 end:
1187 bt_object_put_ref(self_port);
1188 return status;
1189 }
This page took 0.055234 seconds and 4 git commands to generate.