Fix build with BUILT_IN_PLUGINS=1
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
020bc26f
PP
30#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
31#include "logging.h"
32
7cdc2bab 33#include <babeltrace/ctf-ir/packet.h>
b2e0c907 34#include <babeltrace/graph/component-source.h>
7cdc2bab
MD
35#include <babeltrace/graph/private-port.h>
36#include <babeltrace/graph/port.h>
37#include <babeltrace/graph/private-component.h>
38#include <babeltrace/graph/private-component-source.h>
39#include <babeltrace/graph/private-notification-iterator.h>
40#include <babeltrace/graph/notification-stream.h>
41#include <babeltrace/graph/notification-packet.h>
42#include <babeltrace/graph/notification-event.h>
43#include <babeltrace/graph/notification-heap.h>
44#include <babeltrace/graph/notification-iterator.h>
45#include <babeltrace/graph/notification-inactivity.h>
4c66436f 46#include <babeltrace/graph/graph.h>
7cdc2bab 47#include <babeltrace/compiler-internal.h>
6f79a7cf 48#include <babeltrace/types.h>
7cdc2bab
MD
49#include <inttypes.h>
50#include <glib.h>
51#include <assert.h>
52#include <unistd.h>
7d61fa8e 53#include <plugins-common.h>
f3bc2010 54
7cdc2bab
MD
55#include "data-stream.h"
56#include "metadata.h"
0f5e83e5 57#include "lttng-live-internal.h"
7cdc2bab 58
7cdc2bab 59#define MAX_QUERY_SIZE (256*1024)
7cdc2bab 60
087bc060 61#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab
MD
62
63static const char *print_state(struct lttng_live_stream_iterator *s)
64{
65 switch (s->state) {
66 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
67 return "ACTIVE_NO_DATA";
68 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
69 return "QUIESCENT_NO_DATA";
70 case LTTNG_LIVE_STREAM_QUIESCENT:
71 return "QUIESCENT";
72 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
73 return "ACTIVE_DATA";
74 case LTTNG_LIVE_STREAM_EOF:
75 return "EOF";
76 default:
77 return "ERROR";
78 }
79}
7cdc2bab 80
6f79a7cf
MD
81static
82void print_stream_state(struct lttng_live_stream_iterator *stream)
83{
84 struct bt_port *port;
85
86 port = bt_port_from_private_port(stream->port);
87 print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
88 bt_port_get_name(port),
89 print_state(stream),
90 stream->last_returned_inactivity_timestamp,
91 stream->current_inactivity_timestamp);
92 bt_put(port);
93}
94
95BT_HIDDEN
96bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
97{
98 struct bt_component *component;
99 struct bt_graph *graph;
100 bt_bool ret;
101
102 if (!lttng_live) {
103 return BT_FALSE;
104 }
105
106 component = bt_component_from_private_component(lttng_live->private_component);
107 graph = bt_component_get_graph(component);
108 ret = bt_graph_is_canceled(graph);
109 bt_put(graph);
110 bt_put(component);
111 return ret;
112}
7cdc2bab 113
7cdc2bab
MD
114BT_HIDDEN
115int lttng_live_add_port(struct lttng_live_component *lttng_live,
116 struct lttng_live_stream_iterator *stream_iter)
117{
118 int ret;
119 struct bt_private_port *private_port;
120 char name[STREAM_NAME_MAX_LEN];
6f79a7cf 121 enum bt_component_status status;
7cdc2bab
MD
122
123 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
124 assert(ret > 0);
125 strcpy(stream_iter->name, name);
6f79a7cf 126 status = bt_private_component_source_add_output_private_port(
147337a3
PP
127 lttng_live->private_component, name, stream_iter,
128 &private_port);
6f79a7cf
MD
129 switch (status) {
130 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
131 return 0;
132 case BT_COMPONENT_STATUS_OK:
133 break;
134 default:
7cdc2bab
MD
135 return -1;
136 }
6f79a7cf 137 bt_put(private_port); /* weak */
087bc060 138 BT_LOGI("Added port %s", name);
7cdc2bab
MD
139
140 if (lttng_live->no_stream_port) {
6f79a7cf 141 bt_get(lttng_live->no_stream_port);
7cdc2bab 142 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
6f79a7cf 143 bt_put(lttng_live->no_stream_port);
7cdc2bab
MD
144 if (ret) {
145 return -1;
146 }
6f79a7cf 147 lttng_live->no_stream_port = NULL;
7cdc2bab
MD
148 lttng_live->no_stream_iter->port = NULL;
149 }
150 stream_iter->port = private_port;
151 return 0;
152}
153
154BT_HIDDEN
155int lttng_live_remove_port(struct lttng_live_component *lttng_live,
156 struct bt_private_port *port)
157{
158 struct bt_component *component;
159 int64_t nr_ports;
160 int ret;
161
162 component = bt_component_from_private_component(lttng_live->private_component);
163 nr_ports = bt_component_source_get_output_port_count(component);
164 if (nr_ports < 0) {
165 return -1;
166 }
167 BT_PUT(component);
168 if (nr_ports == 1) {
6f79a7cf
MD
169 enum bt_component_status status;
170
7cdc2bab 171 assert(!lttng_live->no_stream_port);
6f79a7cf 172 status = bt_private_component_source_add_output_private_port(lttng_live->private_component,
147337a3
PP
173 "no-stream", lttng_live->no_stream_iter,
174 &lttng_live->no_stream_port);
6f79a7cf
MD
175 switch (status) {
176 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
177 return 0;
178 case BT_COMPONENT_STATUS_OK:
179 break;
180 default:
7cdc2bab
MD
181 return -1;
182 }
6f79a7cf 183 bt_put(lttng_live->no_stream_port); /* weak */
7cdc2bab
MD
184 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
185 }
6f79a7cf 186 bt_get(port);
7cdc2bab 187 ret = bt_private_port_remove_from_component(port);
6f79a7cf 188 bt_put(port);
7cdc2bab
MD
189 if (ret) {
190 return -1;
191 }
192 return 0;
193}
194
195static
196struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
197 uint64_t trace_id)
d3e4dcd8 198{
7cdc2bab
MD
199 struct lttng_live_trace *trace;
200
201 bt_list_for_each_entry(trace, &session->traces, node) {
202 if (trace->id == trace_id) {
203 return trace;
204 }
205 }
d3eb6e8f
PP
206 return NULL;
207}
208
7cdc2bab
MD
209static
210void lttng_live_destroy_trace(struct bt_object *obj)
211{
212 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
5bd230f4 213 int retval;
7cdc2bab 214
087bc060 215 BT_LOGI("Destroy trace");
7cdc2bab
MD
216 assert(bt_list_empty(&trace->streams));
217 bt_list_del(&trace->node);
5bd230f4
MD
218
219 retval = bt_ctf_trace_set_is_static(trace->trace);
220 assert(!retval);
221
6f79a7cf 222 BT_PUT(trace->trace);
7cdc2bab
MD
223 lttng_live_metadata_fini(trace);
224 BT_PUT(trace->cc_prio_map);
225 g_free(trace);
226}
227
228static
229struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
230 uint64_t trace_id)
231{
232 struct lttng_live_trace *trace = NULL;
233
234 trace = g_new0(struct lttng_live_trace, 1);
235 if (!trace) {
236 goto error;
237 }
238 trace->session = session;
239 trace->id = trace_id;
240 BT_INIT_LIST_HEAD(&trace->streams);
241 trace->new_metadata_needed = true;
242 bt_list_add(&trace->node, &session->traces);
243 bt_object_init(&trace->obj, lttng_live_destroy_trace);
087bc060 244 BT_LOGI("Create trace");
7cdc2bab
MD
245 goto end;
246error:
247 g_free(trace);
248 trace = NULL;
249end:
250 return trace;
251}
252
253BT_HIDDEN
254struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
255 uint64_t trace_id)
256{
257 struct lttng_live_trace *trace;
258
259 trace = lttng_live_find_trace(session, trace_id);
260 if (trace) {
261 bt_get(trace);
262 return trace;
263 }
264 return lttng_live_create_trace(session, trace_id);
265}
266
267BT_HIDDEN
268void lttng_live_unref_trace(struct lttng_live_trace *trace)
269{
270 bt_put(trace);
271}
272
273static
274void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
275{
276 struct lttng_live_stream_iterator *stream, *s;
277
278 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
279 lttng_live_stream_iterator_destroy(stream);
280 }
281 lttng_live_metadata_fini(trace);
282}
283
284BT_HIDDEN
06994c71
MD
285int lttng_live_add_session(struct lttng_live_component *lttng_live,
286 uint64_t session_id, const char *hostname,
287 const char *session_name)
7cdc2bab
MD
288{
289 int ret = 0;
290 struct lttng_live_session *s;
291
292 s = g_new0(struct lttng_live_session, 1);
293 if (!s) {
294 goto error;
295 }
296
297 s->id = session_id;
298 BT_INIT_LIST_HEAD(&s->traces);
299 s->lttng_live = lttng_live;
300 s->new_streams_needed = true;
06994c71
MD
301 s->hostname = g_string_new(hostname);
302 s->session_name = g_string_new(session_name);
7cdc2bab 303
06994c71
MD
304 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
305 s->id, hostname, session_name);
7cdc2bab
MD
306 bt_list_add(&s->node, &lttng_live->sessions);
307 goto end;
308error:
087bc060 309 BT_LOGE("Error adding session");
7cdc2bab
MD
310 g_free(s);
311 ret = -1;
312end:
313 return ret;
314}
315
316static
317void lttng_live_destroy_session(struct lttng_live_session *session)
318{
319 struct lttng_live_trace *trace, *t;
320
087bc060 321 BT_LOGI("Destroy session");
7cdc2bab
MD
322 if (session->id != -1ULL) {
323 if (lttng_live_detach_session(session)) {
6f79a7cf 324 if (!lttng_live_is_canceled(session->lttng_live)) {
4c66436f
MD
325 /* Old relayd cannot detach sessions. */
326 BT_LOGD("Unable to detach session %" PRIu64,
327 session->id);
328 }
7cdc2bab
MD
329 }
330 session->id = -1ULL;
331 }
332 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
333 lttng_live_close_trace_streams(trace);
334 }
335 bt_list_del(&session->node);
06994c71
MD
336 if (session->hostname) {
337 g_string_free(session->hostname, TRUE);
338 }
339 if (session->session_name) {
340 g_string_free(session->session_name, TRUE);
341 }
7cdc2bab
MD
342 g_free(session);
343}
344
345BT_HIDDEN
346void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
347{
348 struct lttng_live_stream_iterator_generic *s =
349 bt_private_notification_iterator_get_user_data(it);
350
351 switch (s->type) {
352 case LIVE_STREAM_TYPE_NO_STREAM:
353 {
354 /* Leave no_stream_iter in place when port is removed. */
355 break;
356 }
357 case LIVE_STREAM_TYPE_STREAM:
358 {
359 struct lttng_live_stream_iterator *stream_iter =
360 container_of(s, struct lttng_live_stream_iterator, p);
361
362 lttng_live_stream_iterator_destroy(stream_iter);
363 break;
364 }
365 }
366}
367
368static
369enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
370 struct lttng_live_component *lttng_live,
371 struct lttng_live_stream_iterator *lttng_live_stream)
372{
373 switch (lttng_live_stream->state) {
374 case LTTNG_LIVE_STREAM_QUIESCENT:
375 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
376 break;
377 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
378 /* Invalid state. */
087bc060
MD
379 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
380 abort();
7cdc2bab
MD
381 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
382 /* Invalid state. */
087bc060
MD
383 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
384 abort();
7cdc2bab
MD
385 case LTTNG_LIVE_STREAM_EOF:
386 break;
387 }
388 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
389}
390
391/*
392 * For active no data stream, fetch next data. It can be either:
393 * - quiescent: need to put it in the prio heap at quiescent end
394 * timestamp,
395 * - have data: need to wire up first event into the prio heap,
396 * - have no data on this stream at this point: need to retry (AGAIN) or
397 * return EOF.
398 */
399static
400enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
401 struct lttng_live_component *lttng_live,
402 struct lttng_live_stream_iterator *lttng_live_stream)
403{
404 enum bt_ctf_lttng_live_iterator_status ret =
405 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
406 struct packet_index index;
407 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
408
409 if (lttng_live_stream->trace->new_metadata_needed) {
410 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
411 goto end;
412 }
413 if (lttng_live_stream->trace->session->new_streams_needed) {
414 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
415 goto end;
416 }
417 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
418 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
419 goto end;
420 }
421 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
422 if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
423 goto end;
424 }
425 assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
426 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
427 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
428 && lttng_live_stream->last_returned_inactivity_timestamp ==
429 lttng_live_stream->current_inactivity_timestamp) {
430 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
431 print_stream_state(lttng_live_stream);
432 } else {
433 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
434 }
435 goto end;
436 }
437 lttng_live_stream->base_offset = index.offset;
438 lttng_live_stream->offset = index.offset;
439 lttng_live_stream->len = index.packet_size / CHAR_BIT;
440end:
441 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
442 ret = lttng_live_iterator_next_check_stream_state(
443 lttng_live, lttng_live_stream);
444 }
445 return ret;
446}
447
448/*
449 * Creation of the notification requires the ctf trace to be created
450 * beforehand, but the live protocol gives us all streams (including
451 * metadata) at once. So we split it in three steps: getting streams,
452 * getting metadata (which creates the ctf trace), and then creating the
453 * per-stream notifications.
454 */
455static
456enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
457 struct lttng_live_component *lttng_live,
458 struct lttng_live_session *session)
459{
460 enum bt_ctf_lttng_live_iterator_status status;
461 struct lttng_live_trace *trace, *t;
462
463 if (lttng_live_attach_session(session)) {
6f79a7cf 464 if (lttng_live_is_canceled(lttng_live)) {
4c66436f
MD
465 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
466 } else {
467 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
468 }
7cdc2bab
MD
469 }
470 status = lttng_live_get_new_streams(session);
471 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
472 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
473 return status;
474 }
475 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
476 status = lttng_live_metadata_update(trace);
5bd230f4
MD
477 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
478 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
7cdc2bab
MD
479 return status;
480 }
481 }
482 return lttng_live_lazy_notif_init(session);
483}
484
485BT_HIDDEN
486void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
487{
488 struct lttng_live_session *session;
489
490 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
491 session->new_streams_needed = true;
492 }
493}
494
495static
496void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
497{
498 struct lttng_live_session *session;
499
500 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
501 struct lttng_live_trace *trace;
502
503 session->new_streams_needed = true;
504 bt_list_for_each_entry(trace, &session->traces, node) {
505 trace->new_metadata_needed = true;
506 }
507 }
508}
509
510static
3cdf4234 511enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
7cdc2bab
MD
512 struct lttng_live_component *lttng_live)
513{
514 enum bt_ctf_lttng_live_iterator_status ret =
515 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
516 unsigned int nr_sessions_opened = 0;
517 struct lttng_live_session *session, *s;
518
519 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
520 if (session->closed && bt_list_empty(&session->traces)) {
521 lttng_live_destroy_session(session);
522 }
523 }
524 /*
525 * Currently, when there are no sessions, we quit immediately.
526 * We may want to add a component parameter to keep trying until
527 * we get data in the future.
528 * Also, in a remotely distant future, we could add a "new
529 * session" flag to the protocol, which would tell us that we
530 * need to query for new sessions even though we have sessions
531 * currently ongoing.
532 */
533 if (bt_list_empty(&lttng_live->sessions)) {
534 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
535 goto end;
536 }
537 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
538 ret = lttng_live_get_session(lttng_live, session);
539 switch (ret) {
540 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
541 break;
542 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
543 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
544 break;
545 default:
546 goto end;
547 }
548 if (!session->closed) {
549 nr_sessions_opened++;
550 }
551 }
552end:
553 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
554 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
555 }
556 return ret;
557}
558
559static
560enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
561 struct lttng_live_component *lttng_live,
562 struct lttng_live_stream_iterator *lttng_live_stream,
563 struct bt_notification **notification,
564 uint64_t timestamp)
565{
566 enum bt_ctf_lttng_live_iterator_status ret =
567 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
568 struct lttng_live_trace *trace;
569 struct bt_ctf_clock_class *clock_class = NULL;
570 struct bt_ctf_clock_value *clock_value = NULL;
571 struct bt_notification *notif = NULL;
572 int retval;
573
574 trace = lttng_live_stream->trace;
575 if (!trace) {
576 goto error;
577 }
578 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
579 if (!clock_class) {
580 goto error;
581 }
582 clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
583 if (!clock_value) {
584 goto error;
585 }
586 notif = bt_notification_inactivity_create(trace->cc_prio_map);
587 if (!notif) {
588 goto error;
589 }
590 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
591 if (retval) {
592 goto error;
593 }
594 *notification = notif;
595end:
596 bt_put(clock_value);
597 bt_put(clock_class);
598 return ret;
599
600error:
601 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
602 bt_put(notif);
603 goto end;
604}
605
606static
607enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
608 struct lttng_live_component *lttng_live,
609 struct lttng_live_stream_iterator *lttng_live_stream,
610 struct bt_notification **notification)
611{
612 enum bt_ctf_lttng_live_iterator_status ret =
613 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
614 struct bt_ctf_clock_class *clock_class = NULL;
615 struct bt_ctf_clock_value *clock_value = NULL;
616
617 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
618 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
619 }
620
621 if (lttng_live_stream->current_inactivity_timestamp ==
622 lttng_live_stream->last_returned_inactivity_timestamp) {
623 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
624 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
625 goto end;
626 }
627
628 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
629 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
630
631 lttng_live_stream->last_returned_inactivity_timestamp =
632 lttng_live_stream->current_inactivity_timestamp;
633end:
634 bt_put(clock_value);
635 bt_put(clock_class);
636 return ret;
637}
638
639static
640enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
641 struct lttng_live_component *lttng_live,
642 struct lttng_live_stream_iterator *lttng_live_stream,
643 struct bt_notification **notification)
644{
645 enum bt_ctf_lttng_live_iterator_status ret =
646 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
647 enum bt_ctf_notif_iter_status status;
648 struct lttng_live_session *session;
649
650 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
651 struct lttng_live_trace *trace;
652
653 if (session->new_streams_needed) {
654 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
655 }
656 bt_list_for_each_entry(trace, &session->traces, node) {
657 if (trace->new_metadata_needed) {
658 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
659 }
660 }
661 }
662
663 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
664 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
665 }
666 if (lttng_live_stream->packet_end_notif_queue) {
667 *notification = lttng_live_stream->packet_end_notif_queue;
668 lttng_live_stream->packet_end_notif_queue = NULL;
669 status = BT_CTF_NOTIF_ITER_STATUS_OK;
670 } else {
671 status = bt_ctf_notif_iter_get_next_notification(
672 lttng_live_stream->notif_iter,
673 lttng_live_stream->trace->cc_prio_map,
674 notification);
675 if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
676 /*
677 * Consider empty packets as inactivity.
678 */
679 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
680 lttng_live_stream->packet_end_notif_queue = *notification;
681 *notification = NULL;
682 return emit_inactivity_notification(lttng_live,
683 lttng_live_stream, notification,
684 lttng_live_stream->current_packet_end_timestamp);
685 }
686 }
687 }
688 switch (status) {
689 case BT_CTF_NOTIF_ITER_STATUS_EOF:
690 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
691 break;
692 case BT_CTF_NOTIF_ITER_STATUS_OK:
693 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
694 break;
695 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
696 /*
697 * Continue immediately (end of packet). The next
698 * get_index may return AGAIN to delay the following
699 * attempt.
700 */
701 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
702 break;
703 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
704 /* No argument provided by the user, so don't return INVAL. */
705 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
706 default:
707 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
708 break;
709 }
710 return ret;
711}
712
713/*
714 * helper function:
715 * handle_no_data_streams()
716 * retry:
717 * - for each ACTIVE_NO_DATA stream:
718 * - query relayd for stream data, or quiescence info.
719 * - if need metadata, get metadata, goto retry.
720 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
721 * - if quiescent, move to QUIESCENT streams
722 * - if fetched data, move to ACTIVE_DATA streams
723 * (at this point each stream either has data, or is quiescent)
724 *
725 *
726 * iterator_next:
727 * handle_new_streams_and_metadata()
728 * - query relayd for known streams, add them as ACTIVE_NO_DATA
729 * - query relayd for metadata
730 *
731 * call handle_active_no_data_streams()
732 *
733 * handle_quiescent_streams()
734 * - if at least one stream is ACTIVE_DATA:
735 * - peek stream event with lowest timestamp -> next_ts
736 * - for each quiescent stream
737 * - if next_ts >= quiescent end
738 * - set state to ACTIVE_NO_DATA
739 * - else
740 * - for each quiescent stream
741 * - set state to ACTIVE_NO_DATA
742 *
743 * call handle_active_no_data_streams()
744 *
745 * handle_active_data_streams()
746 * - if at least one stream is ACTIVE_DATA:
747 * - get stream event with lowest timestamp from heap
748 * - make that stream event the current notification.
749 * - move this stream heap position to its next event
750 * - if we need to fetch data from relayd, move
751 * stream to ACTIVE_NO_DATA.
752 * - return OK
753 * - return AGAIN
754 *
755 * end criterion: ctrl-c on client. If relayd exits or the session
756 * closes on the relay daemon side, we keep on waiting for streams.
757 * Eventually handle --end timestamp (also an end criterion).
758 *
759 * When disconnected from relayd: try to re-connect endlessly.
760 */
761static
762struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
763 struct bt_private_notification_iterator *iterator,
764 struct lttng_live_stream_iterator *stream_iter)
765{
766 enum bt_ctf_lttng_live_iterator_status status;
767 struct bt_notification_iterator_next_return next_return;
768 struct lttng_live_component *lttng_live;
769
770 lttng_live = stream_iter->trace->session->lttng_live;
771retry:
772 print_stream_state(stream_iter);
773 next_return.notification = NULL;
774 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
775 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
776 goto end;
777 }
778 status = lttng_live_iterator_next_handle_one_no_data_stream(
779 lttng_live, stream_iter);
780 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
781 goto end;
782 }
783 status = lttng_live_iterator_next_handle_one_quiescent_stream(
784 lttng_live, stream_iter, &next_return.notification);
785 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
786 assert(next_return.notification == NULL);
787 goto end;
788 }
789 if (next_return.notification) {
790 goto end;
791 }
792 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
793 stream_iter, &next_return.notification);
794 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
795 assert(next_return.notification == NULL);
796 }
797
798end:
799 switch (status) {
800 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
801 print_dbg("continue");
802 goto retry;
803 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
804 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
805 print_dbg("again");
806 break;
807 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
808 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
809 print_dbg("end");
810 break;
811 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
812 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
813 print_dbg("ok");
814 break;
815 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
816 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
817 break;
818 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
819 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
820 break;
821 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
822 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
823 break;
824 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
825 default: /* fall-through */
826 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
827 break;
828 }
829 return next_return;
830}
831
832static
833struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
834 struct bt_private_notification_iterator *iterator,
835 struct lttng_live_no_stream_iterator *no_stream_iter)
836{
837 enum bt_ctf_lttng_live_iterator_status status;
838 struct bt_notification_iterator_next_return next_return;
839 struct lttng_live_component *lttng_live;
840
841 lttng_live = no_stream_iter->lttng_live;
842retry:
843 lttng_live_force_new_streams_and_metadata(lttng_live);
844 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
845 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
846 goto end;
847 }
848 if (no_stream_iter->port) {
849 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
850 } else {
851 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
852 }
853end:
854 switch (status) {
855 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
856 goto retry;
857 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
858 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
859 break;
860 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
861 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
862 break;
863 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
864 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
865 break;
866 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
867 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
868 break;
869 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
870 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
871 break;
872 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
873 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
874 break;
875 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
876 default: /* fall-through */
877 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
878 break;
879 }
880 return next_return;
881}
882
d3eb6e8f 883BT_HIDDEN
41a2b7ae 884struct bt_notification_iterator_next_return lttng_live_iterator_next(
890882ef 885 struct bt_private_notification_iterator *iterator)
d3eb6e8f 886{
7cdc2bab
MD
887 struct lttng_live_stream_iterator_generic *s =
888 bt_private_notification_iterator_get_user_data(iterator);
889 struct bt_notification_iterator_next_return next_return;
890
891 switch (s->type) {
892 case LIVE_STREAM_TYPE_NO_STREAM:
893 next_return = lttng_live_iterator_next_no_stream(iterator,
894 container_of(s, struct lttng_live_no_stream_iterator, p));
895 break;
896 case LIVE_STREAM_TYPE_STREAM:
897 next_return = lttng_live_iterator_next_stream(iterator,
898 container_of(s, struct lttng_live_stream_iterator, p));
899 break;
900 default:
901 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
902 break;
903 }
904 return next_return;
905}
41a2b7ae 906
7cdc2bab
MD
907BT_HIDDEN
908enum bt_notification_iterator_status lttng_live_iterator_init(
909 struct bt_private_notification_iterator *it,
910 struct bt_private_port *port)
911{
912 enum bt_notification_iterator_status ret =
913 BT_NOTIFICATION_ITERATOR_STATUS_OK;
914 struct lttng_live_stream_iterator_generic *s;
7cdc2bab
MD
915
916 assert(it);
917
918 s = bt_private_port_get_user_data(port);
919 assert(s);
920 switch (s->type) {
921 case LIVE_STREAM_TYPE_NO_STREAM:
922 {
923 struct lttng_live_no_stream_iterator *no_stream_iter =
924 container_of(s, struct lttng_live_no_stream_iterator, p);
7cdc2bab
MD
925 ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
926 if (ret) {
927 goto error;
928 }
929 break;
930 }
931 case LIVE_STREAM_TYPE_STREAM:
932 {
933 struct lttng_live_stream_iterator *stream_iter =
934 container_of(s, struct lttng_live_stream_iterator, p);
7cdc2bab
MD
935 ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
936 if (ret) {
937 goto error;
938 }
939 break;
940 }
941 default:
942 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
943 goto end;
944 }
945
946end:
41a2b7ae 947 return ret;
7cdc2bab
MD
948error:
949 if (bt_private_notification_iterator_set_user_data(it, NULL)
950 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
087bc060 951 BT_LOGE("Error setting private data to NULL");
7cdc2bab
MD
952 }
953 goto end;
954}
955
956static
957struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
958 struct bt_value *params)
959{
960 struct bt_value *url_value = NULL;
961 struct bt_value *results = NULL;
962 const char *url;
963 struct bt_live_viewer_connection *viewer_connection = NULL;
7cdc2bab
MD
964
965 url_value = bt_value_map_get(params, "url");
966 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
087bc060 967 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
968 goto error;
969 }
970
3cdf4234 971 if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
087bc060 972 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
973 goto error;
974 }
975
4c66436f 976 viewer_connection = bt_live_viewer_connection_create(url, NULL);
7cdc2bab 977 if (!viewer_connection) {
7cdc2bab
MD
978 goto error;
979 }
980
981 results = bt_live_viewer_connection_list_sessions(viewer_connection);
982 goto end;
983error:
984 BT_PUT(results);
985end:
986 if (viewer_connection) {
987 bt_live_viewer_connection_destroy(viewer_connection);
988 }
989 BT_PUT(url_value);
990 return results;
991}
992
993BT_HIDDEN
994struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
995 const char *object, struct bt_value *params)
996{
997 if (strcmp(object, "sessions") == 0) {
998 return lttng_live_query_list_sessions(comp_class,
999 params);
1000 }
087bc060 1001 BT_LOGW("Unknown query object `%s`", object);
7cdc2bab
MD
1002 return NULL;
1003}
1004
1005static
1006void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1007{
1008 int ret;
1009 struct lttng_live_session *session, *s;
1010
1011 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
1012 lttng_live_destroy_session(session);
1013 }
1014 BT_PUT(lttng_live->viewer_connection);
1015 if (lttng_live->url) {
1016 g_string_free(lttng_live->url, TRUE);
1017 }
1018 if (lttng_live->no_stream_port) {
6f79a7cf 1019 bt_get(lttng_live->no_stream_port);
7cdc2bab 1020 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
6f79a7cf 1021 bt_put(lttng_live->no_stream_port);
7cdc2bab 1022 assert(!ret);
7cdc2bab
MD
1023 }
1024 if (lttng_live->no_stream_iter) {
1025 g_free(lttng_live->no_stream_iter);
1026 }
1027 g_free(lttng_live);
1028}
1029
1030BT_HIDDEN
1031void lttng_live_component_finalize(struct bt_private_component *component)
1032{
1033 void *data = bt_private_component_get_user_data(component);
1034
1035 if (!data) {
1036 return;
1037 }
1038 lttng_live_component_destroy_data(data);
1039}
1040
1041static
1042struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
6f79a7cf 1043 struct bt_private_component *private_component)
7cdc2bab
MD
1044{
1045 struct lttng_live_component *lttng_live;
1046 struct bt_value *value = NULL;
1047 const char *url;
1048 enum bt_value_status ret;
1049
1050 lttng_live = g_new0(struct lttng_live_component, 1);
1051 if (!lttng_live) {
1052 goto end;
1053 }
7cdc2bab
MD
1054 /* TODO: make this an overridable parameter. */
1055 lttng_live->max_query_size = MAX_QUERY_SIZE;
1056 BT_INIT_LIST_HEAD(&lttng_live->sessions);
1057 value = bt_value_map_get(params, "url");
1058 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
087bc060 1059 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
1060 goto error;
1061 }
1062 ret = bt_value_string_get(value, &url);
1063 if (ret != BT_VALUE_STATUS_OK) {
087bc060 1064 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
1065 goto error;
1066 }
1067 lttng_live->url = g_string_new(url);
1068 if (!lttng_live->url) {
1069 goto error;
1070 }
6f79a7cf 1071 BT_PUT(value);
7cdc2bab 1072 lttng_live->viewer_connection =
4c66436f 1073 bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
7cdc2bab 1074 if (!lttng_live->viewer_connection) {
7cdc2bab
MD
1075 goto error;
1076 }
1077 if (lttng_live_create_viewer_session(lttng_live)) {
7cdc2bab
MD
1078 goto error;
1079 }
1080 lttng_live->private_component = private_component;
4c66436f 1081
7cdc2bab
MD
1082 goto end;
1083
1084error:
1085 lttng_live_component_destroy_data(lttng_live);
1086 lttng_live = NULL;
1087end:
1088 return lttng_live;
d3e4dcd8
PP
1089}
1090
f3bc2010 1091BT_HIDDEN
3cdf4234
MD
1092enum bt_component_status lttng_live_component_init(
1093 struct bt_private_component *private_component,
7cdc2bab 1094 struct bt_value *params, void *init_method_data)
f3bc2010 1095{
7cdc2bab
MD
1096 struct lttng_live_component *lttng_live;
1097 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
1098
7cdc2bab 1099 /* Passes ownership of iter ref to lttng_live_component_create. */
6f79a7cf 1100 lttng_live = lttng_live_component_create(params, private_component);
7cdc2bab 1101 if (!lttng_live) {
6f79a7cf
MD
1102 //TODO : we need access to the application cancel state
1103 //because we are not part of a graph yet.
1104 ret = BT_COMPONENT_STATUS_NOMEM;
7cdc2bab
MD
1105 goto end;
1106 }
1107
1108 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1109 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1110 lttng_live->no_stream_iter->lttng_live = lttng_live;
147337a3 1111 ret = bt_private_component_source_add_output_private_port(
7cdc2bab 1112 lttng_live->private_component, "no-stream",
147337a3
PP
1113 lttng_live->no_stream_iter,
1114 &lttng_live->no_stream_port);
1115 if (ret != BT_COMPONENT_STATUS_OK) {
1116 goto end;
1117 }
6f79a7cf 1118 bt_put(lttng_live->no_stream_port); /* weak */
7cdc2bab
MD
1119 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1120
3cdf4234 1121 ret = bt_private_component_set_user_data(private_component, lttng_live);
7cdc2bab
MD
1122 if (ret != BT_COMPONENT_STATUS_OK) {
1123 goto error;
1124 }
1125
1126end:
1127 return ret;
1128error:
3cdf4234 1129 (void) bt_private_component_set_user_data(private_component, NULL);
7cdc2bab
MD
1130 lttng_live_component_destroy_data(lttng_live);
1131 return ret;
f3bc2010 1132}
087bc060 1133
d85ef162
MD
1134BT_HIDDEN
1135enum bt_component_status lttng_live_accept_port_connection(
1136 struct bt_private_component *private_component,
1137 struct bt_private_port *self_private_port,
1138 struct bt_port *other_port)
1139{
1140 struct lttng_live_component *lttng_live =
1141 bt_private_component_get_user_data(private_component);
1142 struct bt_component *other_component;
1143 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
1144 struct bt_port *self_port = bt_port_from_private_port(self_private_port);
1145
1146 other_component = bt_port_get_component(other_port);
1147 bt_put(other_component); /* weak */
1148
1149 if (!lttng_live->downstream_component) {
1150 lttng_live->downstream_component = other_component;
1151 goto end;
1152 }
1153
1154 /*
1155 * Compare prior component to ensure we are connected to the
1156 * same downstream component as prior ports.
1157 */
1158 if (lttng_live->downstream_component != other_component) {
1159 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1160 bt_port_get_name(self_port),
1161 bt_component_get_name(other_component),
1162 bt_component_get_name(lttng_live->downstream_component));
1163 status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
1164 goto end;
1165 }
1166end:
1167 bt_put(self_port);
1168 return status;
1169}
This page took 0.079507 seconds and 4 git commands to generate.