Make bt_graph_connect_ports() return a status code
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
020bc26f
PP
30#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
31#include "logging.h"
32
7cdc2bab 33#include <babeltrace/ctf-ir/packet.h>
b2e0c907 34#include <babeltrace/graph/component-source.h>
7cdc2bab
MD
35#include <babeltrace/graph/private-port.h>
36#include <babeltrace/graph/port.h>
37#include <babeltrace/graph/private-component.h>
38#include <babeltrace/graph/private-component-source.h>
39#include <babeltrace/graph/private-notification-iterator.h>
40#include <babeltrace/graph/notification-stream.h>
41#include <babeltrace/graph/notification-packet.h>
42#include <babeltrace/graph/notification-event.h>
43#include <babeltrace/graph/notification-heap.h>
44#include <babeltrace/graph/notification-iterator.h>
45#include <babeltrace/graph/notification-inactivity.h>
4c66436f 46#include <babeltrace/graph/graph.h>
7cdc2bab
MD
47#include <babeltrace/compiler-internal.h>
48#include <inttypes.h>
49#include <glib.h>
50#include <assert.h>
51#include <unistd.h>
7d61fa8e 52#include <plugins-common.h>
f3bc2010 53
7cdc2bab
MD
54#include "data-stream.h"
55#include "metadata.h"
0f5e83e5 56#include "lttng-live-internal.h"
7cdc2bab 57
7cdc2bab 58#define MAX_QUERY_SIZE (256*1024)
7cdc2bab 59
087bc060 60#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab
MD
61
62static const char *print_state(struct lttng_live_stream_iterator *s)
63{
64 switch (s->state) {
65 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
66 return "ACTIVE_NO_DATA";
67 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
68 return "QUIESCENT_NO_DATA";
69 case LTTNG_LIVE_STREAM_QUIESCENT:
70 return "QUIESCENT";
71 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
72 return "ACTIVE_DATA";
73 case LTTNG_LIVE_STREAM_EOF:
74 return "EOF";
75 default:
76 return "ERROR";
77 }
78}
7cdc2bab
MD
79
/*
 * Log, at debug level, the port name, the state label and both
 * inactivity timestamps of a stream iterator.
 *
 * The argument is parenthesized at each member access so that any
 * pointer expression can be passed; it is still evaluated several
 * times, so it must not have side effects.
 *
 * NOTE(review): if bt_port_from_private_port() returns a new
 * reference, it is not released here -- confirm against the graph API.
 */
#define print_stream_state(stream) \
	print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
		bt_port_get_name(bt_port_from_private_port((stream)->port)), \
		print_state(stream), (stream)->last_returned_inactivity_timestamp, \
		(stream)->current_inactivity_timestamp)
