lttng-live: check graph cancel state, check ctf trace pointer
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
020bc26f
PP
30#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
31#include "logging.h"
32
7cdc2bab 33#include <babeltrace/ctf-ir/packet.h>
b2e0c907 34#include <babeltrace/graph/component-source.h>
7cdc2bab
MD
35#include <babeltrace/graph/private-port.h>
36#include <babeltrace/graph/port.h>
37#include <babeltrace/graph/private-component.h>
38#include <babeltrace/graph/private-component-source.h>
39#include <babeltrace/graph/private-notification-iterator.h>
40#include <babeltrace/graph/notification-stream.h>
41#include <babeltrace/graph/notification-packet.h>
42#include <babeltrace/graph/notification-event.h>
43#include <babeltrace/graph/notification-heap.h>
44#include <babeltrace/graph/notification-iterator.h>
45#include <babeltrace/graph/notification-inactivity.h>
4c66436f 46#include <babeltrace/graph/graph.h>
7cdc2bab 47#include <babeltrace/compiler-internal.h>
6f79a7cf 48#include <babeltrace/types.h>
7cdc2bab
MD
49#include <inttypes.h>
50#include <glib.h>
51#include <assert.h>
52#include <unistd.h>
7d61fa8e 53#include <plugins-common.h>
f3bc2010 54
7cdc2bab
MD
55#include "data-stream.h"
56#include "metadata.h"
0f5e83e5 57#include "lttng-live-internal.h"
7cdc2bab 58
7cdc2bab 59#define MAX_QUERY_SIZE (256*1024)
7cdc2bab 60
087bc060 61#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab
MD
62
63static const char *print_state(struct lttng_live_stream_iterator *s)
64{
65 switch (s->state) {
66 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
67 return "ACTIVE_NO_DATA";
68 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
69 return "QUIESCENT_NO_DATA";
70 case LTTNG_LIVE_STREAM_QUIESCENT:
71 return "QUIESCENT";
72 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
73 return "ACTIVE_DATA";
74 case LTTNG_LIVE_STREAM_EOF:
75 return "EOF";
76 default:
77 return "ERROR";
78 }
79}
7cdc2bab 80
6f79a7cf
MD
81static
82void print_stream_state(struct lttng_live_stream_iterator *stream)
83{
84 struct bt_port *port;
85
86 port = bt_port_from_private_port(stream->port);
87 print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
88 bt_port_get_name(port),
89 print_state(stream),
90 stream->last_returned_inactivity_timestamp,
91 stream->current_inactivity_timestamp);
92 bt_put(port);
93}
94
95BT_HIDDEN
96bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
97{
98 struct bt_component *component;
99 struct bt_graph *graph;
100 bt_bool ret;
101
102 if (!lttng_live) {
103 return BT_FALSE;
104 }
105
106 component = bt_component_from_private_component(lttng_live->private_component);
107 graph = bt_component_get_graph(component);
108 ret = bt_graph_is_canceled(graph);
109 bt_put(graph);
110 bt_put(component);
111 return ret;
112}
7cdc2bab 113
7cdc2bab
MD
114BT_HIDDEN
115int lttng_live_add_port(struct lttng_live_component *lttng_live,
116 struct lttng_live_stream_iterator *stream_iter)
117{
118 int ret;
119 struct bt_private_port *private_port;
120 char name[STREAM_NAME_MAX_LEN];
6f79a7cf 121 enum bt_component_status status;
7cdc2bab
MD
122
123 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
124 assert(ret > 0);
125 strcpy(stream_iter->name, name);
4bf0e537
MD
126 if (lttng_live_is_canceled(lttng_live)) {
127 return 0;
128 }
6f79a7cf 129 status = bt_private_component_source_add_output_private_port(
147337a3
PP
130 lttng_live->private_component, name, stream_iter,
131 &private_port);
6f79a7cf
MD
132 switch (status) {
133 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
134 return 0;
135 case BT_COMPONENT_STATUS_OK:
136 break;
137 default:
7cdc2bab
MD
138 return -1;
139 }
6f79a7cf 140 bt_put(private_port); /* weak */
087bc060 141 BT_LOGI("Added port %s", name);
7cdc2bab
MD
142
143 if (lttng_live->no_stream_port) {
6f79a7cf 144 bt_get(lttng_live->no_stream_port);
7cdc2bab 145 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
6f79a7cf 146 bt_put(lttng_live->no_stream_port);
7cdc2bab
MD
147 if (ret) {
148 return -1;
149 }
6f79a7cf 150 lttng_live->no_stream_port = NULL;
7cdc2bab
MD
151 lttng_live->no_stream_iter->port = NULL;
152 }
153 stream_iter->port = private_port;
154 return 0;
155}
156
157BT_HIDDEN
158int lttng_live_remove_port(struct lttng_live_component *lttng_live,
159 struct bt_private_port *port)
160{
161 struct bt_component *component;
162 int64_t nr_ports;
163 int ret;
164
165 component = bt_component_from_private_component(lttng_live->private_component);
166 nr_ports = bt_component_source_get_output_port_count(component);
167 if (nr_ports < 0) {
168 return -1;
169 }
170 BT_PUT(component);
171 if (nr_ports == 1) {
6f79a7cf
MD
172 enum bt_component_status status;
173
7cdc2bab 174 assert(!lttng_live->no_stream_port);
4bf0e537
MD
175
176 if (lttng_live_is_canceled(lttng_live)) {
177 return 0;
178 }
6f79a7cf 179 status = bt_private_component_source_add_output_private_port(lttng_live->private_component,
147337a3
PP
180 "no-stream", lttng_live->no_stream_iter,
181 &lttng_live->no_stream_port);
6f79a7cf
MD
182 switch (status) {
183 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
184 return 0;
185 case BT_COMPONENT_STATUS_OK:
186 break;
187 default:
7cdc2bab
MD
188 return -1;
189 }
6f79a7cf 190 bt_put(lttng_live->no_stream_port); /* weak */
7cdc2bab
MD
191 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
192 }
6f79a7cf 193 bt_get(port);
7cdc2bab 194 ret = bt_private_port_remove_from_component(port);
6f79a7cf 195 bt_put(port);
7cdc2bab
MD
196 if (ret) {
197 return -1;
198 }
199 return 0;
200}
201
202static
203struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
204 uint64_t trace_id)
d3e4dcd8 205{
7cdc2bab
MD
206 struct lttng_live_trace *trace;
207
208 bt_list_for_each_entry(trace, &session->traces, node) {
209 if (trace->id == trace_id) {
210 return trace;
211 }
212 }
d3eb6e8f
PP
213 return NULL;
214}
215
7cdc2bab
MD
216static
217void lttng_live_destroy_trace(struct bt_object *obj)
218{
219 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
220
087bc060 221 BT_LOGI("Destroy trace");
7cdc2bab
MD
222 assert(bt_list_empty(&trace->streams));
223 bt_list_del(&trace->node);
5bd230f4 224
4bf0e537
MD
225 if (trace->trace) {
226 int retval;
5bd230f4 227
4bf0e537
MD
228 retval = bt_ctf_trace_set_is_static(trace->trace);
229 assert(!retval);
230 BT_PUT(trace->trace);
231 }
7cdc2bab
MD
232 lttng_live_metadata_fini(trace);
233 BT_PUT(trace->cc_prio_map);
234 g_free(trace);
235}
236
237static
238struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
239 uint64_t trace_id)
240{
241 struct lttng_live_trace *trace = NULL;
242
243 trace = g_new0(struct lttng_live_trace, 1);
244 if (!trace) {
245 goto error;
246 }
247 trace->session = session;
248 trace->id = trace_id;
249 BT_INIT_LIST_HEAD(&trace->streams);
250 trace->new_metadata_needed = true;
251 bt_list_add(&trace->node, &session->traces);
252 bt_object_init(&trace->obj, lttng_live_destroy_trace);
087bc060 253 BT_LOGI("Create trace");
7cdc2bab
MD
254 goto end;
255error:
256 g_free(trace);
257 trace = NULL;
258end:
259 return trace;
260}
261
262BT_HIDDEN
263struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
264 uint64_t trace_id)
265{
266 struct lttng_live_trace *trace;
267
268 trace = lttng_live_find_trace(session, trace_id);
269 if (trace) {
270 bt_get(trace);
271 return trace;
272 }
273 return lttng_live_create_trace(session, trace_id);
274}
275
276BT_HIDDEN
277void lttng_live_unref_trace(struct lttng_live_trace *trace)
278{
279 bt_put(trace);
280}
281
282static
283void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
284{
285 struct lttng_live_stream_iterator *stream, *s;
286
287 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
288 lttng_live_stream_iterator_destroy(stream);
289 }
290 lttng_live_metadata_fini(trace);
291}
292
293BT_HIDDEN
06994c71
MD
294int lttng_live_add_session(struct lttng_live_component *lttng_live,
295 uint64_t session_id, const char *hostname,
296 const char *session_name)
7cdc2bab
MD
297{
298 int ret = 0;
299 struct lttng_live_session *s;
300
301 s = g_new0(struct lttng_live_session, 1);
302 if (!s) {
303 goto error;
304 }
305
306 s->id = session_id;
307 BT_INIT_LIST_HEAD(&s->traces);
308 s->lttng_live = lttng_live;
309 s->new_streams_needed = true;
06994c71
MD
310 s->hostname = g_string_new(hostname);
311 s->session_name = g_string_new(session_name);
7cdc2bab 312
06994c71
MD
313 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
314 s->id, hostname, session_name);
7cdc2bab
MD
315 bt_list_add(&s->node, &lttng_live->sessions);
316 goto end;
317error:
087bc060 318 BT_LOGE("Error adding session");
7cdc2bab
MD
319 g_free(s);
320 ret = -1;
321end:
322 return ret;
323}
324
325static
326void lttng_live_destroy_session(struct lttng_live_session *session)
327{
328 struct lttng_live_trace *trace, *t;
329
087bc060 330 BT_LOGI("Destroy session");
7cdc2bab
MD
331 if (session->id != -1ULL) {
332 if (lttng_live_detach_session(session)) {
6f79a7cf 333 if (!lttng_live_is_canceled(session->lttng_live)) {
4c66436f
MD
334 /* Old relayd cannot detach sessions. */
335 BT_LOGD("Unable to detach session %" PRIu64,
336 session->id);
337 }
7cdc2bab
MD
338 }
339 session->id = -1ULL;
340 }
341 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
342 lttng_live_close_trace_streams(trace);
343 }
344 bt_list_del(&session->node);
06994c71
MD
345 if (session->hostname) {
346 g_string_free(session->hostname, TRUE);
347 }
348 if (session->session_name) {
349 g_string_free(session->session_name, TRUE);
350 }
7cdc2bab
MD
351 g_free(session);
352}
353
354BT_HIDDEN
355void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
356{
357 struct lttng_live_stream_iterator_generic *s =
358 bt_private_notification_iterator_get_user_data(it);
359
360 switch (s->type) {
361 case LIVE_STREAM_TYPE_NO_STREAM:
362 {
363 /* Leave no_stream_iter in place when port is removed. */
364 break;
365 }
366 case LIVE_STREAM_TYPE_STREAM:
367 {
368 struct lttng_live_stream_iterator *stream_iter =
369 container_of(s, struct lttng_live_stream_iterator, p);
370
371 lttng_live_stream_iterator_destroy(stream_iter);
372 break;
373 }
374 }
375}
376
377static
378enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
379 struct lttng_live_component *lttng_live,
380 struct lttng_live_stream_iterator *lttng_live_stream)
381{
382 switch (lttng_live_stream->state) {
383 case LTTNG_LIVE_STREAM_QUIESCENT:
384 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
385 break;
386 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
387 /* Invalid state. */
087bc060
MD
388 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
389 abort();
7cdc2bab
MD
390 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
391 /* Invalid state. */
087bc060
MD
392 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
393 abort();
7cdc2bab
MD
394 case LTTNG_LIVE_STREAM_EOF:
395 break;
396 }
397 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
398}
399
400/*
401 * For active no data stream, fetch next data. It can be either:
402 * - quiescent: need to put it in the prio heap at quiescent end
403 * timestamp,
404 * - have data: need to wire up first event into the prio heap,
405 * - have no data on this stream at this point: need to retry (AGAIN) or
406 * return EOF.
407 */
408static
409enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
410 struct lttng_live_component *lttng_live,
411 struct lttng_live_stream_iterator *lttng_live_stream)
412{
413 enum bt_ctf_lttng_live_iterator_status ret =
414 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
415 struct packet_index index;
416 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
417
418 if (lttng_live_stream->trace->new_metadata_needed) {
419 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
420 goto end;
421 }
422 if (lttng_live_stream->trace->session->new_streams_needed) {
423 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
424 goto end;
425 }
426 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
427 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
428 goto end;
429 }
430 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
431 if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
432 goto end;
433 }
434 assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
435 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
436 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
437 && lttng_live_stream->last_returned_inactivity_timestamp ==
438 lttng_live_stream->current_inactivity_timestamp) {
439 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
440 print_stream_state(lttng_live_stream);
441 } else {
442 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
443 }
444 goto end;
445 }
446 lttng_live_stream->base_offset = index.offset;
447 lttng_live_stream->offset = index.offset;
448 lttng_live_stream->len = index.packet_size / CHAR_BIT;
449end:
450 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
451 ret = lttng_live_iterator_next_check_stream_state(
452 lttng_live, lttng_live_stream);
453 }
454 return ret;
455}
456
457/*
458 * Creation of the notification requires the ctf trace to be created
459 * beforehand, but the live protocol gives us all streams (including
460 * metadata) at once. So we split it in three steps: getting streams,
461 * getting metadata (which creates the ctf trace), and then creating the
462 * per-stream notifications.
463 */
464static
465enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
466 struct lttng_live_component *lttng_live,
467 struct lttng_live_session *session)
468{
469 enum bt_ctf_lttng_live_iterator_status status;
470 struct lttng_live_trace *trace, *t;
471
472 if (lttng_live_attach_session(session)) {
6f79a7cf 473 if (lttng_live_is_canceled(lttng_live)) {
4c66436f
MD
474 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
475 } else {
476 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
477 }
7cdc2bab
MD
478 }
479 status = lttng_live_get_new_streams(session);
480 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
481 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
482 return status;
483 }
484 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
485 status = lttng_live_metadata_update(trace);
5bd230f4
MD
486 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
487 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
7cdc2bab
MD
488 return status;
489 }
490 }
491 return lttng_live_lazy_notif_init(session);
492}
493
494BT_HIDDEN
495void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
496{
497 struct lttng_live_session *session;
498
499 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
500 session->new_streams_needed = true;
501 }
502}
503
504static
505void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
506{
507 struct lttng_live_session *session;
508
509 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
510 struct lttng_live_trace *trace;
511
512 session->new_streams_needed = true;
513 bt_list_for_each_entry(trace, &session->traces, node) {
514 trace->new_metadata_needed = true;
515 }
516 }
517}
518
519static
3cdf4234 520enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
7cdc2bab
MD
521 struct lttng_live_component *lttng_live)
522{
523 enum bt_ctf_lttng_live_iterator_status ret =
524 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
525 unsigned int nr_sessions_opened = 0;
526 struct lttng_live_session *session, *s;
527
528 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
529 if (session->closed && bt_list_empty(&session->traces)) {
530 lttng_live_destroy_session(session);
531 }
532 }
533 /*
534 * Currently, when there are no sessions, we quit immediately.
535 * We may want to add a component parameter to keep trying until
536 * we get data in the future.
537 * Also, in a remotely distant future, we could add a "new
538 * session" flag to the protocol, which would tell us that we
539 * need to query for new sessions even though we have sessions
540 * currently ongoing.
541 */
542 if (bt_list_empty(&lttng_live->sessions)) {
543 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
544 goto end;
545 }
546 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
547 ret = lttng_live_get_session(lttng_live, session);
548 switch (ret) {
549 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
550 break;
551 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
552 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
553 break;
554 default:
555 goto end;
556 }
557 if (!session->closed) {
558 nr_sessions_opened++;
559 }
560 }
561end:
562 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
563 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
564 }
565 return ret;
566}
567
568static
569enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
570 struct lttng_live_component *lttng_live,
571 struct lttng_live_stream_iterator *lttng_live_stream,
572 struct bt_notification **notification,
573 uint64_t timestamp)
574{
575 enum bt_ctf_lttng_live_iterator_status ret =
576 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
577 struct lttng_live_trace *trace;
578 struct bt_ctf_clock_class *clock_class = NULL;
579 struct bt_ctf_clock_value *clock_value = NULL;
580 struct bt_notification *notif = NULL;
581 int retval;
582
583 trace = lttng_live_stream->trace;
584 if (!trace) {
585 goto error;
586 }
587 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
588 if (!clock_class) {
589 goto error;
590 }
591 clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
592 if (!clock_value) {
593 goto error;
594 }
595 notif = bt_notification_inactivity_create(trace->cc_prio_map);
596 if (!notif) {
597 goto error;
598 }
599 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
600 if (retval) {
601 goto error;
602 }
603 *notification = notif;
604end:
605 bt_put(clock_value);
606 bt_put(clock_class);
607 return ret;
608
609error:
610 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
611 bt_put(notif);
612 goto end;
613}
614
615static
616enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
617 struct lttng_live_component *lttng_live,
618 struct lttng_live_stream_iterator *lttng_live_stream,
619 struct bt_notification **notification)
620{
621 enum bt_ctf_lttng_live_iterator_status ret =
622 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
623 struct bt_ctf_clock_class *clock_class = NULL;
624 struct bt_ctf_clock_value *clock_value = NULL;
625
626 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
627 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
628 }
629
630 if (lttng_live_stream->current_inactivity_timestamp ==
631 lttng_live_stream->last_returned_inactivity_timestamp) {
632 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
633 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
634 goto end;
635 }
636
637 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
638 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
639
640 lttng_live_stream->last_returned_inactivity_timestamp =
641 lttng_live_stream->current_inactivity_timestamp;
642end:
643 bt_put(clock_value);
644 bt_put(clock_class);
645 return ret;
646}
647
648static
649enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
650 struct lttng_live_component *lttng_live,
651 struct lttng_live_stream_iterator *lttng_live_stream,
652 struct bt_notification **notification)
653{
654 enum bt_ctf_lttng_live_iterator_status ret =
655 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
656 enum bt_ctf_notif_iter_status status;
657 struct lttng_live_session *session;
658
659 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
660 struct lttng_live_trace *trace;
661
662 if (session->new_streams_needed) {
663 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
664 }
665 bt_list_for_each_entry(trace, &session->traces, node) {
666 if (trace->new_metadata_needed) {
667 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
668 }
669 }
670 }
671
672 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
673 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
674 }
675 if (lttng_live_stream->packet_end_notif_queue) {
676 *notification = lttng_live_stream->packet_end_notif_queue;
677 lttng_live_stream->packet_end_notif_queue = NULL;
678 status = BT_CTF_NOTIF_ITER_STATUS_OK;
679 } else {
680 status = bt_ctf_notif_iter_get_next_notification(
681 lttng_live_stream->notif_iter,
682 lttng_live_stream->trace->cc_prio_map,
683 notification);
684 if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
685 /*
686 * Consider empty packets as inactivity.
687 */
688 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
689 lttng_live_stream->packet_end_notif_queue = *notification;
690 *notification = NULL;
691 return emit_inactivity_notification(lttng_live,
692 lttng_live_stream, notification,
693 lttng_live_stream->current_packet_end_timestamp);
694 }
695 }
696 }
697 switch (status) {
698 case BT_CTF_NOTIF_ITER_STATUS_EOF:
699 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
700 break;
701 case BT_CTF_NOTIF_ITER_STATUS_OK:
702 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
703 break;
704 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
705 /*
706 * Continue immediately (end of packet). The next
707 * get_index may return AGAIN to delay the following
708 * attempt.
709 */
710 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
711 break;
712 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
713 /* No argument provided by the user, so don't return INVAL. */
714 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
715 default:
716 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
717 break;
718 }
719 return ret;
720}
721
722/*
723 * helper function:
724 * handle_no_data_streams()
725 * retry:
726 * - for each ACTIVE_NO_DATA stream:
727 * - query relayd for stream data, or quiescence info.
728 * - if need metadata, get metadata, goto retry.
729 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
730 * - if quiescent, move to QUIESCENT streams
731 * - if fetched data, move to ACTIVE_DATA streams
732 * (at this point each stream either has data, or is quiescent)
733 *
734 *
735 * iterator_next:
736 * handle_new_streams_and_metadata()
737 * - query relayd for known streams, add them as ACTIVE_NO_DATA
738 * - query relayd for metadata
739 *
740 * call handle_active_no_data_streams()
741 *
742 * handle_quiescent_streams()
743 * - if at least one stream is ACTIVE_DATA:
744 * - peek stream event with lowest timestamp -> next_ts
745 * - for each quiescent stream
746 * - if next_ts >= quiescent end
747 * - set state to ACTIVE_NO_DATA
748 * - else
749 * - for each quiescent stream
750 * - set state to ACTIVE_NO_DATA
751 *
752 * call handle_active_no_data_streams()
753 *
754 * handle_active_data_streams()
755 * - if at least one stream is ACTIVE_DATA:
756 * - get stream event with lowest timestamp from heap
757 * - make that stream event the current notification.
758 * - move this stream heap position to its next event
759 * - if we need to fetch data from relayd, move
760 * stream to ACTIVE_NO_DATA.
761 * - return OK
762 * - return AGAIN
763 *
764 * end criterion: ctrl-c on client. If relayd exits or the session
765 * closes on the relay daemon side, we keep on waiting for streams.
766 * Eventually handle --end timestamp (also an end criterion).
767 *
768 * When disconnected from relayd: try to re-connect endlessly.
769 */
770static
771struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
772 struct bt_private_notification_iterator *iterator,
773 struct lttng_live_stream_iterator *stream_iter)
774{
775 enum bt_ctf_lttng_live_iterator_status status;
776 struct bt_notification_iterator_next_return next_return;
777 struct lttng_live_component *lttng_live;
778
779 lttng_live = stream_iter->trace->session->lttng_live;
780retry:
781 print_stream_state(stream_iter);
782 next_return.notification = NULL;
783 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
784 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
785 goto end;
786 }
787 status = lttng_live_iterator_next_handle_one_no_data_stream(
788 lttng_live, stream_iter);
789 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
790 goto end;
791 }
792 status = lttng_live_iterator_next_handle_one_quiescent_stream(
793 lttng_live, stream_iter, &next_return.notification);
794 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
795 assert(next_return.notification == NULL);
796 goto end;
797 }
798 if (next_return.notification) {
799 goto end;
800 }
801 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
802 stream_iter, &next_return.notification);
803 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
804 assert(next_return.notification == NULL);
805 }
806
807end:
808 switch (status) {
809 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
810 print_dbg("continue");
811 goto retry;
812 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
813 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
814 print_dbg("again");
815 break;
816 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
817 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
818 print_dbg("end");
819 break;
820 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
821 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
822 print_dbg("ok");
823 break;
824 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
825 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
826 break;
827 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
828 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
829 break;
830 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
831 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
832 break;
833 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
834 default: /* fall-through */
835 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
836 break;
837 }
838 return next_return;
839}
840
841static
842struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
843 struct bt_private_notification_iterator *iterator,
844 struct lttng_live_no_stream_iterator *no_stream_iter)
845{
846 enum bt_ctf_lttng_live_iterator_status status;
847 struct bt_notification_iterator_next_return next_return;
848 struct lttng_live_component *lttng_live;
849
850 lttng_live = no_stream_iter->lttng_live;
851retry:
852 lttng_live_force_new_streams_and_metadata(lttng_live);
853 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
854 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
855 goto end;
856 }
857 if (no_stream_iter->port) {
858 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
859 } else {
860 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
861 }
862end:
863 switch (status) {
864 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
865 goto retry;
866 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
867 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
868 break;
869 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
870 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
871 break;
872 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
873 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
874 break;
875 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
876 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
877 break;
878 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
879 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
880 break;
881 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
882 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
883 break;
884 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
885 default: /* fall-through */
886 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
887 break;
888 }
889 return next_return;
890}
891
d3eb6e8f 892BT_HIDDEN
41a2b7ae 893struct bt_notification_iterator_next_return lttng_live_iterator_next(
890882ef 894 struct bt_private_notification_iterator *iterator)
d3eb6e8f 895{
7cdc2bab
MD
896 struct lttng_live_stream_iterator_generic *s =
897 bt_private_notification_iterator_get_user_data(iterator);
898 struct bt_notification_iterator_next_return next_return;
899
900 switch (s->type) {
901 case LIVE_STREAM_TYPE_NO_STREAM:
902 next_return = lttng_live_iterator_next_no_stream(iterator,
903 container_of(s, struct lttng_live_no_stream_iterator, p));
904 break;
905 case LIVE_STREAM_TYPE_STREAM:
906 next_return = lttng_live_iterator_next_stream(iterator,
907 container_of(s, struct lttng_live_stream_iterator, p));
908 break;
909 default:
910 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
911 break;
912 }
913 return next_return;
914}
41a2b7ae 915
7cdc2bab
MD
916BT_HIDDEN
917enum bt_notification_iterator_status lttng_live_iterator_init(
918 struct bt_private_notification_iterator *it,
919 struct bt_private_port *port)
920{
921 enum bt_notification_iterator_status ret =
922 BT_NOTIFICATION_ITERATOR_STATUS_OK;
923 struct lttng_live_stream_iterator_generic *s;
7cdc2bab
MD
924
925 assert(it);
926
927 s = bt_private_port_get_user_data(port);
928 assert(s);
929 switch (s->type) {
930 case LIVE_STREAM_TYPE_NO_STREAM:
931 {
932 struct lttng_live_no_stream_iterator *no_stream_iter =
933 container_of(s, struct lttng_live_no_stream_iterator, p);
7cdc2bab
MD
934 ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
935 if (ret) {
936 goto error;
937 }
938 break;
939 }
940 case LIVE_STREAM_TYPE_STREAM:
941 {
942 struct lttng_live_stream_iterator *stream_iter =
943 container_of(s, struct lttng_live_stream_iterator, p);
7cdc2bab
MD
944 ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
945 if (ret) {
946 goto error;
947 }
948 break;
949 }
950 default:
951 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
952 goto end;
953 }
954
955end:
41a2b7ae 956 return ret;
7cdc2bab
MD
957error:
958 if (bt_private_notification_iterator_set_user_data(it, NULL)
959 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
087bc060 960 BT_LOGE("Error setting private data to NULL");
7cdc2bab
MD
961 }
962 goto end;
963}
964
965static
966struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
967 struct bt_value *params)
968{
969 struct bt_value *url_value = NULL;
970 struct bt_value *results = NULL;
971 const char *url;
972 struct bt_live_viewer_connection *viewer_connection = NULL;
7cdc2bab
MD
973
974 url_value = bt_value_map_get(params, "url");
975 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
087bc060 976 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
977 goto error;
978 }
979
3cdf4234 980 if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
087bc060 981 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
982 goto error;
983 }
984
4c66436f 985 viewer_connection = bt_live_viewer_connection_create(url, NULL);
7cdc2bab 986 if (!viewer_connection) {
7cdc2bab
MD
987 goto error;
988 }
989
990 results = bt_live_viewer_connection_list_sessions(viewer_connection);
991 goto end;
992error:
993 BT_PUT(results);
994end:
995 if (viewer_connection) {
996 bt_live_viewer_connection_destroy(viewer_connection);
997 }
998 BT_PUT(url_value);
999 return results;
1000}
1001
1002BT_HIDDEN
1003struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
1004 const char *object, struct bt_value *params)
1005{
1006 if (strcmp(object, "sessions") == 0) {
1007 return lttng_live_query_list_sessions(comp_class,
1008 params);
1009 }
087bc060 1010 BT_LOGW("Unknown query object `%s`", object);
7cdc2bab
MD
1011 return NULL;
1012}
1013
1014static
1015void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1016{
1017 int ret;
1018 struct lttng_live_session *session, *s;
1019
1020 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
1021 lttng_live_destroy_session(session);
1022 }
1023 BT_PUT(lttng_live->viewer_connection);
1024 if (lttng_live->url) {
1025 g_string_free(lttng_live->url, TRUE);
1026 }
1027 if (lttng_live->no_stream_port) {
6f79a7cf 1028 bt_get(lttng_live->no_stream_port);
7cdc2bab 1029 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
6f79a7cf 1030 bt_put(lttng_live->no_stream_port);
7cdc2bab 1031 assert(!ret);
7cdc2bab
MD
1032 }
1033 if (lttng_live->no_stream_iter) {
1034 g_free(lttng_live->no_stream_iter);
1035 }
1036 g_free(lttng_live);
1037}
1038
1039BT_HIDDEN
1040void lttng_live_component_finalize(struct bt_private_component *component)
1041{
1042 void *data = bt_private_component_get_user_data(component);
1043
1044 if (!data) {
1045 return;
1046 }
1047 lttng_live_component_destroy_data(data);
1048}
1049
1050static
1051struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
6f79a7cf 1052 struct bt_private_component *private_component)
7cdc2bab
MD
1053{
1054 struct lttng_live_component *lttng_live;
1055 struct bt_value *value = NULL;
1056 const char *url;
1057 enum bt_value_status ret;
1058
1059 lttng_live = g_new0(struct lttng_live_component, 1);
1060 if (!lttng_live) {
1061 goto end;
1062 }
7cdc2bab
MD
1063 /* TODO: make this an overridable parameter. */
1064 lttng_live->max_query_size = MAX_QUERY_SIZE;
1065 BT_INIT_LIST_HEAD(&lttng_live->sessions);
1066 value = bt_value_map_get(params, "url");
1067 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
087bc060 1068 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
1069 goto error;
1070 }
1071 ret = bt_value_string_get(value, &url);
1072 if (ret != BT_VALUE_STATUS_OK) {
087bc060 1073 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
1074 goto error;
1075 }
1076 lttng_live->url = g_string_new(url);
1077 if (!lttng_live->url) {
1078 goto error;
1079 }
6f79a7cf 1080 BT_PUT(value);
7cdc2bab 1081 lttng_live->viewer_connection =
4c66436f 1082 bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
7cdc2bab 1083 if (!lttng_live->viewer_connection) {
7cdc2bab
MD
1084 goto error;
1085 }
1086 if (lttng_live_create_viewer_session(lttng_live)) {
7cdc2bab
MD
1087 goto error;
1088 }
1089 lttng_live->private_component = private_component;
4c66436f 1090
7cdc2bab
MD
1091 goto end;
1092
1093error:
1094 lttng_live_component_destroy_data(lttng_live);
1095 lttng_live = NULL;
1096end:
1097 return lttng_live;
d3e4dcd8
PP
1098}
1099
f3bc2010 1100BT_HIDDEN
3cdf4234
MD
1101enum bt_component_status lttng_live_component_init(
1102 struct bt_private_component *private_component,
7cdc2bab 1103 struct bt_value *params, void *init_method_data)
f3bc2010 1104{
7cdc2bab
MD
1105 struct lttng_live_component *lttng_live;
1106 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
1107
7cdc2bab 1108 /* Passes ownership of iter ref to lttng_live_component_create. */
6f79a7cf 1109 lttng_live = lttng_live_component_create(params, private_component);
7cdc2bab 1110 if (!lttng_live) {
6f79a7cf
MD
1111 //TODO : we need access to the application cancel state
1112 //because we are not part of a graph yet.
1113 ret = BT_COMPONENT_STATUS_NOMEM;
7cdc2bab
MD
1114 goto end;
1115 }
1116
1117 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1118 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1119 lttng_live->no_stream_iter->lttng_live = lttng_live;
4bf0e537
MD
1120 if (lttng_live_is_canceled(lttng_live)) {
1121 goto end;
1122 }
147337a3 1123 ret = bt_private_component_source_add_output_private_port(
7cdc2bab 1124 lttng_live->private_component, "no-stream",
147337a3
PP
1125 lttng_live->no_stream_iter,
1126 &lttng_live->no_stream_port);
1127 if (ret != BT_COMPONENT_STATUS_OK) {
1128 goto end;
1129 }
6f79a7cf 1130 bt_put(lttng_live->no_stream_port); /* weak */
7cdc2bab
MD
1131 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1132
3cdf4234 1133 ret = bt_private_component_set_user_data(private_component, lttng_live);
7cdc2bab
MD
1134 if (ret != BT_COMPONENT_STATUS_OK) {
1135 goto error;
1136 }
1137
1138end:
1139 return ret;
1140error:
3cdf4234 1141 (void) bt_private_component_set_user_data(private_component, NULL);
7cdc2bab
MD
1142 lttng_live_component_destroy_data(lttng_live);
1143 return ret;
f3bc2010 1144}
087bc060 1145
d85ef162
MD
1146BT_HIDDEN
1147enum bt_component_status lttng_live_accept_port_connection(
1148 struct bt_private_component *private_component,
1149 struct bt_private_port *self_private_port,
1150 struct bt_port *other_port)
1151{
1152 struct lttng_live_component *lttng_live =
1153 bt_private_component_get_user_data(private_component);
1154 struct bt_component *other_component;
1155 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
1156 struct bt_port *self_port = bt_port_from_private_port(self_private_port);
1157
1158 other_component = bt_port_get_component(other_port);
1159 bt_put(other_component); /* weak */
1160
1161 if (!lttng_live->downstream_component) {
1162 lttng_live->downstream_component = other_component;
1163 goto end;
1164 }
1165
1166 /*
1167 * Compare prior component to ensure we are connected to the
1168 * same downstream component as prior ports.
1169 */
1170 if (lttng_live->downstream_component != other_component) {
1171 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1172 bt_port_get_name(self_port),
1173 bt_component_get_name(other_component),
1174 bt_component_get_name(lttng_live->downstream_component));
1175 status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
1176 goto end;
1177 }
1178end:
1179 bt_put(self_port);
1180 return status;
1181}
This page took 0.086426 seconds and 4 git commands to generate.