Implement ctf.lttng-live component
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
7cdc2bab 30#include <babeltrace/ctf-ir/packet.h>
b2e0c907 31#include <babeltrace/graph/component-source.h>
7cdc2bab
MD
32#include <babeltrace/graph/private-port.h>
33#include <babeltrace/graph/port.h>
34#include <babeltrace/graph/private-component.h>
35#include <babeltrace/graph/private-component-source.h>
36#include <babeltrace/graph/private-notification-iterator.h>
37#include <babeltrace/graph/notification-stream.h>
38#include <babeltrace/graph/notification-packet.h>
39#include <babeltrace/graph/notification-event.h>
40#include <babeltrace/graph/notification-heap.h>
41#include <babeltrace/graph/notification-iterator.h>
42#include <babeltrace/graph/notification-inactivity.h>
43#include <babeltrace/compiler-internal.h>
44#include <inttypes.h>
45#include <glib.h>
46#include <assert.h>
47#include <unistd.h>
7d61fa8e 48#include <plugins-common.h>
f3bc2010 49
7cdc2bab
MD
50#include "lttng-live-internal.h"
51#include "data-stream.h"
52#include "metadata.h"
53
54#define PRINT_ERR_STREAM (lttng_live->error_fp)
55#define PRINT_PREFIX "lttng-live"
56#define PRINT_DBG_CHECK lttng_live_debug
57#define MAX_QUERY_SIZE (256*1024)
58#include "../print.h"
59
60#ifdef LIVE_DEBUG
61#define print_dbg(fmt, args...) \
62 fprintf(stderr, "%s() at " __FILE__ ":%d " fmt "\n", \
63 __func__, __LINE__, ## args)
64
65static const char *print_state(struct lttng_live_stream_iterator *s)
66{
67 switch (s->state) {
68 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
69 return "ACTIVE_NO_DATA";
70 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
71 return "QUIESCENT_NO_DATA";
72 case LTTNG_LIVE_STREAM_QUIESCENT:
73 return "QUIESCENT";
74 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
75 return "ACTIVE_DATA";
76 case LTTNG_LIVE_STREAM_EOF:
77 return "EOF";
78 default:
79 return "ERROR";
80 }
81}
82#else
83#define print_dbg(fmt, args...)
84#endif
85
86#define print_stream_state(stream) \
87 print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
88 bt_port_get_name(bt_port_from_private_port(stream->port)), \
89 print_state(stream), stream->last_returned_inactivity_timestamp, \
90 stream->current_inactivity_timestamp)
91
d3e4dcd8 92BT_HIDDEN
7cdc2bab
MD
93bool lttng_live_debug;
94
95BT_HIDDEN
96int lttng_live_add_port(struct lttng_live_component *lttng_live,
97 struct lttng_live_stream_iterator *stream_iter)
98{
99 int ret;
100 struct bt_private_port *private_port;
101 char name[STREAM_NAME_MAX_LEN];
102
103 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
104 assert(ret > 0);
105 strcpy(stream_iter->name, name);
106 private_port = bt_private_component_source_add_output_private_port(
107 lttng_live->private_component, name, stream_iter);
108 if (!private_port) {
109 return -1;
110 }
111 PDBG("Added port %s\n", name);
112
113 if (lttng_live->no_stream_port) {
114 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
115 if (ret) {
116 return -1;
117 }
118 BT_PUT(lttng_live->no_stream_port);
119 lttng_live->no_stream_iter->port = NULL;
120 }
121 stream_iter->port = private_port;
122 return 0;
123}
124
125BT_HIDDEN
126int lttng_live_remove_port(struct lttng_live_component *lttng_live,
127 struct bt_private_port *port)
128{
129 struct bt_component *component;
130 int64_t nr_ports;
131 int ret;
132
133 component = bt_component_from_private_component(lttng_live->private_component);
134 nr_ports = bt_component_source_get_output_port_count(component);
135 if (nr_ports < 0) {
136 return -1;
137 }
138 BT_PUT(component);
139 if (nr_ports == 1) {
140 assert(!lttng_live->no_stream_port);
141 lttng_live->no_stream_port =
142 bt_private_component_source_add_output_private_port(lttng_live->private_component,
143 "no-stream", lttng_live->no_stream_iter);
144 if (!lttng_live->no_stream_port) {
145 return -1;
146 }
147 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
148 }
149 ret = bt_private_port_remove_from_component(port);
150 if (ret) {
151 return -1;
152 }
153 return 0;
154}
155
156static
157struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
158 uint64_t trace_id)
d3e4dcd8 159{
7cdc2bab
MD
160 struct lttng_live_trace *trace;
161
162 bt_list_for_each_entry(trace, &session->traces, node) {
163 if (trace->id == trace_id) {
164 return trace;
165 }
166 }
d3eb6e8f
PP
167 return NULL;
168}
169
7cdc2bab
MD
170static
171void lttng_live_destroy_trace(struct bt_object *obj)
172{
173 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
174
175 PDBG("Destroy trace\n");
176 assert(bt_list_empty(&trace->streams));
177 bt_list_del(&trace->node);
178 lttng_live_metadata_fini(trace);
179 BT_PUT(trace->cc_prio_map);
180 g_free(trace);
181}
182
183static
184struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
185 uint64_t trace_id)
186{
187 struct lttng_live_trace *trace = NULL;
188
189 trace = g_new0(struct lttng_live_trace, 1);
190 if (!trace) {
191 goto error;
192 }
193 trace->session = session;
194 trace->id = trace_id;
195 BT_INIT_LIST_HEAD(&trace->streams);
196 trace->new_metadata_needed = true;
197 bt_list_add(&trace->node, &session->traces);
198 bt_object_init(&trace->obj, lttng_live_destroy_trace);
199 goto end;
200error:
201 g_free(trace);
202 trace = NULL;
203end:
204 return trace;
205}
206
207BT_HIDDEN
208struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
209 uint64_t trace_id)
210{
211 struct lttng_live_trace *trace;
212
213 trace = lttng_live_find_trace(session, trace_id);
214 if (trace) {
215 bt_get(trace);
216 return trace;
217 }
218 return lttng_live_create_trace(session, trace_id);
219}
220
221BT_HIDDEN
222void lttng_live_unref_trace(struct lttng_live_trace *trace)
223{
224 bt_put(trace);
225}
226
227static
228void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
229{
230 struct lttng_live_stream_iterator *stream, *s;
231
232 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
233 lttng_live_stream_iterator_destroy(stream);
234 }
235 lttng_live_metadata_fini(trace);
236}
237
238BT_HIDDEN
239int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id)
240{
241 int ret = 0;
242 struct lttng_live_session *s;
243
244 s = g_new0(struct lttng_live_session, 1);
245 if (!s) {
246 goto error;
247 }
248
249 s->id = session_id;
250 BT_INIT_LIST_HEAD(&s->traces);
251 s->lttng_live = lttng_live;
252 s->new_streams_needed = true;
253
254 PDBG("Reading from session %" PRIu64 "\n", s->id);
255 bt_list_add(&s->node, &lttng_live->sessions);
256 goto end;
257error:
258 PERR("Error adding session\n");
259 g_free(s);
260 ret = -1;
261end:
262 return ret;
263}
264
265static
266void lttng_live_destroy_session(struct lttng_live_session *session)
267{
268 struct lttng_live_trace *trace, *t;
269
270 PDBG("Destroy session\n");
271 if (session->id != -1ULL) {
272 if (lttng_live_detach_session(session)) {
273 /* Old relayd cannot detach sessions. */
274 PDBG("Unable to detach session %" PRIu64 "\n",
275 session->id);
276 }
277 session->id = -1ULL;
278 }
279 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
280 lttng_live_close_trace_streams(trace);
281 }
282 bt_list_del(&session->node);
283 g_free(session);
284}
285
286BT_HIDDEN
287void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
288{
289 struct lttng_live_stream_iterator_generic *s =
290 bt_private_notification_iterator_get_user_data(it);
291
292 switch (s->type) {
293 case LIVE_STREAM_TYPE_NO_STREAM:
294 {
295 /* Leave no_stream_iter in place when port is removed. */
296 break;
297 }
298 case LIVE_STREAM_TYPE_STREAM:
299 {
300 struct lttng_live_stream_iterator *stream_iter =
301 container_of(s, struct lttng_live_stream_iterator, p);
302
303 lttng_live_stream_iterator_destroy(stream_iter);
304 break;
305 }
306 }
307}
308
309static
310enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
311 struct lttng_live_component *lttng_live,
312 struct lttng_live_stream_iterator *lttng_live_stream)
313{
314 switch (lttng_live_stream->state) {
315 case LTTNG_LIVE_STREAM_QUIESCENT:
316 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
317 break;
318 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
319 /* Invalid state. */
320 PERR("Unexpected stream state \"ACTIVE_NO_DATA\"\n");
321 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
322 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
323 /* Invalid state. */
324 PERR("Unexpected stream state \"QUIESCENT_NO_DATA\"\n");
325 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
326 case LTTNG_LIVE_STREAM_EOF:
327 break;
328 }
329 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
330}
331
332/*
333 * For active no data stream, fetch next data. It can be either:
334 * - quiescent: need to put it in the prio heap at quiescent end
335 * timestamp,
336 * - have data: need to wire up first event into the prio heap,
337 * - have no data on this stream at this point: need to retry (AGAIN) or
338 * return EOF.
339 */
340static
341enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
342 struct lttng_live_component *lttng_live,
343 struct lttng_live_stream_iterator *lttng_live_stream)
344{
345 enum bt_ctf_lttng_live_iterator_status ret =
346 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
347 struct packet_index index;
348 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
349
350 if (lttng_live_stream->trace->new_metadata_needed) {
351 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
352 goto end;
353 }
354 if (lttng_live_stream->trace->session->new_streams_needed) {
355 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
356 goto end;
357 }
358 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
359 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
360 goto end;
361 }
362 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
363 if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
364 goto end;
365 }
366 assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
367 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
368 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
369 && lttng_live_stream->last_returned_inactivity_timestamp ==
370 lttng_live_stream->current_inactivity_timestamp) {
371 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
372 print_stream_state(lttng_live_stream);
373 } else {
374 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
375 }
376 goto end;
377 }
378 lttng_live_stream->base_offset = index.offset;
379 lttng_live_stream->offset = index.offset;
380 lttng_live_stream->len = index.packet_size / CHAR_BIT;
381end:
382 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
383 ret = lttng_live_iterator_next_check_stream_state(
384 lttng_live, lttng_live_stream);
385 }
386 return ret;
387}
388
389/*
390 * Creation of the notification requires the ctf trace to be created
391 * beforehand, but the live protocol gives us all streams (including
392 * metadata) at once. So we split it in three steps: getting streams,
393 * getting metadata (which creates the ctf trace), and then creating the
394 * per-stream notifications.
395 */
396static
397enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
398 struct lttng_live_component *lttng_live,
399 struct lttng_live_session *session)
400{
401 enum bt_ctf_lttng_live_iterator_status status;
402 struct lttng_live_trace *trace, *t;
403
404 if (lttng_live_attach_session(session)) {
405 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
406 }
407 status = lttng_live_get_new_streams(session);
408 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
409 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
410 return status;
411 }
412 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
413 status = lttng_live_metadata_update(trace);
414 if (status == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
415 int retval;
416
417 retval = bt_ctf_trace_set_is_static(trace->trace);
418 assert(!retval);
419 } else if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
420 return status;
421 }
422 }
423 return lttng_live_lazy_notif_init(session);
424}
425
426BT_HIDDEN
427void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
428{
429 struct lttng_live_session *session;
430
431 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
432 session->new_streams_needed = true;
433 }
434}
435
436static
437void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
438{
439 struct lttng_live_session *session;
440
441 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
442 struct lttng_live_trace *trace;
443
444 session->new_streams_needed = true;
445 bt_list_for_each_entry(trace, &session->traces, node) {
446 trace->new_metadata_needed = true;
447 }
448 }
449}
450
451static
452enum bt_notification_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
453 struct lttng_live_component *lttng_live)
454{
455 enum bt_ctf_lttng_live_iterator_status ret =
456 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
457 unsigned int nr_sessions_opened = 0;
458 struct lttng_live_session *session, *s;
459
460 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
461 if (session->closed && bt_list_empty(&session->traces)) {
462 lttng_live_destroy_session(session);
463 }
464 }
465 /*
466 * Currently, when there are no sessions, we quit immediately.
467 * We may want to add a component parameter to keep trying until
468 * we get data in the future.
469 * Also, in a remotely distant future, we could add a "new
470 * session" flag to the protocol, which would tell us that we
471 * need to query for new sessions even though we have sessions
472 * currently ongoing.
473 */
474 if (bt_list_empty(&lttng_live->sessions)) {
475 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
476 goto end;
477 }
478 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
479 ret = lttng_live_get_session(lttng_live, session);
480 switch (ret) {
481 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
482 break;
483 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
484 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
485 break;
486 default:
487 goto end;
488 }
489 if (!session->closed) {
490 nr_sessions_opened++;
491 }
492 }
493end:
494 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
495 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
496 }
497 return ret;
498}
499
500static
501enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
502 struct lttng_live_component *lttng_live,
503 struct lttng_live_stream_iterator *lttng_live_stream,
504 struct bt_notification **notification,
505 uint64_t timestamp)
506{
507 enum bt_ctf_lttng_live_iterator_status ret =
508 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
509 struct lttng_live_trace *trace;
510 struct bt_ctf_clock_class *clock_class = NULL;
511 struct bt_ctf_clock_value *clock_value = NULL;
512 struct bt_notification *notif = NULL;
513 int retval;
514
515 trace = lttng_live_stream->trace;
516 if (!trace) {
517 goto error;
518 }
519 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
520 if (!clock_class) {
521 goto error;
522 }
523 clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
524 if (!clock_value) {
525 goto error;
526 }
527 notif = bt_notification_inactivity_create(trace->cc_prio_map);
528 if (!notif) {
529 goto error;
530 }
531 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
532 if (retval) {
533 goto error;
534 }
535 *notification = notif;
536end:
537 bt_put(clock_value);
538 bt_put(clock_class);
539 return ret;
540
541error:
542 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
543 bt_put(notif);
544 goto end;
545}
546
547static
548enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
549 struct lttng_live_component *lttng_live,
550 struct lttng_live_stream_iterator *lttng_live_stream,
551 struct bt_notification **notification)
552{
553 enum bt_ctf_lttng_live_iterator_status ret =
554 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
555 struct bt_ctf_clock_class *clock_class = NULL;
556 struct bt_ctf_clock_value *clock_value = NULL;
557
558 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
559 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
560 }
561
562 if (lttng_live_stream->current_inactivity_timestamp ==
563 lttng_live_stream->last_returned_inactivity_timestamp) {
564 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
565 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
566 goto end;
567 }
568
569 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
570 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
571
572 lttng_live_stream->last_returned_inactivity_timestamp =
573 lttng_live_stream->current_inactivity_timestamp;
574end:
575 bt_put(clock_value);
576 bt_put(clock_class);
577 return ret;
578}
579
580static
581enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
582 struct lttng_live_component *lttng_live,
583 struct lttng_live_stream_iterator *lttng_live_stream,
584 struct bt_notification **notification)
585{
586 enum bt_ctf_lttng_live_iterator_status ret =
587 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
588 enum bt_ctf_notif_iter_status status;
589 struct lttng_live_session *session;
590
591 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
592 struct lttng_live_trace *trace;
593
594 if (session->new_streams_needed) {
595 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
596 }
597 bt_list_for_each_entry(trace, &session->traces, node) {
598 if (trace->new_metadata_needed) {
599 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
600 }
601 }
602 }
603
604 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
605 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
606 }
607 if (lttng_live_stream->packet_end_notif_queue) {
608 *notification = lttng_live_stream->packet_end_notif_queue;
609 lttng_live_stream->packet_end_notif_queue = NULL;
610 status = BT_CTF_NOTIF_ITER_STATUS_OK;
611 } else {
612 status = bt_ctf_notif_iter_get_next_notification(
613 lttng_live_stream->notif_iter,
614 lttng_live_stream->trace->cc_prio_map,
615 notification);
616 if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
617 /*
618 * Consider empty packets as inactivity.
619 */
620 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
621 lttng_live_stream->packet_end_notif_queue = *notification;
622 *notification = NULL;
623 return emit_inactivity_notification(lttng_live,
624 lttng_live_stream, notification,
625 lttng_live_stream->current_packet_end_timestamp);
626 }
627 }
628 }
629 switch (status) {
630 case BT_CTF_NOTIF_ITER_STATUS_EOF:
631 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
632 break;
633 case BT_CTF_NOTIF_ITER_STATUS_OK:
634 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
635 break;
636 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
637 /*
638 * Continue immediately (end of packet). The next
639 * get_index may return AGAIN to delay the following
640 * attempt.
641 */
642 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
643 break;
644 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
645 /* No argument provided by the user, so don't return INVAL. */
646 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
647 default:
648 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
649 break;
650 }
651 return ret;
652}
653
654/*
655 * helper function:
656 * handle_no_data_streams()
657 * retry:
658 * - for each ACTIVE_NO_DATA stream:
659 * - query relayd for stream data, or quiescence info.
660 * - if need metadata, get metadata, goto retry.
661 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
662 * - if quiescent, move to QUIESCENT streams
663 * - if fetched data, move to ACTIVE_DATA streams
664 * (at this point each stream either has data, or is quiescent)
665 *
666 *
667 * iterator_next:
668 * handle_new_streams_and_metadata()
669 * - query relayd for known streams, add them as ACTIVE_NO_DATA
670 * - query relayd for metadata
671 *
672 * call handle_active_no_data_streams()
673 *
674 * handle_quiescent_streams()
675 * - if at least one stream is ACTIVE_DATA:
676 * - peek stream event with lowest timestamp -> next_ts
677 * - for each quiescent stream
678 * - if next_ts >= quiescent end
679 * - set state to ACTIVE_NO_DATA
680 * - else
681 * - for each quiescent stream
682 * - set state to ACTIVE_NO_DATA
683 *
684 * call handle_active_no_data_streams()
685 *
686 * handle_active_data_streams()
687 * - if at least one stream is ACTIVE_DATA:
688 * - get stream event with lowest timestamp from heap
689 * - make that stream event the current notification.
690 * - move this stream heap position to its next event
691 * - if we need to fetch data from relayd, move
692 * stream to ACTIVE_NO_DATA.
693 * - return OK
694 * - return AGAIN
695 *
696 * end criterion: ctrl-c on client. If relayd exits or the session
697 * closes on the relay daemon side, we keep on waiting for streams.
698 * Eventually handle --end timestamp (also an end criterion).
699 *
700 * When disconnected from relayd: try to re-connect endlessly.
701 */
702static
703struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
704 struct bt_private_notification_iterator *iterator,
705 struct lttng_live_stream_iterator *stream_iter)
706{
707 enum bt_ctf_lttng_live_iterator_status status;
708 struct bt_notification_iterator_next_return next_return;
709 struct lttng_live_component *lttng_live;
710
711 lttng_live = stream_iter->trace->session->lttng_live;
712retry:
713 print_stream_state(stream_iter);
714 next_return.notification = NULL;
715 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
716 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
717 goto end;
718 }
719 status = lttng_live_iterator_next_handle_one_no_data_stream(
720 lttng_live, stream_iter);
721 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
722 goto end;
723 }
724 status = lttng_live_iterator_next_handle_one_quiescent_stream(
725 lttng_live, stream_iter, &next_return.notification);
726 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
727 assert(next_return.notification == NULL);
728 goto end;
729 }
730 if (next_return.notification) {
731 goto end;
732 }
733 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
734 stream_iter, &next_return.notification);
735 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
736 assert(next_return.notification == NULL);
737 }
738
739end:
740 switch (status) {
741 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
742 print_dbg("continue");
743 goto retry;
744 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
745 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
746 print_dbg("again");
747 break;
748 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
749 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
750 print_dbg("end");
751 break;
752 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
753 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
754 print_dbg("ok");
755 break;
756 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
757 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
758 break;
759 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
760 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
761 break;
762 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
763 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
764 break;
765 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
766 default: /* fall-through */
767 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
768 break;
769 }
770 return next_return;
771}
772
773static
774struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
775 struct bt_private_notification_iterator *iterator,
776 struct lttng_live_no_stream_iterator *no_stream_iter)
777{
778 enum bt_ctf_lttng_live_iterator_status status;
779 struct bt_notification_iterator_next_return next_return;
780 struct lttng_live_component *lttng_live;
781
782 lttng_live = no_stream_iter->lttng_live;
783retry:
784 lttng_live_force_new_streams_and_metadata(lttng_live);
785 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
786 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
787 goto end;
788 }
789 if (no_stream_iter->port) {
790 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
791 } else {
792 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
793 }
794end:
795 switch (status) {
796 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
797 goto retry;
798 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
799 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
800 break;
801 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
802 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
803 break;
804 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
805 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
806 break;
807 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
808 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
809 break;
810 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
811 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
812 break;
813 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
814 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
815 break;
816 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
817 default: /* fall-through */
818 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
819 break;
820 }
821 return next_return;
822}
823
d3eb6e8f 824BT_HIDDEN
41a2b7ae 825struct bt_notification_iterator_next_return lttng_live_iterator_next(
890882ef 826 struct bt_private_notification_iterator *iterator)
d3eb6e8f 827{
7cdc2bab
MD
828 struct lttng_live_stream_iterator_generic *s =
829 bt_private_notification_iterator_get_user_data(iterator);
830 struct bt_notification_iterator_next_return next_return;
831
832 switch (s->type) {
833 case LIVE_STREAM_TYPE_NO_STREAM:
834 next_return = lttng_live_iterator_next_no_stream(iterator,
835 container_of(s, struct lttng_live_no_stream_iterator, p));
836 break;
837 case LIVE_STREAM_TYPE_STREAM:
838 next_return = lttng_live_iterator_next_stream(iterator,
839 container_of(s, struct lttng_live_stream_iterator, p));
840 break;
841 default:
842 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
843 break;
844 }
845 return next_return;
846}
41a2b7ae 847
7cdc2bab
MD
848BT_HIDDEN
849enum bt_notification_iterator_status lttng_live_iterator_init(
850 struct bt_private_notification_iterator *it,
851 struct bt_private_port *port)
852{
853 enum bt_notification_iterator_status ret =
854 BT_NOTIFICATION_ITERATOR_STATUS_OK;
855 struct lttng_live_stream_iterator_generic *s;
856 struct lttng_live_component *lttng_live;
857
858 assert(it);
859
860 s = bt_private_port_get_user_data(port);
861 assert(s);
862 switch (s->type) {
863 case LIVE_STREAM_TYPE_NO_STREAM:
864 {
865 struct lttng_live_no_stream_iterator *no_stream_iter =
866 container_of(s, struct lttng_live_no_stream_iterator, p);
867 lttng_live = no_stream_iter->lttng_live;
868 ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
869 if (ret) {
870 goto error;
871 }
872 break;
873 }
874 case LIVE_STREAM_TYPE_STREAM:
875 {
876 struct lttng_live_stream_iterator *stream_iter =
877 container_of(s, struct lttng_live_stream_iterator, p);
878 lttng_live = stream_iter->trace->session->lttng_live;
879 ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
880 if (ret) {
881 goto error;
882 }
883 break;
884 }
885 default:
886 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
887 goto end;
888 }
889
890end:
41a2b7ae 891 return ret;
7cdc2bab
MD
892error:
893 if (bt_private_notification_iterator_set_user_data(it, NULL)
894 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
895 PERR("Error setting private data to NULL\n");
896 }
897 goto end;
898}
899
900static
901struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
902 struct bt_value *params)
903{
904 struct bt_value *url_value = NULL;
905 struct bt_value *results = NULL;
906 const char *url;
907 struct bt_live_viewer_connection *viewer_connection = NULL;
908 enum bt_value_status ret;
909
910 url_value = bt_value_map_get(params, "url");
911 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
912 fprintf(stderr, "Mandatory \"url\" parameter missing\n");
913 goto error;
914 }
915
916 ret = bt_value_string_get(url_value, &url);
917 if (ret != BT_VALUE_STATUS_OK) {
918 fprintf(stderr, "\"url\" parameter is required to be a string value\n");
919 goto error;
920 }
921
922 viewer_connection = bt_live_viewer_connection_create(url, stderr);
923 if (!viewer_connection) {
924 ret = BT_COMPONENT_STATUS_NOMEM;
925 goto error;
926 }
927
928 results = bt_live_viewer_connection_list_sessions(viewer_connection);
929 goto end;
930error:
931 BT_PUT(results);
932end:
933 if (viewer_connection) {
934 bt_live_viewer_connection_destroy(viewer_connection);
935 }
936 BT_PUT(url_value);
937 return results;
938}
939
940BT_HIDDEN
941struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
942 const char *object, struct bt_value *params)
943{
944 if (strcmp(object, "sessions") == 0) {
945 return lttng_live_query_list_sessions(comp_class,
946 params);
947 }
948 fprintf(stderr, "Unknown query object `%s`\n", object);
949 return NULL;
950}
951
952static
953void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
954{
955 int ret;
956 struct lttng_live_session *session, *s;
957
958 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
959 lttng_live_destroy_session(session);
960 }
961 BT_PUT(lttng_live->viewer_connection);
962 if (lttng_live->url) {
963 g_string_free(lttng_live->url, TRUE);
964 }
965 if (lttng_live->no_stream_port) {
966 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
967 assert(!ret);
968 BT_PUT(lttng_live->no_stream_port);
969 }
970 if (lttng_live->no_stream_iter) {
971 g_free(lttng_live->no_stream_iter);
972 }
973 g_free(lttng_live);
974}
975
976BT_HIDDEN
977void lttng_live_component_finalize(struct bt_private_component *component)
978{
979 void *data = bt_private_component_get_user_data(component);
980
981 if (!data) {
982 return;
983 }
984 lttng_live_component_destroy_data(data);
985}
986
987static
988struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
989 struct bt_private_component *private_component)
990{
991 struct lttng_live_component *lttng_live;
992 struct bt_value *value = NULL;
993 const char *url;
994 enum bt_value_status ret;
995
996 lttng_live = g_new0(struct lttng_live_component, 1);
997 if (!lttng_live) {
998 goto end;
999 }
1000 lttng_live->error_fp = stderr;
1001 /* TODO: make this an overridable parameter. */
1002 lttng_live->max_query_size = MAX_QUERY_SIZE;
1003 BT_INIT_LIST_HEAD(&lttng_live->sessions);
1004 value = bt_value_map_get(params, "url");
1005 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
1006 fprintf(stderr, "Mandatory \"url\" parameter missing\n");
1007 goto error;
1008 }
1009 ret = bt_value_string_get(value, &url);
1010 if (ret != BT_VALUE_STATUS_OK) {
1011 fprintf(stderr, "\"url\" parameter is required to be a string value\n");
1012 goto error;
1013 }
1014 lttng_live->url = g_string_new(url);
1015 if (!lttng_live->url) {
1016 goto error;
1017 }
1018 lttng_live->viewer_connection =
1019 bt_live_viewer_connection_create(lttng_live->url->str,
1020 stderr);
1021 if (!lttng_live->viewer_connection) {
1022 ret = BT_COMPONENT_STATUS_NOMEM;
1023 goto error;
1024 }
1025 if (lttng_live_create_viewer_session(lttng_live)) {
1026 ret = BT_COMPONENT_STATUS_ERROR;
1027 goto error;
1028 }
1029 lttng_live->private_component = private_component;
1030
1031 goto end;
1032
1033error:
1034 lttng_live_component_destroy_data(lttng_live);
1035 lttng_live = NULL;
1036end:
1037 return lttng_live;
d3e4dcd8
PP
1038}
1039
f3bc2010 1040BT_HIDDEN
7cdc2bab
MD
1041enum bt_component_status lttng_live_component_init(struct bt_private_component *component,
1042 struct bt_value *params, void *init_method_data)
f3bc2010 1043{
7cdc2bab
MD
1044 struct lttng_live_component *lttng_live;
1045 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
1046
1047 lttng_live_debug = g_strcmp0(getenv("LTTNG_LIVE_DEBUG"), "1") == 0;
1048
1049 /* Passes ownership of iter ref to lttng_live_component_create. */
1050 lttng_live = lttng_live_component_create(params, component);
1051 if (!lttng_live) {
1052 ret = BT_COMPONENT_STATUS_NOMEM;
1053 goto end;
1054 }
1055
1056 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1057 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1058 lttng_live->no_stream_iter->lttng_live = lttng_live;
1059
1060 lttng_live->no_stream_port =
1061 bt_private_component_source_add_output_private_port(
1062 lttng_live->private_component, "no-stream",
1063 lttng_live->no_stream_iter);
1064 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1065
1066 ret = bt_private_component_set_user_data(component, lttng_live);
1067 if (ret != BT_COMPONENT_STATUS_OK) {
1068 goto error;
1069 }
1070
1071end:
1072 return ret;
1073error:
1074 (void) bt_private_component_set_user_data(component, NULL);
1075 lttng_live_component_destroy_data(lttng_live);
1076 return ret;
f3bc2010 1077}
This page took 0.068858 seconds and 4 git commands to generate.