85
7cdc2bab
MD
86BT_HIDDEN
87int lttng_live_add_port(struct lttng_live_component *lttng_live,
88 struct lttng_live_stream_iterator *stream_iter)
89{
90 int ret;
91 struct bt_private_port *private_port;
92 char name[STREAM_NAME_MAX_LEN];
93
94 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
95 assert(ret > 0);
96 strcpy(stream_iter->name, name);
97 private_port = bt_private_component_source_add_output_private_port(
98 lttng_live->private_component, name, stream_iter);
99 if (!private_port) {
100 return -1;
101 }
087bc060 102 BT_LOGI("Added port %s", name);
7cdc2bab
MD
103
104 if (lttng_live->no_stream_port) {
105 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
106 if (ret) {
107 return -1;
108 }
109 BT_PUT(lttng_live->no_stream_port);
110 lttng_live->no_stream_iter->port = NULL;
111 }
112 stream_iter->port = private_port;
113 return 0;
114}
115
116BT_HIDDEN
117int lttng_live_remove_port(struct lttng_live_component *lttng_live,
118 struct bt_private_port *port)
119{
120 struct bt_component *component;
121 int64_t nr_ports;
122 int ret;
123
124 component = bt_component_from_private_component(lttng_live->private_component);
125 nr_ports = bt_component_source_get_output_port_count(component);
126 if (nr_ports < 0) {
127 return -1;
128 }
129 BT_PUT(component);
130 if (nr_ports == 1) {
131 assert(!lttng_live->no_stream_port);
132 lttng_live->no_stream_port =
133 bt_private_component_source_add_output_private_port(lttng_live->private_component,
134 "no-stream", lttng_live->no_stream_iter);
135 if (!lttng_live->no_stream_port) {
136 return -1;
137 }
138 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
139 }
140 ret = bt_private_port_remove_from_component(port);
141 if (ret) {
142 return -1;
143 }
144 return 0;
145}
146
147static
148struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
149 uint64_t trace_id)
d3e4dcd8 150{
7cdc2bab
MD
151 struct lttng_live_trace *trace;
152
153 bt_list_for_each_entry(trace, &session->traces, node) {
154 if (trace->id == trace_id) {
155 return trace;
156 }
157 }
d3eb6e8f
PP
158 return NULL;
159}
160
7cdc2bab
MD
161static
162void lttng_live_destroy_trace(struct bt_object *obj)
163{
164 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
5bd230f4 165 int retval;
7cdc2bab 166
087bc060 167 BT_LOGI("Destroy trace");
7cdc2bab
MD
168 assert(bt_list_empty(&trace->streams));
169 bt_list_del(&trace->node);
5bd230f4
MD
170
171 retval = bt_ctf_trace_set_is_static(trace->trace);
172 assert(!retval);
173
7cdc2bab
MD
174 lttng_live_metadata_fini(trace);
175 BT_PUT(trace->cc_prio_map);
176 g_free(trace);
177}
178
179static
180struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
181 uint64_t trace_id)
182{
183 struct lttng_live_trace *trace = NULL;
184
185 trace = g_new0(struct lttng_live_trace, 1);
186 if (!trace) {
187 goto error;
188 }
189 trace->session = session;
190 trace->id = trace_id;
191 BT_INIT_LIST_HEAD(&trace->streams);
192 trace->new_metadata_needed = true;
193 bt_list_add(&trace->node, &session->traces);
194 bt_object_init(&trace->obj, lttng_live_destroy_trace);
087bc060 195 BT_LOGI("Create trace");
7cdc2bab
MD
196 goto end;
197error:
198 g_free(trace);
199 trace = NULL;
200end:
201 return trace;
202}
203
204BT_HIDDEN
205struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
206 uint64_t trace_id)
207{
208 struct lttng_live_trace *trace;
209
210 trace = lttng_live_find_trace(session, trace_id);
211 if (trace) {
212 bt_get(trace);
213 return trace;
214 }
215 return lttng_live_create_trace(session, trace_id);
216}
217
218BT_HIDDEN
219void lttng_live_unref_trace(struct lttng_live_trace *trace)
220{
221 bt_put(trace);
222}
223
224static
225void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
226{
227 struct lttng_live_stream_iterator *stream, *s;
228
229 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
230 lttng_live_stream_iterator_destroy(stream);
231 }
232 lttng_live_metadata_fini(trace);
233}
234
235BT_HIDDEN
06994c71
MD
236int lttng_live_add_session(struct lttng_live_component *lttng_live,
237 uint64_t session_id, const char *hostname,
238 const char *session_name)
7cdc2bab
MD
239{
240 int ret = 0;
241 struct lttng_live_session *s;
242
243 s = g_new0(struct lttng_live_session, 1);
244 if (!s) {
245 goto error;
246 }
247
248 s->id = session_id;
249 BT_INIT_LIST_HEAD(&s->traces);
250 s->lttng_live = lttng_live;
251 s->new_streams_needed = true;
06994c71
MD
252 s->hostname = g_string_new(hostname);
253 s->session_name = g_string_new(session_name);
7cdc2bab 254
06994c71
MD
255 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
256 s->id, hostname, session_name);
7cdc2bab
MD
257 bt_list_add(&s->node, &lttng_live->sessions);
258 goto end;
259error:
087bc060 260 BT_LOGE("Error adding session");
7cdc2bab
MD
261 g_free(s);
262 ret = -1;
263end:
264 return ret;
265}
266
267static
268void lttng_live_destroy_session(struct lttng_live_session *session)
269{
270 struct lttng_live_trace *trace, *t;
271
087bc060 272 BT_LOGI("Destroy session");
7cdc2bab
MD
273 if (session->id != -1ULL) {
274 if (lttng_live_detach_session(session)) {
4c66436f
MD
275 if (!bt_graph_is_canceled(session->lttng_live->graph)) {
276 /* Old relayd cannot detach sessions. */
277 BT_LOGD("Unable to detach session %" PRIu64,
278 session->id);
279 }
7cdc2bab
MD
280 }
281 session->id = -1ULL;
282 }
283 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
284 lttng_live_close_trace_streams(trace);
285 }
286 bt_list_del(&session->node);
06994c71
MD
287 if (session->hostname) {
288 g_string_free(session->hostname, TRUE);
289 }
290 if (session->session_name) {
291 g_string_free(session->session_name, TRUE);
292 }
7cdc2bab
MD
293 g_free(session);
294}
295
296BT_HIDDEN
297void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
298{
299 struct lttng_live_stream_iterator_generic *s =
300 bt_private_notification_iterator_get_user_data(it);
301
302 switch (s->type) {
303 case LIVE_STREAM_TYPE_NO_STREAM:
304 {
305 /* Leave no_stream_iter in place when port is removed. */
306 break;
307 }
308 case LIVE_STREAM_TYPE_STREAM:
309 {
310 struct lttng_live_stream_iterator *stream_iter =
311 container_of(s, struct lttng_live_stream_iterator, p);
312
313 lttng_live_stream_iterator_destroy(stream_iter);
314 break;
315 }
316 }
317}
318
319static
320enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
321 struct lttng_live_component *lttng_live,
322 struct lttng_live_stream_iterator *lttng_live_stream)
323{
324 switch (lttng_live_stream->state) {
325 case LTTNG_LIVE_STREAM_QUIESCENT:
326 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
327 break;
328 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
329 /* Invalid state. */
087bc060
MD
330 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
331 abort();
7cdc2bab
MD
332 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
333 /* Invalid state. */
087bc060
MD
334 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
335 abort();
7cdc2bab
MD
336 case LTTNG_LIVE_STREAM_EOF:
337 break;
338 }
339 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
340}
341
342/*
343 * For active no data stream, fetch next data. It can be either:
344 * - quiescent: need to put it in the prio heap at quiescent end
345 * timestamp,
346 * - have data: need to wire up first event into the prio heap,
347 * - have no data on this stream at this point: need to retry (AGAIN) or
348 * return EOF.
349 */
350static
351enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
352 struct lttng_live_component *lttng_live,
353 struct lttng_live_stream_iterator *lttng_live_stream)
354{
355 enum bt_ctf_lttng_live_iterator_status ret =
356 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
357 struct packet_index index;
358 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
359
360 if (lttng_live_stream->trace->new_metadata_needed) {
361 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
362 goto end;
363 }
364 if (lttng_live_stream->trace->session->new_streams_needed) {
365 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
366 goto end;
367 }
368 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
369 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
370 goto end;
371 }
372 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
373 if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
374 goto end;
375 }
376 assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
377 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
378 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
379 && lttng_live_stream->last_returned_inactivity_timestamp ==
380 lttng_live_stream->current_inactivity_timestamp) {
381 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
382 print_stream_state(lttng_live_stream);
383 } else {
384 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
385 }
386 goto end;
387 }
388 lttng_live_stream->base_offset = index.offset;
389 lttng_live_stream->offset = index.offset;
390 lttng_live_stream->len = index.packet_size / CHAR_BIT;
391end:
392 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
393 ret = lttng_live_iterator_next_check_stream_state(
394 lttng_live, lttng_live_stream);
395 }
396 return ret;
397}
398
399/*
400 * Creation of the notification requires the ctf trace to be created
401 * beforehand, but the live protocol gives us all streams (including
402 * metadata) at once. So we split it in three steps: getting streams,
403 * getting metadata (which creates the ctf trace), and then creating the
404 * per-stream notifications.
405 */
406static
407enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
408 struct lttng_live_component *lttng_live,
409 struct lttng_live_session *session)
410{
411 enum bt_ctf_lttng_live_iterator_status status;
412 struct lttng_live_trace *trace, *t;
413
414 if (lttng_live_attach_session(session)) {
4c66436f
MD
415 if (bt_graph_is_canceled(lttng_live->graph)) {
416 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
417 } else {
418 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
419 }
7cdc2bab
MD
420 }
421 status = lttng_live_get_new_streams(session);
422 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
423 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
424 return status;
425 }
426 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
427 status = lttng_live_metadata_update(trace);
5bd230f4
MD
428 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
429 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
7cdc2bab
MD
430 return status;
431 }
432 }
433 return lttng_live_lazy_notif_init(session);
434}
435
436BT_HIDDEN
437void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
438{
439 struct lttng_live_session *session;
440
441 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
442 session->new_streams_needed = true;
443 }
444}
445
446static
447void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
448{
449 struct lttng_live_session *session;
450
451 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
452 struct lttng_live_trace *trace;
453
454 session->new_streams_needed = true;
455 bt_list_for_each_entry(trace, &session->traces, node) {
456 trace->new_metadata_needed = true;
457 }
458 }
459}
460
461static
3cdf4234 462enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
7cdc2bab
MD
463 struct lttng_live_component *lttng_live)
464{
465 enum bt_ctf_lttng_live_iterator_status ret =
466 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
467 unsigned int nr_sessions_opened = 0;
468 struct lttng_live_session *session, *s;
469
470 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
471 if (session->closed && bt_list_empty(&session->traces)) {
472 lttng_live_destroy_session(session);
473 }
474 }
475 /*
476 * Currently, when there are no sessions, we quit immediately.
477 * We may want to add a component parameter to keep trying until
478 * we get data in the future.
479 * Also, in a remotely distant future, we could add a "new
480 * session" flag to the protocol, which would tell us that we
481 * need to query for new sessions even though we have sessions
482 * currently ongoing.
483 */
484 if (bt_list_empty(&lttng_live->sessions)) {
485 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
486 goto end;
487 }
488 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
489 ret = lttng_live_get_session(lttng_live, session);
490 switch (ret) {
491 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
492 break;
493 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
494 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
495 break;
496 default:
497 goto end;
498 }
499 if (!session->closed) {
500 nr_sessions_opened++;
501 }
502 }
503end:
504 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
505 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
506 }
507 return ret;
508}
509
510static
511enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
512 struct lttng_live_component *lttng_live,
513 struct lttng_live_stream_iterator *lttng_live_stream,
514 struct bt_notification **notification,
515 uint64_t timestamp)
516{
517 enum bt_ctf_lttng_live_iterator_status ret =
518 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
519 struct lttng_live_trace *trace;
520 struct bt_ctf_clock_class *clock_class = NULL;
521 struct bt_ctf_clock_value *clock_value = NULL;
522 struct bt_notification *notif = NULL;
523 int retval;
524
525 trace = lttng_live_stream->trace;
526 if (!trace) {
527 goto error;
528 }
529 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
530 if (!clock_class) {
531 goto error;
532 }
533 clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
534 if (!clock_value) {
535 goto error;
536 }
537 notif = bt_notification_inactivity_create(trace->cc_prio_map);
538 if (!notif) {
539 goto error;
540 }
541 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
542 if (retval) {
543 goto error;
544 }
545 *notification = notif;
546end:
547 bt_put(clock_value);
548 bt_put(clock_class);
549 return ret;
550
551error:
552 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
553 bt_put(notif);
554 goto end;
555}
556
557static
558enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
559 struct lttng_live_component *lttng_live,
560 struct lttng_live_stream_iterator *lttng_live_stream,
561 struct bt_notification **notification)
562{
563 enum bt_ctf_lttng_live_iterator_status ret =
564 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
565 struct bt_ctf_clock_class *clock_class = NULL;
566 struct bt_ctf_clock_value *clock_value = NULL;
567
568 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
569 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
570 }
571
572 if (lttng_live_stream->current_inactivity_timestamp ==
573 lttng_live_stream->last_returned_inactivity_timestamp) {
574 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
575 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
576 goto end;
577 }
578
579 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
580 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
581
582 lttng_live_stream->last_returned_inactivity_timestamp =
583 lttng_live_stream->current_inactivity_timestamp;
584end:
585 bt_put(clock_value);
586 bt_put(clock_class);
587 return ret;
588}
589
590static
591enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
592 struct lttng_live_component *lttng_live,
593 struct lttng_live_stream_iterator *lttng_live_stream,
594 struct bt_notification **notification)
595{
596 enum bt_ctf_lttng_live_iterator_status ret =
597 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
598 enum bt_ctf_notif_iter_status status;
599 struct lttng_live_session *session;
600
601 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
602 struct lttng_live_trace *trace;
603
604 if (session->new_streams_needed) {
605 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
606 }
607 bt_list_for_each_entry(trace, &session->traces, node) {
608 if (trace->new_metadata_needed) {
609 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
610 }
611 }
612 }
613
614 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
615 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
616 }
617 if (lttng_live_stream->packet_end_notif_queue) {
618 *notification = lttng_live_stream->packet_end_notif_queue;
619 lttng_live_stream->packet_end_notif_queue = NULL;
620 status = BT_CTF_NOTIF_ITER_STATUS_OK;
621 } else {
622 status = bt_ctf_notif_iter_get_next_notification(
623 lttng_live_stream->notif_iter,
624 lttng_live_stream->trace->cc_prio_map,
625 notification);
626 if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
627 /*
628 * Consider empty packets as inactivity.
629 */
630 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
631 lttng_live_stream->packet_end_notif_queue = *notification;
632 *notification = NULL;
633 return emit_inactivity_notification(lttng_live,
634 lttng_live_stream, notification,
635 lttng_live_stream->current_packet_end_timestamp);
636 }
637 }
638 }
639 switch (status) {
640 case BT_CTF_NOTIF_ITER_STATUS_EOF:
641 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
642 break;
643 case BT_CTF_NOTIF_ITER_STATUS_OK:
644 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
645 break;
646 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
647 /*
648 * Continue immediately (end of packet). The next
649 * get_index may return AGAIN to delay the following
650 * attempt.
651 */
652 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
653 break;
654 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
655 /* No argument provided by the user, so don't return INVAL. */
656 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
657 default:
658 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
659 break;
660 }
661 return ret;
662}
663
664/*
665 * helper function:
666 * handle_no_data_streams()
667 * retry:
668 * - for each ACTIVE_NO_DATA stream:
669 * - query relayd for stream data, or quiescence info.
670 * - if need metadata, get metadata, goto retry.
671 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
672 * - if quiescent, move to QUIESCENT streams
673 * - if fetched data, move to ACTIVE_DATA streams
674 * (at this point each stream either has data, or is quiescent)
675 *
676 *
677 * iterator_next:
678 * handle_new_streams_and_metadata()
679 * - query relayd for known streams, add them as ACTIVE_NO_DATA
680 * - query relayd for metadata
681 *
682 * call handle_active_no_data_streams()
683 *
684 * handle_quiescent_streams()
685 * - if at least one stream is ACTIVE_DATA:
686 * - peek stream event with lowest timestamp -> next_ts
687 * - for each quiescent stream
688 * - if next_ts >= quiescent end
689 * - set state to ACTIVE_NO_DATA
690 * - else
691 * - for each quiescent stream
692 * - set state to ACTIVE_NO_DATA
693 *
694 * call handle_active_no_data_streams()
695 *
696 * handle_active_data_streams()
697 * - if at least one stream is ACTIVE_DATA:
698 * - get stream event with lowest timestamp from heap
699 * - make that stream event the current notification.
700 * - move this stream heap position to its next event
701 * - if we need to fetch data from relayd, move
702 * stream to ACTIVE_NO_DATA.
703 * - return OK
704 * - return AGAIN
705 *
706 * end criterion: ctrl-c on client. If relayd exits or the session
707 * closes on the relay daemon side, we keep on waiting for streams.
708 * Eventually handle --end timestamp (also an end criterion).
709 *
710 * When disconnected from relayd: try to re-connect endlessly.
711 */
712static
713struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
714 struct bt_private_notification_iterator *iterator,
715 struct lttng_live_stream_iterator *stream_iter)
716{
717 enum bt_ctf_lttng_live_iterator_status status;
718 struct bt_notification_iterator_next_return next_return;
719 struct lttng_live_component *lttng_live;
720
721 lttng_live = stream_iter->trace->session->lttng_live;
722retry:
723 print_stream_state(stream_iter);
724 next_return.notification = NULL;
725 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
726 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
727 goto end;
728 }
729 status = lttng_live_iterator_next_handle_one_no_data_stream(
730 lttng_live, stream_iter);
731 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
732 goto end;
733 }
734 status = lttng_live_iterator_next_handle_one_quiescent_stream(
735 lttng_live, stream_iter, &next_return.notification);
736 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
737 assert(next_return.notification == NULL);
738 goto end;
739 }
740 if (next_return.notification) {
741 goto end;
742 }
743 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
744 stream_iter, &next_return.notification);
745 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
746 assert(next_return.notification == NULL);
747 }
748
749end:
750 switch (status) {
751 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
752 print_dbg("continue");
753 goto retry;
754 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
755 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
756 print_dbg("again");
757 break;
758 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
759 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
760 print_dbg("end");
761 break;
762 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
763 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
764 print_dbg("ok");
765 break;
766 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
767 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
768 break;
769 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
770 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
771 break;
772 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
773 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
774 break;
775 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
776 default: /* fall-through */
777 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
778 break;
779 }
780 return next_return;
781}
782
783static
784struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
785 struct bt_private_notification_iterator *iterator,
786 struct lttng_live_no_stream_iterator *no_stream_iter)
787{
788 enum bt_ctf_lttng_live_iterator_status status;
789 struct bt_notification_iterator_next_return next_return;
790 struct lttng_live_component *lttng_live;
791
792 lttng_live = no_stream_iter->lttng_live;
793retry:
794 lttng_live_force_new_streams_and_metadata(lttng_live);
795 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
796 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
797 goto end;
798 }
799 if (no_stream_iter->port) {
800 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
801 } else {
802 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
803 }
804end:
805 switch (status) {
806 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
807 goto retry;
808 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
809 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
810 break;
811 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
812 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
813 break;
814 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
815 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
816 break;
817 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
818 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
819 break;
820 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
821 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
822 break;
823 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
824 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
825 break;
826 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
827 default: /* fall-through */
828 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
829 break;
830 }
831 return next_return;
832}
833
d3eb6e8f 834BT_HIDDEN
41a2b7ae 835struct bt_notification_iterator_next_return lttng_live_iterator_next(
890882ef 836 struct bt_private_notification_iterator *iterator)
d3eb6e8f 837{
7cdc2bab
MD
838 struct lttng_live_stream_iterator_generic *s =
839 bt_private_notification_iterator_get_user_data(iterator);
840 struct bt_notification_iterator_next_return next_return;
841
842 switch (s->type) {
843 case LIVE_STREAM_TYPE_NO_STREAM:
844 next_return = lttng_live_iterator_next_no_stream(iterator,
845 container_of(s, struct lttng_live_no_stream_iterator, p));
846 break;
847 case LIVE_STREAM_TYPE_STREAM:
848 next_return = lttng_live_iterator_next_stream(iterator,
849 container_of(s, struct lttng_live_stream_iterator, p));
850 break;
851 default:
852 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
853 break;
854 }
855 return next_return;
856}
41a2b7ae 857
7cdc2bab
MD
858BT_HIDDEN
859enum bt_notification_iterator_status lttng_live_iterator_init(
860 struct bt_private_notification_iterator *it,
861 struct bt_private_port *port)
862{
863 enum bt_notification_iterator_status ret =
864 BT_NOTIFICATION_ITERATOR_STATUS_OK;
865 struct lttng_live_stream_iterator_generic *s;
7cdc2bab
MD
866
867 assert(it);
868
869 s = bt_private_port_get_user_data(port);
870 assert(s);
871 switch (s->type) {
872 case LIVE_STREAM_TYPE_NO_STREAM:
873 {
874 struct lttng_live_no_stream_iterator *no_stream_iter =
875 container_of(s, struct lttng_live_no_stream_iterator, p);
7cdc2bab
MD
876 ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
877 if (ret) {
878 goto error;
879 }
880 break;
881 }
882 case LIVE_STREAM_TYPE_STREAM:
883 {
884 struct lttng_live_stream_iterator *stream_iter =
885 container_of(s, struct lttng_live_stream_iterator, p);
7cdc2bab
MD
886 ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
887 if (ret) {
888 goto error;
889 }
890 break;
891 }
892 default:
893 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
894 goto end;
895 }
896
897end:
41a2b7ae 898 return ret;
7cdc2bab
MD
899error:
900 if (bt_private_notification_iterator_set_user_data(it, NULL)
901 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
087bc060 902 BT_LOGE("Error setting private data to NULL");
7cdc2bab
MD
903 }
904 goto end;
905}
906
907static
908struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
909 struct bt_value *params)
910{
911 struct bt_value *url_value = NULL;
912 struct bt_value *results = NULL;
913 const char *url;
914 struct bt_live_viewer_connection *viewer_connection = NULL;
7cdc2bab
MD
915
916 url_value = bt_value_map_get(params, "url");
917 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
087bc060 918 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
919 goto error;
920 }
921
3cdf4234 922 if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
087bc060 923 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
924 goto error;
925 }
926
4c66436f 927 viewer_connection = bt_live_viewer_connection_create(url, NULL);
7cdc2bab 928 if (!viewer_connection) {
7cdc2bab
MD
929 goto error;
930 }
931
932 results = bt_live_viewer_connection_list_sessions(viewer_connection);
933 goto end;
934error:
935 BT_PUT(results);
936end:
937 if (viewer_connection) {
938 bt_live_viewer_connection_destroy(viewer_connection);
939 }
940 BT_PUT(url_value);
941 return results;
942}
943
944BT_HIDDEN
945struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
946 const char *object, struct bt_value *params)
947{
948 if (strcmp(object, "sessions") == 0) {
949 return lttng_live_query_list_sessions(comp_class,
950 params);
951 }
087bc060 952 BT_LOGW("Unknown query object `%s`", object);
7cdc2bab
MD
953 return NULL;
954}
955
956static
957void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
958{
959 int ret;
960 struct lttng_live_session *session, *s;
961
962 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
963 lttng_live_destroy_session(session);
964 }
965 BT_PUT(lttng_live->viewer_connection);
966 if (lttng_live->url) {
967 g_string_free(lttng_live->url, TRUE);
968 }
969 if (lttng_live->no_stream_port) {
970 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
971 assert(!ret);
972 BT_PUT(lttng_live->no_stream_port);
973 }
974 if (lttng_live->no_stream_iter) {
975 g_free(lttng_live->no_stream_iter);
976 }
977 g_free(lttng_live);
978}
979
980BT_HIDDEN
981void lttng_live_component_finalize(struct bt_private_component *component)
982{
983 void *data = bt_private_component_get_user_data(component);
984
985 if (!data) {
986 return;
987 }
988 lttng_live_component_destroy_data(data);
989}
990
991static
992struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
3cdf4234
MD
993 struct bt_private_component *private_component,
994 struct bt_graph *graph)
7cdc2bab
MD
995{
996 struct lttng_live_component *lttng_live;
997 struct bt_value *value = NULL;
998 const char *url;
999 enum bt_value_status ret;
1000
1001 lttng_live = g_new0(struct lttng_live_component, 1);
1002 if (!lttng_live) {
1003 goto end;
1004 }
7cdc2bab
MD
1005 /* TODO: make this an overridable parameter. */
1006 lttng_live->max_query_size = MAX_QUERY_SIZE;
1007 BT_INIT_LIST_HEAD(&lttng_live->sessions);
1008 value = bt_value_map_get(params, "url");
1009 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
087bc060 1010 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
1011 goto error;
1012 }
1013 ret = bt_value_string_get(value, &url);
1014 if (ret != BT_VALUE_STATUS_OK) {
087bc060 1015 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
1016 goto error;
1017 }
1018 lttng_live->url = g_string_new(url);
1019 if (!lttng_live->url) {
1020 goto error;
1021 }
1022 lttng_live->viewer_connection =
4c66436f 1023 bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
7cdc2bab 1024 if (!lttng_live->viewer_connection) {
7cdc2bab
MD
1025 goto error;
1026 }
1027 if (lttng_live_create_viewer_session(lttng_live)) {
7cdc2bab
MD
1028 goto error;
1029 }
1030 lttng_live->private_component = private_component;
3cdf4234 1031 lttng_live->graph = graph;
4c66436f 1032
7cdc2bab
MD
1033 goto end;
1034
1035error:
1036 lttng_live_component_destroy_data(lttng_live);
1037 lttng_live = NULL;
1038end:
1039 return lttng_live;
d3e4dcd8
PP
1040}
1041
f3bc2010 1042BT_HIDDEN
3cdf4234
MD
1043enum bt_component_status lttng_live_component_init(
1044 struct bt_private_component *private_component,
7cdc2bab 1045 struct bt_value *params, void *init_method_data)
f3bc2010 1046{
7cdc2bab
MD
1047 struct lttng_live_component *lttng_live;
1048 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
3cdf4234
MD
1049 struct bt_component *component;
1050 struct bt_graph *graph;
1051
1052 component = bt_component_from_private_component(private_component);
1053 graph = bt_component_get_graph(component);
1054 bt_put(graph); /* weak */
1055 bt_put(component);
7cdc2bab 1056
7cdc2bab 1057 /* Passes ownership of iter ref to lttng_live_component_create. */
3cdf4234
MD
1058 lttng_live = lttng_live_component_create(params, private_component,
1059 graph);
7cdc2bab 1060 if (!lttng_live) {
3cdf4234
MD
1061 if (bt_graph_is_canceled(graph)) {
1062 ret = BT_COMPONENT_STATUS_AGAIN;
1063 } else {
1064 ret = BT_COMPONENT_STATUS_NOMEM;
1065 }
7cdc2bab
MD
1066 goto end;
1067 }
1068
1069 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1070 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1071 lttng_live->no_stream_iter->lttng_live = lttng_live;
1072
1073 lttng_live->no_stream_port =
1074 bt_private_component_source_add_output_private_port(
1075 lttng_live->private_component, "no-stream",
1076 lttng_live->no_stream_iter);
1077 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1078
3cdf4234 1079 ret = bt_private_component_set_user_data(private_component, lttng_live);
7cdc2bab
MD
1080 if (ret != BT_COMPONENT_STATUS_OK) {
1081 goto error;
1082 }
1083
1084end:
1085 return ret;
1086error:
3cdf4234 1087 (void) bt_private_component_set_user_data(private_component, NULL);
7cdc2bab
MD
1088 lttng_live_component_destroy_data(lttng_live);
1089 return ret;
f3bc2010 1090}
087bc060 1091
d85ef162
MD
1092BT_HIDDEN
1093enum bt_component_status lttng_live_accept_port_connection(
1094 struct bt_private_component *private_component,
1095 struct bt_private_port *self_private_port,
1096 struct bt_port *other_port)
1097{
1098 struct lttng_live_component *lttng_live =
1099 bt_private_component_get_user_data(private_component);
1100 struct bt_component *other_component;
1101 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
1102 struct bt_port *self_port = bt_port_from_private_port(self_private_port);
1103
1104 other_component = bt_port_get_component(other_port);
1105 bt_put(other_component); /* weak */
1106
1107 if (!lttng_live->downstream_component) {
1108 lttng_live->downstream_component = other_component;
1109 goto end;
1110 }
1111
1112 /*
1113 * Compare prior component to ensure we are connected to the
1114 * same downstream component as prior ports.
1115 */
1116 if (lttng_live->downstream_component != other_component) {
1117 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1118 bt_port_get_name(self_port),
1119 bt_component_get_name(other_component),
1120 bt_component_get_name(lttng_live->downstream_component));
1121 status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
1122 goto end;
1123 }
1124end:
1125 bt_put(self_port);
1126 return status;
1127}
This page took 0.074955 seconds and 4 git commands to generate.