Fix: cancel a notif. iter. finalized during its "next" method
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
020bc26f
PP
30#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
31#include "logging.h"
32
7cdc2bab 33#include <babeltrace/ctf-ir/packet.h>
b2e0c907 34#include <babeltrace/graph/component-source.h>
7cdc2bab
MD
35#include <babeltrace/graph/private-port.h>
36#include <babeltrace/graph/port.h>
37#include <babeltrace/graph/private-component.h>
38#include <babeltrace/graph/private-component-source.h>
39#include <babeltrace/graph/private-notification-iterator.h>
40#include <babeltrace/graph/notification-stream.h>
41#include <babeltrace/graph/notification-packet.h>
42#include <babeltrace/graph/notification-event.h>
43#include <babeltrace/graph/notification-heap.h>
44#include <babeltrace/graph/notification-iterator.h>
45#include <babeltrace/graph/notification-inactivity.h>
4c66436f 46#include <babeltrace/graph/graph.h>
7cdc2bab
MD
47#include <babeltrace/compiler-internal.h>
48#include <inttypes.h>
49#include <glib.h>
50#include <assert.h>
51#include <unistd.h>
7d61fa8e 52#include <plugins-common.h>
f3bc2010 53
7cdc2bab
MD
54#include "data-stream.h"
55#include "metadata.h"
0f5e83e5 56#include "lttng-live-internal.h"
7cdc2bab 57
7cdc2bab 58#define MAX_QUERY_SIZE (256*1024)
7cdc2bab 59
087bc060 60#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab
MD
61
62static const char *print_state(struct lttng_live_stream_iterator *s)
63{
64 switch (s->state) {
65 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
66 return "ACTIVE_NO_DATA";
67 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
68 return "QUIESCENT_NO_DATA";
69 case LTTNG_LIVE_STREAM_QUIESCENT:
70 return "QUIESCENT";
71 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
72 return "ACTIVE_DATA";
73 case LTTNG_LIVE_STREAM_EOF:
74 return "EOF";
75 default:
76 return "ERROR";
77 }
78}
7cdc2bab
MD
79
80#define print_stream_state(stream) \
81 print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
82 bt_port_get_name(bt_port_from_private_port(stream->port)), \
83 print_state(stream), stream->last_returned_inactivity_timestamp, \
84 stream->current_inactivity_timestamp)
85
7cdc2bab
MD
86BT_HIDDEN
87int lttng_live_add_port(struct lttng_live_component *lttng_live,
88 struct lttng_live_stream_iterator *stream_iter)
89{
90 int ret;
91 struct bt_private_port *private_port;
92 char name[STREAM_NAME_MAX_LEN];
93
94 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
95 assert(ret > 0);
96 strcpy(stream_iter->name, name);
97 private_port = bt_private_component_source_add_output_private_port(
98 lttng_live->private_component, name, stream_iter);
99 if (!private_port) {
100 return -1;
101 }
087bc060 102 BT_LOGI("Added port %s", name);
7cdc2bab
MD
103
104 if (lttng_live->no_stream_port) {
105 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
106 if (ret) {
107 return -1;
108 }
109 BT_PUT(lttng_live->no_stream_port);
110 lttng_live->no_stream_iter->port = NULL;
111 }
112 stream_iter->port = private_port;
113 return 0;
114}
115
116BT_HIDDEN
117int lttng_live_remove_port(struct lttng_live_component *lttng_live,
118 struct bt_private_port *port)
119{
120 struct bt_component *component;
121 int64_t nr_ports;
122 int ret;
123
124 component = bt_component_from_private_component(lttng_live->private_component);
125 nr_ports = bt_component_source_get_output_port_count(component);
126 if (nr_ports < 0) {
127 return -1;
128 }
129 BT_PUT(component);
130 if (nr_ports == 1) {
131 assert(!lttng_live->no_stream_port);
132 lttng_live->no_stream_port =
133 bt_private_component_source_add_output_private_port(lttng_live->private_component,
134 "no-stream", lttng_live->no_stream_iter);
135 if (!lttng_live->no_stream_port) {
136 return -1;
137 }
138 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
139 }
140 ret = bt_private_port_remove_from_component(port);
141 if (ret) {
142 return -1;
143 }
144 return 0;
145}
146
147static
148struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
149 uint64_t trace_id)
d3e4dcd8 150{
7cdc2bab
MD
151 struct lttng_live_trace *trace;
152
153 bt_list_for_each_entry(trace, &session->traces, node) {
154 if (trace->id == trace_id) {
155 return trace;
156 }
157 }
d3eb6e8f
PP
158 return NULL;
159}
160
7cdc2bab
MD
161static
162void lttng_live_destroy_trace(struct bt_object *obj)
163{
164 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
5bd230f4 165 int retval;
7cdc2bab 166
087bc060 167 BT_LOGI("Destroy trace");
7cdc2bab
MD
168 assert(bt_list_empty(&trace->streams));
169 bt_list_del(&trace->node);
5bd230f4
MD
170
171 retval = bt_ctf_trace_set_is_static(trace->trace);
172 assert(!retval);
173
7cdc2bab
MD
174 lttng_live_metadata_fini(trace);
175 BT_PUT(trace->cc_prio_map);
176 g_free(trace);
177}
178
179static
180struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
181 uint64_t trace_id)
182{
183 struct lttng_live_trace *trace = NULL;
184
185 trace = g_new0(struct lttng_live_trace, 1);
186 if (!trace) {
187 goto error;
188 }
189 trace->session = session;
190 trace->id = trace_id;
191 BT_INIT_LIST_HEAD(&trace->streams);
192 trace->new_metadata_needed = true;
193 bt_list_add(&trace->node, &session->traces);
194 bt_object_init(&trace->obj, lttng_live_destroy_trace);
087bc060 195 BT_LOGI("Create trace");
7cdc2bab
MD
196 goto end;
197error:
198 g_free(trace);
199 trace = NULL;
200end:
201 return trace;
202}
203
204BT_HIDDEN
205struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
206 uint64_t trace_id)
207{
208 struct lttng_live_trace *trace;
209
210 trace = lttng_live_find_trace(session, trace_id);
211 if (trace) {
212 bt_get(trace);
213 return trace;
214 }
215 return lttng_live_create_trace(session, trace_id);
216}
217
218BT_HIDDEN
219void lttng_live_unref_trace(struct lttng_live_trace *trace)
220{
221 bt_put(trace);
222}
223
224static
225void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
226{
227 struct lttng_live_stream_iterator *stream, *s;
228
229 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
230 lttng_live_stream_iterator_destroy(stream);
231 }
232 lttng_live_metadata_fini(trace);
233}
234
235BT_HIDDEN
236int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id)
237{
238 int ret = 0;
239 struct lttng_live_session *s;
240
241 s = g_new0(struct lttng_live_session, 1);
242 if (!s) {
243 goto error;
244 }
245
246 s->id = session_id;
247 BT_INIT_LIST_HEAD(&s->traces);
248 s->lttng_live = lttng_live;
249 s->new_streams_needed = true;
250
087bc060 251 BT_LOGI("Reading from session %" PRIu64, s->id);
7cdc2bab
MD
252 bt_list_add(&s->node, &lttng_live->sessions);
253 goto end;
254error:
087bc060 255 BT_LOGE("Error adding session");
7cdc2bab
MD
256 g_free(s);
257 ret = -1;
258end:
259 return ret;
260}
261
262static
263void lttng_live_destroy_session(struct lttng_live_session *session)
264{
265 struct lttng_live_trace *trace, *t;
266
087bc060 267 BT_LOGI("Destroy session");
7cdc2bab
MD
268 if (session->id != -1ULL) {
269 if (lttng_live_detach_session(session)) {
4c66436f
MD
270 if (!bt_graph_is_canceled(session->lttng_live->graph)) {
271 /* Old relayd cannot detach sessions. */
272 BT_LOGD("Unable to detach session %" PRIu64,
273 session->id);
274 }
7cdc2bab
MD
275 }
276 session->id = -1ULL;
277 }
278 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
279 lttng_live_close_trace_streams(trace);
280 }
281 bt_list_del(&session->node);
282 g_free(session);
283}
284
285BT_HIDDEN
286void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
287{
288 struct lttng_live_stream_iterator_generic *s =
289 bt_private_notification_iterator_get_user_data(it);
290
291 switch (s->type) {
292 case LIVE_STREAM_TYPE_NO_STREAM:
293 {
294 /* Leave no_stream_iter in place when port is removed. */
295 break;
296 }
297 case LIVE_STREAM_TYPE_STREAM:
298 {
299 struct lttng_live_stream_iterator *stream_iter =
300 container_of(s, struct lttng_live_stream_iterator, p);
301
302 lttng_live_stream_iterator_destroy(stream_iter);
303 break;
304 }
305 }
306}
307
308static
309enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
310 struct lttng_live_component *lttng_live,
311 struct lttng_live_stream_iterator *lttng_live_stream)
312{
313 switch (lttng_live_stream->state) {
314 case LTTNG_LIVE_STREAM_QUIESCENT:
315 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
316 break;
317 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
318 /* Invalid state. */
087bc060
MD
319 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
320 abort();
7cdc2bab
MD
321 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
322 /* Invalid state. */
087bc060
MD
323 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
324 abort();
7cdc2bab
MD
325 case LTTNG_LIVE_STREAM_EOF:
326 break;
327 }
328 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
329}
330
331/*
332 * For active no data stream, fetch next data. It can be either:
333 * - quiescent: need to put it in the prio heap at quiescent end
334 * timestamp,
335 * - have data: need to wire up first event into the prio heap,
336 * - have no data on this stream at this point: need to retry (AGAIN) or
337 * return EOF.
338 */
339static
340enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
341 struct lttng_live_component *lttng_live,
342 struct lttng_live_stream_iterator *lttng_live_stream)
343{
344 enum bt_ctf_lttng_live_iterator_status ret =
345 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
346 struct packet_index index;
347 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
348
349 if (lttng_live_stream->trace->new_metadata_needed) {
350 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
351 goto end;
352 }
353 if (lttng_live_stream->trace->session->new_streams_needed) {
354 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
355 goto end;
356 }
357 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
358 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
359 goto end;
360 }
361 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
362 if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
363 goto end;
364 }
365 assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
366 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
367 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
368 && lttng_live_stream->last_returned_inactivity_timestamp ==
369 lttng_live_stream->current_inactivity_timestamp) {
370 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
371 print_stream_state(lttng_live_stream);
372 } else {
373 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
374 }
375 goto end;
376 }
377 lttng_live_stream->base_offset = index.offset;
378 lttng_live_stream->offset = index.offset;
379 lttng_live_stream->len = index.packet_size / CHAR_BIT;
380end:
381 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
382 ret = lttng_live_iterator_next_check_stream_state(
383 lttng_live, lttng_live_stream);
384 }
385 return ret;
386}
387
388/*
389 * Creation of the notification requires the ctf trace to be created
390 * beforehand, but the live protocol gives us all streams (including
391 * metadata) at once. So we split it in three steps: getting streams,
392 * getting metadata (which creates the ctf trace), and then creating the
393 * per-stream notifications.
394 */
395static
396enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
397 struct lttng_live_component *lttng_live,
398 struct lttng_live_session *session)
399{
400 enum bt_ctf_lttng_live_iterator_status status;
401 struct lttng_live_trace *trace, *t;
402
403 if (lttng_live_attach_session(session)) {
4c66436f
MD
404 if (bt_graph_is_canceled(lttng_live->graph)) {
405 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
406 } else {
407 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
408 }
7cdc2bab
MD
409 }
410 status = lttng_live_get_new_streams(session);
411 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
412 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
413 return status;
414 }
415 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
416 status = lttng_live_metadata_update(trace);
5bd230f4
MD
417 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
418 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
7cdc2bab
MD
419 return status;
420 }
421 }
422 return lttng_live_lazy_notif_init(session);
423}
424
425BT_HIDDEN
426void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
427{
428 struct lttng_live_session *session;
429
430 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
431 session->new_streams_needed = true;
432 }
433}
434
435static
436void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
437{
438 struct lttng_live_session *session;
439
440 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
441 struct lttng_live_trace *trace;
442
443 session->new_streams_needed = true;
444 bt_list_for_each_entry(trace, &session->traces, node) {
445 trace->new_metadata_needed = true;
446 }
447 }
448}
449
450static
3cdf4234 451enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
7cdc2bab
MD
452 struct lttng_live_component *lttng_live)
453{
454 enum bt_ctf_lttng_live_iterator_status ret =
455 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
456 unsigned int nr_sessions_opened = 0;
457 struct lttng_live_session *session, *s;
458
459 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
460 if (session->closed && bt_list_empty(&session->traces)) {
461 lttng_live_destroy_session(session);
462 }
463 }
464 /*
465 * Currently, when there are no sessions, we quit immediately.
466 * We may want to add a component parameter to keep trying until
467 * we get data in the future.
468 * Also, in a remotely distant future, we could add a "new
469 * session" flag to the protocol, which would tell us that we
470 * need to query for new sessions even though we have sessions
471 * currently ongoing.
472 */
473 if (bt_list_empty(&lttng_live->sessions)) {
474 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
475 goto end;
476 }
477 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
478 ret = lttng_live_get_session(lttng_live, session);
479 switch (ret) {
480 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
481 break;
482 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
483 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
484 break;
485 default:
486 goto end;
487 }
488 if (!session->closed) {
489 nr_sessions_opened++;
490 }
491 }
492end:
493 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
494 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
495 }
496 return ret;
497}
498
499static
500enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
501 struct lttng_live_component *lttng_live,
502 struct lttng_live_stream_iterator *lttng_live_stream,
503 struct bt_notification **notification,
504 uint64_t timestamp)
505{
506 enum bt_ctf_lttng_live_iterator_status ret =
507 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
508 struct lttng_live_trace *trace;
509 struct bt_ctf_clock_class *clock_class = NULL;
510 struct bt_ctf_clock_value *clock_value = NULL;
511 struct bt_notification *notif = NULL;
512 int retval;
513
514 trace = lttng_live_stream->trace;
515 if (!trace) {
516 goto error;
517 }
518 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
519 if (!clock_class) {
520 goto error;
521 }
522 clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
523 if (!clock_value) {
524 goto error;
525 }
526 notif = bt_notification_inactivity_create(trace->cc_prio_map);
527 if (!notif) {
528 goto error;
529 }
530 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
531 if (retval) {
532 goto error;
533 }
534 *notification = notif;
535end:
536 bt_put(clock_value);
537 bt_put(clock_class);
538 return ret;
539
540error:
541 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
542 bt_put(notif);
543 goto end;
544}
545
546static
547enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
548 struct lttng_live_component *lttng_live,
549 struct lttng_live_stream_iterator *lttng_live_stream,
550 struct bt_notification **notification)
551{
552 enum bt_ctf_lttng_live_iterator_status ret =
553 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
554 struct bt_ctf_clock_class *clock_class = NULL;
555 struct bt_ctf_clock_value *clock_value = NULL;
556
557 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
558 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
559 }
560
561 if (lttng_live_stream->current_inactivity_timestamp ==
562 lttng_live_stream->last_returned_inactivity_timestamp) {
563 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
564 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
565 goto end;
566 }
567
568 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
569 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
570
571 lttng_live_stream->last_returned_inactivity_timestamp =
572 lttng_live_stream->current_inactivity_timestamp;
573end:
574 bt_put(clock_value);
575 bt_put(clock_class);
576 return ret;
577}
578
579static
580enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
581 struct lttng_live_component *lttng_live,
582 struct lttng_live_stream_iterator *lttng_live_stream,
583 struct bt_notification **notification)
584{
585 enum bt_ctf_lttng_live_iterator_status ret =
586 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
587 enum bt_ctf_notif_iter_status status;
588 struct lttng_live_session *session;
589
590 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
591 struct lttng_live_trace *trace;
592
593 if (session->new_streams_needed) {
594 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
595 }
596 bt_list_for_each_entry(trace, &session->traces, node) {
597 if (trace->new_metadata_needed) {
598 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
599 }
600 }
601 }
602
603 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
604 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
605 }
606 if (lttng_live_stream->packet_end_notif_queue) {
607 *notification = lttng_live_stream->packet_end_notif_queue;
608 lttng_live_stream->packet_end_notif_queue = NULL;
609 status = BT_CTF_NOTIF_ITER_STATUS_OK;
610 } else {
611 status = bt_ctf_notif_iter_get_next_notification(
612 lttng_live_stream->notif_iter,
613 lttng_live_stream->trace->cc_prio_map,
614 notification);
615 if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
616 /*
617 * Consider empty packets as inactivity.
618 */
619 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
620 lttng_live_stream->packet_end_notif_queue = *notification;
621 *notification = NULL;
622 return emit_inactivity_notification(lttng_live,
623 lttng_live_stream, notification,
624 lttng_live_stream->current_packet_end_timestamp);
625 }
626 }
627 }
628 switch (status) {
629 case BT_CTF_NOTIF_ITER_STATUS_EOF:
630 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
631 break;
632 case BT_CTF_NOTIF_ITER_STATUS_OK:
633 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
634 break;
635 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
636 /*
637 * Continue immediately (end of packet). The next
638 * get_index may return AGAIN to delay the following
639 * attempt.
640 */
641 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
642 break;
643 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
644 /* No argument provided by the user, so don't return INVAL. */
645 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
646 default:
647 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
648 break;
649 }
650 return ret;
651}
652
653/*
654 * helper function:
655 * handle_no_data_streams()
656 * retry:
657 * - for each ACTIVE_NO_DATA stream:
658 * - query relayd for stream data, or quiescence info.
659 * - if need metadata, get metadata, goto retry.
660 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
661 * - if quiescent, move to QUIESCENT streams
662 * - if fetched data, move to ACTIVE_DATA streams
663 * (at this point each stream either has data, or is quiescent)
664 *
665 *
666 * iterator_next:
667 * handle_new_streams_and_metadata()
668 * - query relayd for known streams, add them as ACTIVE_NO_DATA
669 * - query relayd for metadata
670 *
671 * call handle_active_no_data_streams()
672 *
673 * handle_quiescent_streams()
674 * - if at least one stream is ACTIVE_DATA:
675 * - peek stream event with lowest timestamp -> next_ts
676 * - for each quiescent stream
677 * - if next_ts >= quiescent end
678 * - set state to ACTIVE_NO_DATA
679 * - else
680 * - for each quiescent stream
681 * - set state to ACTIVE_NO_DATA
682 *
683 * call handle_active_no_data_streams()
684 *
685 * handle_active_data_streams()
686 * - if at least one stream is ACTIVE_DATA:
687 * - get stream event with lowest timestamp from heap
688 * - make that stream event the current notification.
689 * - move this stream heap position to its next event
690 * - if we need to fetch data from relayd, move
691 * stream to ACTIVE_NO_DATA.
692 * - return OK
693 * - return AGAIN
694 *
695 * end criterion: ctrl-c on client. If relayd exits or the session
696 * closes on the relay daemon side, we keep on waiting for streams.
697 * Eventually handle --end timestamp (also an end criterion).
698 *
699 * When disconnected from relayd: try to re-connect endlessly.
700 */
701static
702struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
703 struct bt_private_notification_iterator *iterator,
704 struct lttng_live_stream_iterator *stream_iter)
705{
706 enum bt_ctf_lttng_live_iterator_status status;
707 struct bt_notification_iterator_next_return next_return;
708 struct lttng_live_component *lttng_live;
709
710 lttng_live = stream_iter->trace->session->lttng_live;
711retry:
712 print_stream_state(stream_iter);
713 next_return.notification = NULL;
714 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
715 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
716 goto end;
717 }
718 status = lttng_live_iterator_next_handle_one_no_data_stream(
719 lttng_live, stream_iter);
720 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
721 goto end;
722 }
723 status = lttng_live_iterator_next_handle_one_quiescent_stream(
724 lttng_live, stream_iter, &next_return.notification);
725 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
726 assert(next_return.notification == NULL);
727 goto end;
728 }
729 if (next_return.notification) {
730 goto end;
731 }
732 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
733 stream_iter, &next_return.notification);
734 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
735 assert(next_return.notification == NULL);
736 }
737
738end:
739 switch (status) {
740 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
741 print_dbg("continue");
742 goto retry;
743 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
744 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
745 print_dbg("again");
746 break;
747 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
748 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
749 print_dbg("end");
750 break;
751 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
752 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
753 print_dbg("ok");
754 break;
755 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
756 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
757 break;
758 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
759 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
760 break;
761 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
762 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
763 break;
764 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
765 default: /* fall-through */
766 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
767 break;
768 }
769 return next_return;
770}
771
772static
773struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
774 struct bt_private_notification_iterator *iterator,
775 struct lttng_live_no_stream_iterator *no_stream_iter)
776{
777 enum bt_ctf_lttng_live_iterator_status status;
778 struct bt_notification_iterator_next_return next_return;
779 struct lttng_live_component *lttng_live;
780
781 lttng_live = no_stream_iter->lttng_live;
782retry:
783 lttng_live_force_new_streams_and_metadata(lttng_live);
784 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
785 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
786 goto end;
787 }
788 if (no_stream_iter->port) {
789 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
790 } else {
791 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
792 }
793end:
794 switch (status) {
795 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
796 goto retry;
797 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
798 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
799 break;
800 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
801 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
802 break;
803 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
804 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
805 break;
806 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
807 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
808 break;
809 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
810 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
811 break;
812 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
813 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
814 break;
815 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
816 default: /* fall-through */
817 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
818 break;
819 }
820 return next_return;
821}
822
d3eb6e8f 823BT_HIDDEN
41a2b7ae 824struct bt_notification_iterator_next_return lttng_live_iterator_next(
890882ef 825 struct bt_private_notification_iterator *iterator)
d3eb6e8f 826{
7cdc2bab
MD
827 struct lttng_live_stream_iterator_generic *s =
828 bt_private_notification_iterator_get_user_data(iterator);
829 struct bt_notification_iterator_next_return next_return;
830
831 switch (s->type) {
832 case LIVE_STREAM_TYPE_NO_STREAM:
833 next_return = lttng_live_iterator_next_no_stream(iterator,
834 container_of(s, struct lttng_live_no_stream_iterator, p));
835 break;
836 case LIVE_STREAM_TYPE_STREAM:
837 next_return = lttng_live_iterator_next_stream(iterator,
838 container_of(s, struct lttng_live_stream_iterator, p));
839 break;
840 default:
841 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
842 break;
843 }
844 return next_return;
845}
41a2b7ae 846
7cdc2bab
MD
847BT_HIDDEN
848enum bt_notification_iterator_status lttng_live_iterator_init(
849 struct bt_private_notification_iterator *it,
850 struct bt_private_port *port)
851{
852 enum bt_notification_iterator_status ret =
853 BT_NOTIFICATION_ITERATOR_STATUS_OK;
854 struct lttng_live_stream_iterator_generic *s;
7cdc2bab
MD
855
856 assert(it);
857
858 s = bt_private_port_get_user_data(port);
859 assert(s);
860 switch (s->type) {
861 case LIVE_STREAM_TYPE_NO_STREAM:
862 {
863 struct lttng_live_no_stream_iterator *no_stream_iter =
864 container_of(s, struct lttng_live_no_stream_iterator, p);
7cdc2bab
MD
865 ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
866 if (ret) {
867 goto error;
868 }
869 break;
870 }
871 case LIVE_STREAM_TYPE_STREAM:
872 {
873 struct lttng_live_stream_iterator *stream_iter =
874 container_of(s, struct lttng_live_stream_iterator, p);
7cdc2bab
MD
875 ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
876 if (ret) {
877 goto error;
878 }
879 break;
880 }
881 default:
882 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
883 goto end;
884 }
885
886end:
41a2b7ae 887 return ret;
7cdc2bab
MD
888error:
889 if (bt_private_notification_iterator_set_user_data(it, NULL)
890 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
087bc060 891 BT_LOGE("Error setting private data to NULL");
7cdc2bab
MD
892 }
893 goto end;
894}
895
896static
897struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
898 struct bt_value *params)
899{
900 struct bt_value *url_value = NULL;
901 struct bt_value *results = NULL;
902 const char *url;
903 struct bt_live_viewer_connection *viewer_connection = NULL;
7cdc2bab
MD
904
905 url_value = bt_value_map_get(params, "url");
906 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
087bc060 907 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
908 goto error;
909 }
910
3cdf4234 911 if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
087bc060 912 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
913 goto error;
914 }
915
4c66436f 916 viewer_connection = bt_live_viewer_connection_create(url, NULL);
7cdc2bab 917 if (!viewer_connection) {
7cdc2bab
MD
918 goto error;
919 }
920
921 results = bt_live_viewer_connection_list_sessions(viewer_connection);
922 goto end;
923error:
924 BT_PUT(results);
925end:
926 if (viewer_connection) {
927 bt_live_viewer_connection_destroy(viewer_connection);
928 }
929 BT_PUT(url_value);
930 return results;
931}
932
933BT_HIDDEN
934struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
935 const char *object, struct bt_value *params)
936{
937 if (strcmp(object, "sessions") == 0) {
938 return lttng_live_query_list_sessions(comp_class,
939 params);
940 }
087bc060 941 BT_LOGW("Unknown query object `%s`", object);
7cdc2bab
MD
942 return NULL;
943}
944
945static
946void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
947{
948 int ret;
949 struct lttng_live_session *session, *s;
950
951 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
952 lttng_live_destroy_session(session);
953 }
954 BT_PUT(lttng_live->viewer_connection);
955 if (lttng_live->url) {
956 g_string_free(lttng_live->url, TRUE);
957 }
958 if (lttng_live->no_stream_port) {
959 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
960 assert(!ret);
961 BT_PUT(lttng_live->no_stream_port);
962 }
963 if (lttng_live->no_stream_iter) {
964 g_free(lttng_live->no_stream_iter);
965 }
966 g_free(lttng_live);
967}
968
969BT_HIDDEN
970void lttng_live_component_finalize(struct bt_private_component *component)
971{
972 void *data = bt_private_component_get_user_data(component);
973
974 if (!data) {
975 return;
976 }
977 lttng_live_component_destroy_data(data);
978}
979
980static
981struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
3cdf4234
MD
982 struct bt_private_component *private_component,
983 struct bt_graph *graph)
7cdc2bab
MD
984{
985 struct lttng_live_component *lttng_live;
986 struct bt_value *value = NULL;
987 const char *url;
988 enum bt_value_status ret;
989
990 lttng_live = g_new0(struct lttng_live_component, 1);
991 if (!lttng_live) {
992 goto end;
993 }
7cdc2bab
MD
994 /* TODO: make this an overridable parameter. */
995 lttng_live->max_query_size = MAX_QUERY_SIZE;
996 BT_INIT_LIST_HEAD(&lttng_live->sessions);
997 value = bt_value_map_get(params, "url");
998 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
087bc060 999 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
1000 goto error;
1001 }
1002 ret = bt_value_string_get(value, &url);
1003 if (ret != BT_VALUE_STATUS_OK) {
087bc060 1004 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
1005 goto error;
1006 }
1007 lttng_live->url = g_string_new(url);
1008 if (!lttng_live->url) {
1009 goto error;
1010 }
1011 lttng_live->viewer_connection =
4c66436f 1012 bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
7cdc2bab 1013 if (!lttng_live->viewer_connection) {
7cdc2bab
MD
1014 goto error;
1015 }
1016 if (lttng_live_create_viewer_session(lttng_live)) {
7cdc2bab
MD
1017 goto error;
1018 }
1019 lttng_live->private_component = private_component;
3cdf4234 1020 lttng_live->graph = graph;
4c66436f 1021
7cdc2bab
MD
1022 goto end;
1023
1024error:
1025 lttng_live_component_destroy_data(lttng_live);
1026 lttng_live = NULL;
1027end:
1028 return lttng_live;
d3e4dcd8
PP
1029}
1030
f3bc2010 1031BT_HIDDEN
3cdf4234
MD
1032enum bt_component_status lttng_live_component_init(
1033 struct bt_private_component *private_component,
7cdc2bab 1034 struct bt_value *params, void *init_method_data)
f3bc2010 1035{
7cdc2bab
MD
1036 struct lttng_live_component *lttng_live;
1037 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
3cdf4234
MD
1038 struct bt_component *component;
1039 struct bt_graph *graph;
1040
1041 component = bt_component_from_private_component(private_component);
1042 graph = bt_component_get_graph(component);
1043 bt_put(graph); /* weak */
1044 bt_put(component);
7cdc2bab 1045
7cdc2bab 1046 /* Passes ownership of iter ref to lttng_live_component_create. */
3cdf4234
MD
1047 lttng_live = lttng_live_component_create(params, private_component,
1048 graph);
7cdc2bab 1049 if (!lttng_live) {
3cdf4234
MD
1050 if (bt_graph_is_canceled(graph)) {
1051 ret = BT_COMPONENT_STATUS_AGAIN;
1052 } else {
1053 ret = BT_COMPONENT_STATUS_NOMEM;
1054 }
7cdc2bab
MD
1055 goto end;
1056 }
1057
1058 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1059 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1060 lttng_live->no_stream_iter->lttng_live = lttng_live;
1061
1062 lttng_live->no_stream_port =
1063 bt_private_component_source_add_output_private_port(
1064 lttng_live->private_component, "no-stream",
1065 lttng_live->no_stream_iter);
1066 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1067
3cdf4234 1068 ret = bt_private_component_set_user_data(private_component, lttng_live);
7cdc2bab
MD
1069 if (ret != BT_COMPONENT_STATUS_OK) {
1070 goto error;
1071 }
1072
1073end:
1074 return ret;
1075error:
3cdf4234 1076 (void) bt_private_component_set_user_data(private_component, NULL);
7cdc2bab
MD
1077 lttng_live_component_destroy_data(lttng_live);
1078 return ret;
f3bc2010 1079}
087bc060 1080
d85ef162
MD
1081BT_HIDDEN
1082enum bt_component_status lttng_live_accept_port_connection(
1083 struct bt_private_component *private_component,
1084 struct bt_private_port *self_private_port,
1085 struct bt_port *other_port)
1086{
1087 struct lttng_live_component *lttng_live =
1088 bt_private_component_get_user_data(private_component);
1089 struct bt_component *other_component;
1090 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
1091 struct bt_port *self_port = bt_port_from_private_port(self_private_port);
1092
1093 other_component = bt_port_get_component(other_port);
1094 bt_put(other_component); /* weak */
1095
1096 if (!lttng_live->downstream_component) {
1097 lttng_live->downstream_component = other_component;
1098 goto end;
1099 }
1100
1101 /*
1102 * Compare prior component to ensure we are connected to the
1103 * same downstream component as prior ports.
1104 */
1105 if (lttng_live->downstream_component != other_component) {
1106 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1107 bt_port_get_name(self_port),
1108 bt_component_get_name(other_component),
1109 bt_component_get_name(lttng_live->downstream_component));
1110 status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
1111 goto end;
1112 }
1113end:
1114 bt_put(self_port);
1115 return status;
1116}
This page took 0.074028 seconds and 4 git commands to generate.