Tests: erroneous usage of if preprocessor directive
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
020bc26f
PP
30#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
31#include "logging.h"
32
7cdc2bab 33#include <babeltrace/ctf-ir/packet.h>
b2e0c907 34#include <babeltrace/graph/component-source.h>
7cdc2bab
MD
35#include <babeltrace/graph/private-port.h>
36#include <babeltrace/graph/port.h>
37#include <babeltrace/graph/private-component.h>
38#include <babeltrace/graph/private-component-source.h>
39#include <babeltrace/graph/private-notification-iterator.h>
40#include <babeltrace/graph/notification-stream.h>
41#include <babeltrace/graph/notification-packet.h>
42#include <babeltrace/graph/notification-event.h>
43#include <babeltrace/graph/notification-heap.h>
44#include <babeltrace/graph/notification-iterator.h>
45#include <babeltrace/graph/notification-inactivity.h>
4c66436f 46#include <babeltrace/graph/graph.h>
7cdc2bab 47#include <babeltrace/compiler-internal.h>
6f79a7cf 48#include <babeltrace/types.h>
7cdc2bab
MD
49#include <inttypes.h>
50#include <glib.h>
51#include <assert.h>
52#include <unistd.h>
7d61fa8e 53#include <plugins-common.h>
f3bc2010 54
7cdc2bab
MD
55#include "data-stream.h"
56#include "metadata.h"
0f5e83e5 57#include "lttng-live-internal.h"
7cdc2bab 58
7cdc2bab 59#define MAX_QUERY_SIZE (256*1024)
7cdc2bab 60
087bc060 61#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab
MD
62
63static const char *print_state(struct lttng_live_stream_iterator *s)
64{
65 switch (s->state) {
66 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
67 return "ACTIVE_NO_DATA";
68 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
69 return "QUIESCENT_NO_DATA";
70 case LTTNG_LIVE_STREAM_QUIESCENT:
71 return "QUIESCENT";
72 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
73 return "ACTIVE_DATA";
74 case LTTNG_LIVE_STREAM_EOF:
75 return "EOF";
76 default:
77 return "ERROR";
78 }
79}
7cdc2bab 80
6f79a7cf
MD
81static
82void print_stream_state(struct lttng_live_stream_iterator *stream)
83{
84 struct bt_port *port;
85
86 port = bt_port_from_private_port(stream->port);
87 print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
88 bt_port_get_name(port),
89 print_state(stream),
90 stream->last_returned_inactivity_timestamp,
91 stream->current_inactivity_timestamp);
92 bt_put(port);
93}
94
95BT_HIDDEN
96bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
97{
98 struct bt_component *component;
99 struct bt_graph *graph;
100 bt_bool ret;
101
102 if (!lttng_live) {
103 return BT_FALSE;
104 }
105
106 component = bt_component_from_private_component(lttng_live->private_component);
107 graph = bt_component_get_graph(component);
108 ret = bt_graph_is_canceled(graph);
109 bt_put(graph);
110 bt_put(component);
111 return ret;
112}
7cdc2bab 113
7cdc2bab
MD
114BT_HIDDEN
115int lttng_live_add_port(struct lttng_live_component *lttng_live,
116 struct lttng_live_stream_iterator *stream_iter)
117{
118 int ret;
119 struct bt_private_port *private_port;
120 char name[STREAM_NAME_MAX_LEN];
6f79a7cf 121 enum bt_component_status status;
7cdc2bab
MD
122
123 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
124 assert(ret > 0);
125 strcpy(stream_iter->name, name);
4bf0e537
MD
126 if (lttng_live_is_canceled(lttng_live)) {
127 return 0;
128 }
6f79a7cf 129 status = bt_private_component_source_add_output_private_port(
147337a3
PP
130 lttng_live->private_component, name, stream_iter,
131 &private_port);
6f79a7cf
MD
132 switch (status) {
133 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
134 return 0;
135 case BT_COMPONENT_STATUS_OK:
136 break;
137 default:
7cdc2bab
MD
138 return -1;
139 }
6f79a7cf 140 bt_put(private_port); /* weak */
087bc060 141 BT_LOGI("Added port %s", name);
7cdc2bab
MD
142
143 if (lttng_live->no_stream_port) {
6f79a7cf 144 bt_get(lttng_live->no_stream_port);
7cdc2bab 145 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
6f79a7cf 146 bt_put(lttng_live->no_stream_port);
7cdc2bab
MD
147 if (ret) {
148 return -1;
149 }
6f79a7cf 150 lttng_live->no_stream_port = NULL;
7cdc2bab
MD
151 lttng_live->no_stream_iter->port = NULL;
152 }
153 stream_iter->port = private_port;
154 return 0;
155}
156
157BT_HIDDEN
158int lttng_live_remove_port(struct lttng_live_component *lttng_live,
159 struct bt_private_port *port)
160{
161 struct bt_component *component;
162 int64_t nr_ports;
163 int ret;
164
165 component = bt_component_from_private_component(lttng_live->private_component);
166 nr_ports = bt_component_source_get_output_port_count(component);
167 if (nr_ports < 0) {
168 return -1;
169 }
170 BT_PUT(component);
171 if (nr_ports == 1) {
6f79a7cf
MD
172 enum bt_component_status status;
173
7cdc2bab 174 assert(!lttng_live->no_stream_port);
4bf0e537
MD
175
176 if (lttng_live_is_canceled(lttng_live)) {
177 return 0;
178 }
6f79a7cf 179 status = bt_private_component_source_add_output_private_port(lttng_live->private_component,
147337a3
PP
180 "no-stream", lttng_live->no_stream_iter,
181 &lttng_live->no_stream_port);
6f79a7cf
MD
182 switch (status) {
183 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
184 return 0;
185 case BT_COMPONENT_STATUS_OK:
186 break;
187 default:
7cdc2bab
MD
188 return -1;
189 }
6f79a7cf 190 bt_put(lttng_live->no_stream_port); /* weak */
7cdc2bab
MD
191 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
192 }
6f79a7cf 193 bt_get(port);
7cdc2bab 194 ret = bt_private_port_remove_from_component(port);
6f79a7cf 195 bt_put(port);
7cdc2bab
MD
196 if (ret) {
197 return -1;
198 }
199 return 0;
200}
201
202static
203struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
204 uint64_t trace_id)
d3e4dcd8 205{
7cdc2bab
MD
206 struct lttng_live_trace *trace;
207
208 bt_list_for_each_entry(trace, &session->traces, node) {
209 if (trace->id == trace_id) {
210 return trace;
211 }
212 }
d3eb6e8f
PP
213 return NULL;
214}
215
7cdc2bab
MD
216static
217void lttng_live_destroy_trace(struct bt_object *obj)
218{
219 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
220
087bc060 221 BT_LOGI("Destroy trace");
7cdc2bab
MD
222 assert(bt_list_empty(&trace->streams));
223 bt_list_del(&trace->node);
5bd230f4 224
4bf0e537
MD
225 if (trace->trace) {
226 int retval;
5bd230f4 227
4bf0e537
MD
228 retval = bt_ctf_trace_set_is_static(trace->trace);
229 assert(!retval);
230 BT_PUT(trace->trace);
231 }
7cdc2bab
MD
232 lttng_live_metadata_fini(trace);
233 BT_PUT(trace->cc_prio_map);
234 g_free(trace);
235}
236
237static
238struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
239 uint64_t trace_id)
240{
241 struct lttng_live_trace *trace = NULL;
242
243 trace = g_new0(struct lttng_live_trace, 1);
244 if (!trace) {
245 goto error;
246 }
247 trace->session = session;
248 trace->id = trace_id;
249 BT_INIT_LIST_HEAD(&trace->streams);
250 trace->new_metadata_needed = true;
251 bt_list_add(&trace->node, &session->traces);
252 bt_object_init(&trace->obj, lttng_live_destroy_trace);
087bc060 253 BT_LOGI("Create trace");
7cdc2bab
MD
254 goto end;
255error:
256 g_free(trace);
257 trace = NULL;
258end:
259 return trace;
260}
261
262BT_HIDDEN
263struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
264 uint64_t trace_id)
265{
266 struct lttng_live_trace *trace;
267
268 trace = lttng_live_find_trace(session, trace_id);
269 if (trace) {
270 bt_get(trace);
271 return trace;
272 }
273 return lttng_live_create_trace(session, trace_id);
274}
275
276BT_HIDDEN
277void lttng_live_unref_trace(struct lttng_live_trace *trace)
278{
279 bt_put(trace);
280}
281
282static
283void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
284{
285 struct lttng_live_stream_iterator *stream, *s;
286
287 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
288 lttng_live_stream_iterator_destroy(stream);
289 }
290 lttng_live_metadata_fini(trace);
291}
292
293BT_HIDDEN
06994c71
MD
294int lttng_live_add_session(struct lttng_live_component *lttng_live,
295 uint64_t session_id, const char *hostname,
296 const char *session_name)
7cdc2bab
MD
297{
298 int ret = 0;
299 struct lttng_live_session *s;
300
301 s = g_new0(struct lttng_live_session, 1);
302 if (!s) {
303 goto error;
304 }
305
306 s->id = session_id;
307 BT_INIT_LIST_HEAD(&s->traces);
308 s->lttng_live = lttng_live;
309 s->new_streams_needed = true;
06994c71
MD
310 s->hostname = g_string_new(hostname);
311 s->session_name = g_string_new(session_name);
7cdc2bab 312
06994c71
MD
313 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
314 s->id, hostname, session_name);
7cdc2bab
MD
315 bt_list_add(&s->node, &lttng_live->sessions);
316 goto end;
317error:
087bc060 318 BT_LOGE("Error adding session");
7cdc2bab
MD
319 g_free(s);
320 ret = -1;
321end:
322 return ret;
323}
324
325static
326void lttng_live_destroy_session(struct lttng_live_session *session)
327{
328 struct lttng_live_trace *trace, *t;
329
087bc060 330 BT_LOGI("Destroy session");
7cdc2bab
MD
331 if (session->id != -1ULL) {
332 if (lttng_live_detach_session(session)) {
6f79a7cf 333 if (!lttng_live_is_canceled(session->lttng_live)) {
4c66436f
MD
334 /* Old relayd cannot detach sessions. */
335 BT_LOGD("Unable to detach session %" PRIu64,
336 session->id);
337 }
7cdc2bab
MD
338 }
339 session->id = -1ULL;
340 }
341 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
342 lttng_live_close_trace_streams(trace);
343 }
344 bt_list_del(&session->node);
06994c71
MD
345 if (session->hostname) {
346 g_string_free(session->hostname, TRUE);
347 }
348 if (session->session_name) {
349 g_string_free(session->session_name, TRUE);
350 }
7cdc2bab
MD
351 g_free(session);
352}
353
354BT_HIDDEN
355void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
356{
357 struct lttng_live_stream_iterator_generic *s =
358 bt_private_notification_iterator_get_user_data(it);
359
360 switch (s->type) {
361 case LIVE_STREAM_TYPE_NO_STREAM:
362 {
363 /* Leave no_stream_iter in place when port is removed. */
364 break;
365 }
366 case LIVE_STREAM_TYPE_STREAM:
367 {
368 struct lttng_live_stream_iterator *stream_iter =
369 container_of(s, struct lttng_live_stream_iterator, p);
370
371 lttng_live_stream_iterator_destroy(stream_iter);
372 break;
373 }
374 }
375}
376
377static
378enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
379 struct lttng_live_component *lttng_live,
380 struct lttng_live_stream_iterator *lttng_live_stream)
381{
382 switch (lttng_live_stream->state) {
383 case LTTNG_LIVE_STREAM_QUIESCENT:
384 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
385 break;
386 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
387 /* Invalid state. */
087bc060
MD
388 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
389 abort();
7cdc2bab
MD
390 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
391 /* Invalid state. */
087bc060
MD
392 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
393 abort();
7cdc2bab
MD
394 case LTTNG_LIVE_STREAM_EOF:
395 break;
396 }
397 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
398}
399
400/*
401 * For active no data stream, fetch next data. It can be either:
402 * - quiescent: need to put it in the prio heap at quiescent end
403 * timestamp,
404 * - have data: need to wire up first event into the prio heap,
405 * - have no data on this stream at this point: need to retry (AGAIN) or
406 * return EOF.
407 */
408static
409enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
410 struct lttng_live_component *lttng_live,
411 struct lttng_live_stream_iterator *lttng_live_stream)
412{
413 enum bt_ctf_lttng_live_iterator_status ret =
414 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
415 struct packet_index index;
416 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
417
418 if (lttng_live_stream->trace->new_metadata_needed) {
419 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
420 goto end;
421 }
422 if (lttng_live_stream->trace->session->new_streams_needed) {
423 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
424 goto end;
425 }
426 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
427 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
428 goto end;
429 }
430 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
431 if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
432 goto end;
433 }
434 assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
435 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
436 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
437 && lttng_live_stream->last_returned_inactivity_timestamp ==
438 lttng_live_stream->current_inactivity_timestamp) {
439 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
440 print_stream_state(lttng_live_stream);
441 } else {
442 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
443 }
444 goto end;
445 }
446 lttng_live_stream->base_offset = index.offset;
447 lttng_live_stream->offset = index.offset;
448 lttng_live_stream->len = index.packet_size / CHAR_BIT;
449end:
450 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
451 ret = lttng_live_iterator_next_check_stream_state(
452 lttng_live, lttng_live_stream);
453 }
454 return ret;
455}
456
457/*
458 * Creation of the notification requires the ctf trace to be created
459 * beforehand, but the live protocol gives us all streams (including
460 * metadata) at once. So we split it in three steps: getting streams,
461 * getting metadata (which creates the ctf trace), and then creating the
462 * per-stream notifications.
463 */
464static
465enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
466 struct lttng_live_component *lttng_live,
467 struct lttng_live_session *session)
468{
469 enum bt_ctf_lttng_live_iterator_status status;
470 struct lttng_live_trace *trace, *t;
471
472 if (lttng_live_attach_session(session)) {
6f79a7cf 473 if (lttng_live_is_canceled(lttng_live)) {
4c66436f
MD
474 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
475 } else {
476 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
477 }
7cdc2bab
MD
478 }
479 status = lttng_live_get_new_streams(session);
480 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
481 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
482 return status;
483 }
484 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
485 status = lttng_live_metadata_update(trace);
5bd230f4
MD
486 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
487 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
7cdc2bab
MD
488 return status;
489 }
490 }
491 return lttng_live_lazy_notif_init(session);
492}
493
494BT_HIDDEN
495void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
496{
497 struct lttng_live_session *session;
498
499 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
500 session->new_streams_needed = true;
501 }
502}
503
504static
505void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
506{
507 struct lttng_live_session *session;
508
509 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
510 struct lttng_live_trace *trace;
511
512 session->new_streams_needed = true;
513 bt_list_for_each_entry(trace, &session->traces, node) {
514 trace->new_metadata_needed = true;
515 }
516 }
517}
518
519static
3cdf4234 520enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
7cdc2bab
MD
521 struct lttng_live_component *lttng_live)
522{
523 enum bt_ctf_lttng_live_iterator_status ret =
524 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
525 unsigned int nr_sessions_opened = 0;
526 struct lttng_live_session *session, *s;
527
528 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
529 if (session->closed && bt_list_empty(&session->traces)) {
530 lttng_live_destroy_session(session);
531 }
532 }
533 /*
534 * Currently, when there are no sessions, we quit immediately.
535 * We may want to add a component parameter to keep trying until
536 * we get data in the future.
537 * Also, in a remotely distant future, we could add a "new
538 * session" flag to the protocol, which would tell us that we
539 * need to query for new sessions even though we have sessions
540 * currently ongoing.
541 */
542 if (bt_list_empty(&lttng_live->sessions)) {
543 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
544 goto end;
545 }
546 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
547 ret = lttng_live_get_session(lttng_live, session);
548 switch (ret) {
549 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
550 break;
551 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
552 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
553 break;
554 default:
555 goto end;
556 }
557 if (!session->closed) {
558 nr_sessions_opened++;
559 }
560 }
561end:
562 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
563 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
564 }
565 return ret;
566}
567
568static
569enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
570 struct lttng_live_component *lttng_live,
571 struct lttng_live_stream_iterator *lttng_live_stream,
572 struct bt_notification **notification,
573 uint64_t timestamp)
574{
575 enum bt_ctf_lttng_live_iterator_status ret =
576 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
577 struct lttng_live_trace *trace;
578 struct bt_ctf_clock_class *clock_class = NULL;
579 struct bt_ctf_clock_value *clock_value = NULL;
580 struct bt_notification *notif = NULL;
581 int retval;
582
583 trace = lttng_live_stream->trace;
584 if (!trace) {
585 goto error;
586 }
587 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
588 if (!clock_class) {
589 goto error;
590 }
591 clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
592 if (!clock_value) {
593 goto error;
594 }
595 notif = bt_notification_inactivity_create(trace->cc_prio_map);
596 if (!notif) {
597 goto error;
598 }
599 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
600 if (retval) {
601 goto error;
602 }
603 *notification = notif;
604end:
605 bt_put(clock_value);
606 bt_put(clock_class);
607 return ret;
608
609error:
610 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
611 bt_put(notif);
612 goto end;
613}
614
615static
616enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
617 struct lttng_live_component *lttng_live,
618 struct lttng_live_stream_iterator *lttng_live_stream,
619 struct bt_notification **notification)
620{
621 enum bt_ctf_lttng_live_iterator_status ret =
622 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
623 struct bt_ctf_clock_class *clock_class = NULL;
624 struct bt_ctf_clock_value *clock_value = NULL;
625
626 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
627 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
628 }
629
630 if (lttng_live_stream->current_inactivity_timestamp ==
631 lttng_live_stream->last_returned_inactivity_timestamp) {
632 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
633 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
634 goto end;
635 }
636
637 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
638 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
639
640 lttng_live_stream->last_returned_inactivity_timestamp =
641 lttng_live_stream->current_inactivity_timestamp;
642end:
643 bt_put(clock_value);
644 bt_put(clock_class);
645 return ret;
646}
647
648static
649enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
650 struct lttng_live_component *lttng_live,
651 struct lttng_live_stream_iterator *lttng_live_stream,
652 struct bt_notification **notification)
653{
654 enum bt_ctf_lttng_live_iterator_status ret =
655 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
656 enum bt_ctf_notif_iter_status status;
657 struct lttng_live_session *session;
658
659 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
660 struct lttng_live_trace *trace;
661
662 if (session->new_streams_needed) {
663 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
664 }
665 bt_list_for_each_entry(trace, &session->traces, node) {
666 if (trace->new_metadata_needed) {
667 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
668 }
669 }
670 }
671
672 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
673 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
674 }
675 if (lttng_live_stream->packet_end_notif_queue) {
676 *notification = lttng_live_stream->packet_end_notif_queue;
677 lttng_live_stream->packet_end_notif_queue = NULL;
678 status = BT_CTF_NOTIF_ITER_STATUS_OK;
679 } else {
680 status = bt_ctf_notif_iter_get_next_notification(
681 lttng_live_stream->notif_iter,
682 lttng_live_stream->trace->cc_prio_map,
683 notification);
684 if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
685 /*
686 * Consider empty packets as inactivity.
687 */
688 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
689 lttng_live_stream->packet_end_notif_queue = *notification;
690 *notification = NULL;
691 return emit_inactivity_notification(lttng_live,
692 lttng_live_stream, notification,
693 lttng_live_stream->current_packet_end_timestamp);
694 }
695 }
696 }
697 switch (status) {
698 case BT_CTF_NOTIF_ITER_STATUS_EOF:
699 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
700 break;
701 case BT_CTF_NOTIF_ITER_STATUS_OK:
702 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
703 break;
704 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
705 /*
706 * Continue immediately (end of packet). The next
707 * get_index may return AGAIN to delay the following
708 * attempt.
709 */
710 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
711 break;
712 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
713 /* No argument provided by the user, so don't return INVAL. */
714 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
715 default:
716 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
717 break;
718 }
719 return ret;
720}
721
722/*
723 * helper function:
724 * handle_no_data_streams()
725 * retry:
726 * - for each ACTIVE_NO_DATA stream:
727 * - query relayd for stream data, or quiescence info.
728 * - if need metadata, get metadata, goto retry.
729 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
730 * - if quiescent, move to QUIESCENT streams
731 * - if fetched data, move to ACTIVE_DATA streams
732 * (at this point each stream either has data, or is quiescent)
733 *
734 *
735 * iterator_next:
736 * handle_new_streams_and_metadata()
737 * - query relayd for known streams, add them as ACTIVE_NO_DATA
738 * - query relayd for metadata
739 *
740 * call handle_active_no_data_streams()
741 *
742 * handle_quiescent_streams()
743 * - if at least one stream is ACTIVE_DATA:
744 * - peek stream event with lowest timestamp -> next_ts
745 * - for each quiescent stream
746 * - if next_ts >= quiescent end
747 * - set state to ACTIVE_NO_DATA
748 * - else
749 * - for each quiescent stream
750 * - set state to ACTIVE_NO_DATA
751 *
752 * call handle_active_no_data_streams()
753 *
754 * handle_active_data_streams()
755 * - if at least one stream is ACTIVE_DATA:
756 * - get stream event with lowest timestamp from heap
757 * - make that stream event the current notification.
758 * - move this stream heap position to its next event
759 * - if we need to fetch data from relayd, move
760 * stream to ACTIVE_NO_DATA.
761 * - return OK
762 * - return AGAIN
763 *
764 * end criterion: ctrl-c on client. If relayd exits or the session
765 * closes on the relay daemon side, we keep on waiting for streams.
766 * Eventually handle --end timestamp (also an end criterion).
767 *
768 * When disconnected from relayd: try to re-connect endlessly.
769 */
770static
771struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
772 struct bt_private_notification_iterator *iterator,
773 struct lttng_live_stream_iterator *stream_iter)
774{
775 enum bt_ctf_lttng_live_iterator_status status;
776 struct bt_notification_iterator_next_return next_return;
777 struct lttng_live_component *lttng_live;
778
779 lttng_live = stream_iter->trace->session->lttng_live;
780retry:
781 print_stream_state(stream_iter);
782 next_return.notification = NULL;
783 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
784 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
785 goto end;
786 }
787 status = lttng_live_iterator_next_handle_one_no_data_stream(
788 lttng_live, stream_iter);
789 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
790 goto end;
791 }
792 status = lttng_live_iterator_next_handle_one_quiescent_stream(
793 lttng_live, stream_iter, &next_return.notification);
794 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
795 assert(next_return.notification == NULL);
796 goto end;
797 }
798 if (next_return.notification) {
799 goto end;
800 }
801 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
802 stream_iter, &next_return.notification);
803 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
804 assert(next_return.notification == NULL);
805 }
806
807end:
808 switch (status) {
809 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
810 print_dbg("continue");
811 goto retry;
812 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
813 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
814 print_dbg("again");
815 break;
816 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
817 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
818 print_dbg("end");
819 break;
820 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
821 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
822 print_dbg("ok");
823 break;
824 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
825 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
826 break;
827 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
828 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
829 break;
830 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
831 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
832 break;
833 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
834 default: /* fall-through */
835 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
836 break;
837 }
838 return next_return;
839}
840
841static
842struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
843 struct bt_private_notification_iterator *iterator,
844 struct lttng_live_no_stream_iterator *no_stream_iter)
845{
846 enum bt_ctf_lttng_live_iterator_status status;
847 struct bt_notification_iterator_next_return next_return;
848 struct lttng_live_component *lttng_live;
849
850 lttng_live = no_stream_iter->lttng_live;
851retry:
852 lttng_live_force_new_streams_and_metadata(lttng_live);
4513a12e 853 next_return.notification = NULL;
7cdc2bab
MD
854 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
855 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
856 goto end;
857 }
858 if (no_stream_iter->port) {
859 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
860 } else {
861 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
862 }
863end:
864 switch (status) {
865 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
866 goto retry;
867 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
868 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
869 break;
870 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
871 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
872 break;
7cdc2bab
MD
873 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
874 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
875 break;
876 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
877 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
878 break;
879 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
880 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
881 break;
882 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
883 default: /* fall-through */
884 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
885 break;
886 }
887 return next_return;
888}
889
d3eb6e8f 890BT_HIDDEN
41a2b7ae 891struct bt_notification_iterator_next_return lttng_live_iterator_next(
890882ef 892 struct bt_private_notification_iterator *iterator)
d3eb6e8f 893{
7cdc2bab
MD
894 struct lttng_live_stream_iterator_generic *s =
895 bt_private_notification_iterator_get_user_data(iterator);
896 struct bt_notification_iterator_next_return next_return;
897
898 switch (s->type) {
899 case LIVE_STREAM_TYPE_NO_STREAM:
900 next_return = lttng_live_iterator_next_no_stream(iterator,
901 container_of(s, struct lttng_live_no_stream_iterator, p));
902 break;
903 case LIVE_STREAM_TYPE_STREAM:
904 next_return = lttng_live_iterator_next_stream(iterator,
905 container_of(s, struct lttng_live_stream_iterator, p));
906 break;
907 default:
908 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
909 break;
910 }
911 return next_return;
912}
41a2b7ae 913
7cdc2bab
MD
914BT_HIDDEN
915enum bt_notification_iterator_status lttng_live_iterator_init(
916 struct bt_private_notification_iterator *it,
917 struct bt_private_port *port)
918{
919 enum bt_notification_iterator_status ret =
920 BT_NOTIFICATION_ITERATOR_STATUS_OK;
921 struct lttng_live_stream_iterator_generic *s;
7cdc2bab
MD
922
923 assert(it);
924
925 s = bt_private_port_get_user_data(port);
926 assert(s);
927 switch (s->type) {
928 case LIVE_STREAM_TYPE_NO_STREAM:
929 {
930 struct lttng_live_no_stream_iterator *no_stream_iter =
931 container_of(s, struct lttng_live_no_stream_iterator, p);
7cdc2bab
MD
932 ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
933 if (ret) {
934 goto error;
935 }
936 break;
937 }
938 case LIVE_STREAM_TYPE_STREAM:
939 {
940 struct lttng_live_stream_iterator *stream_iter =
941 container_of(s, struct lttng_live_stream_iterator, p);
7cdc2bab
MD
942 ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
943 if (ret) {
944 goto error;
945 }
946 break;
947 }
948 default:
949 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
950 goto end;
951 }
952
953end:
41a2b7ae 954 return ret;
7cdc2bab
MD
955error:
956 if (bt_private_notification_iterator_set_user_data(it, NULL)
957 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
087bc060 958 BT_LOGE("Error setting private data to NULL");
7cdc2bab
MD
959 }
960 goto end;
961}
962
963static
964struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
965 struct bt_value *params)
966{
967 struct bt_value *url_value = NULL;
968 struct bt_value *results = NULL;
969 const char *url;
970 struct bt_live_viewer_connection *viewer_connection = NULL;
7cdc2bab
MD
971
972 url_value = bt_value_map_get(params, "url");
973 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
087bc060 974 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
975 goto error;
976 }
977
3cdf4234 978 if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
087bc060 979 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
980 goto error;
981 }
982
4c66436f 983 viewer_connection = bt_live_viewer_connection_create(url, NULL);
7cdc2bab 984 if (!viewer_connection) {
7cdc2bab
MD
985 goto error;
986 }
987
988 results = bt_live_viewer_connection_list_sessions(viewer_connection);
989 goto end;
990error:
991 BT_PUT(results);
992end:
993 if (viewer_connection) {
994 bt_live_viewer_connection_destroy(viewer_connection);
995 }
996 BT_PUT(url_value);
997 return results;
998}
999
1000BT_HIDDEN
1001struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
1002 const char *object, struct bt_value *params)
1003{
1004 if (strcmp(object, "sessions") == 0) {
1005 return lttng_live_query_list_sessions(comp_class,
1006 params);
1007 }
087bc060 1008 BT_LOGW("Unknown query object `%s`", object);
7cdc2bab
MD
1009 return NULL;
1010}
1011
1012static
1013void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1014{
1015 int ret;
1016 struct lttng_live_session *session, *s;
1017
1018 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
1019 lttng_live_destroy_session(session);
1020 }
1021 BT_PUT(lttng_live->viewer_connection);
1022 if (lttng_live->url) {
1023 g_string_free(lttng_live->url, TRUE);
1024 }
1025 if (lttng_live->no_stream_port) {
6f79a7cf 1026 bt_get(lttng_live->no_stream_port);
7cdc2bab 1027 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
6f79a7cf 1028 bt_put(lttng_live->no_stream_port);
7cdc2bab 1029 assert(!ret);
7cdc2bab
MD
1030 }
1031 if (lttng_live->no_stream_iter) {
1032 g_free(lttng_live->no_stream_iter);
1033 }
1034 g_free(lttng_live);
1035}
1036
1037BT_HIDDEN
1038void lttng_live_component_finalize(struct bt_private_component *component)
1039{
1040 void *data = bt_private_component_get_user_data(component);
1041
1042 if (!data) {
1043 return;
1044 }
1045 lttng_live_component_destroy_data(data);
1046}
1047
1048static
1049struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
6f79a7cf 1050 struct bt_private_component *private_component)
7cdc2bab
MD
1051{
1052 struct lttng_live_component *lttng_live;
1053 struct bt_value *value = NULL;
1054 const char *url;
1055 enum bt_value_status ret;
1056
1057 lttng_live = g_new0(struct lttng_live_component, 1);
1058 if (!lttng_live) {
1059 goto end;
1060 }
7cdc2bab
MD
1061 /* TODO: make this an overridable parameter. */
1062 lttng_live->max_query_size = MAX_QUERY_SIZE;
1063 BT_INIT_LIST_HEAD(&lttng_live->sessions);
1064 value = bt_value_map_get(params, "url");
1065 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
087bc060 1066 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
1067 goto error;
1068 }
1069 ret = bt_value_string_get(value, &url);
1070 if (ret != BT_VALUE_STATUS_OK) {
087bc060 1071 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
1072 goto error;
1073 }
1074 lttng_live->url = g_string_new(url);
1075 if (!lttng_live->url) {
1076 goto error;
1077 }
6f79a7cf 1078 BT_PUT(value);
7cdc2bab 1079 lttng_live->viewer_connection =
4c66436f 1080 bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
7cdc2bab 1081 if (!lttng_live->viewer_connection) {
7cdc2bab
MD
1082 goto error;
1083 }
1084 if (lttng_live_create_viewer_session(lttng_live)) {
7cdc2bab
MD
1085 goto error;
1086 }
1087 lttng_live->private_component = private_component;
4c66436f 1088
7cdc2bab
MD
1089 goto end;
1090
1091error:
1092 lttng_live_component_destroy_data(lttng_live);
1093 lttng_live = NULL;
1094end:
1095 return lttng_live;
d3e4dcd8
PP
1096}
1097
f3bc2010 1098BT_HIDDEN
3cdf4234
MD
1099enum bt_component_status lttng_live_component_init(
1100 struct bt_private_component *private_component,
7cdc2bab 1101 struct bt_value *params, void *init_method_data)
f3bc2010 1102{
7cdc2bab
MD
1103 struct lttng_live_component *lttng_live;
1104 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
1105
7cdc2bab 1106 /* Passes ownership of iter ref to lttng_live_component_create. */
6f79a7cf 1107 lttng_live = lttng_live_component_create(params, private_component);
7cdc2bab 1108 if (!lttng_live) {
6f79a7cf
MD
1109 //TODO : we need access to the application cancel state
1110 //because we are not part of a graph yet.
1111 ret = BT_COMPONENT_STATUS_NOMEM;
7cdc2bab
MD
1112 goto end;
1113 }
1114
1115 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1116 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1117 lttng_live->no_stream_iter->lttng_live = lttng_live;
4bf0e537
MD
1118 if (lttng_live_is_canceled(lttng_live)) {
1119 goto end;
1120 }
147337a3 1121 ret = bt_private_component_source_add_output_private_port(
7cdc2bab 1122 lttng_live->private_component, "no-stream",
147337a3
PP
1123 lttng_live->no_stream_iter,
1124 &lttng_live->no_stream_port);
1125 if (ret != BT_COMPONENT_STATUS_OK) {
1126 goto end;
1127 }
6f79a7cf 1128 bt_put(lttng_live->no_stream_port); /* weak */
7cdc2bab
MD
1129 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1130
3cdf4234 1131 ret = bt_private_component_set_user_data(private_component, lttng_live);
7cdc2bab
MD
1132 if (ret != BT_COMPONENT_STATUS_OK) {
1133 goto error;
1134 }
1135
1136end:
1137 return ret;
1138error:
3cdf4234 1139 (void) bt_private_component_set_user_data(private_component, NULL);
7cdc2bab
MD
1140 lttng_live_component_destroy_data(lttng_live);
1141 return ret;
f3bc2010 1142}
087bc060 1143
d85ef162
MD
1144BT_HIDDEN
1145enum bt_component_status lttng_live_accept_port_connection(
1146 struct bt_private_component *private_component,
1147 struct bt_private_port *self_private_port,
1148 struct bt_port *other_port)
1149{
1150 struct lttng_live_component *lttng_live =
1151 bt_private_component_get_user_data(private_component);
1152 struct bt_component *other_component;
1153 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
1154 struct bt_port *self_port = bt_port_from_private_port(self_private_port);
1155
1156 other_component = bt_port_get_component(other_port);
1157 bt_put(other_component); /* weak */
1158
1159 if (!lttng_live->downstream_component) {
1160 lttng_live->downstream_component = other_component;
1161 goto end;
1162 }
1163
1164 /*
1165 * Compare prior component to ensure we are connected to the
1166 * same downstream component as prior ports.
1167 */
1168 if (lttng_live->downstream_component != other_component) {
1169 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1170 bt_port_get_name(self_port),
1171 bt_component_get_name(other_component),
1172 bt_component_get_name(lttng_live->downstream_component));
1173 status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
1174 goto end;
1175 }
1176end:
1177 bt_put(self_port);
1178 return status;
1179}
This page took 0.084692 seconds and 4 git commands to generate.