End connection on destruction
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
020bc26f
PP
30#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
31#include "logging.h"
32
7cdc2bab 33#include <babeltrace/ctf-ir/packet.h>
b2e0c907 34#include <babeltrace/graph/component-source.h>
7cdc2bab
MD
35#include <babeltrace/graph/private-port.h>
36#include <babeltrace/graph/port.h>
37#include <babeltrace/graph/private-component.h>
38#include <babeltrace/graph/private-component-source.h>
39#include <babeltrace/graph/private-notification-iterator.h>
40#include <babeltrace/graph/notification-stream.h>
41#include <babeltrace/graph/notification-packet.h>
42#include <babeltrace/graph/notification-event.h>
43#include <babeltrace/graph/notification-heap.h>
44#include <babeltrace/graph/notification-iterator.h>
45#include <babeltrace/graph/notification-inactivity.h>
4c66436f 46#include <babeltrace/graph/graph.h>
7cdc2bab
MD
47#include <babeltrace/compiler-internal.h>
48#include <inttypes.h>
49#include <glib.h>
50#include <assert.h>
51#include <unistd.h>
7d61fa8e 52#include <plugins-common.h>
f3bc2010 53
7cdc2bab
MD
54#include "data-stream.h"
55#include "metadata.h"
0f5e83e5 56#include "lttng-live-internal.h"
7cdc2bab 57
7cdc2bab 58#define MAX_QUERY_SIZE (256*1024)
7cdc2bab 59
087bc060 60#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab
MD
61
62static const char *print_state(struct lttng_live_stream_iterator *s)
63{
64 switch (s->state) {
65 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
66 return "ACTIVE_NO_DATA";
67 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
68 return "QUIESCENT_NO_DATA";
69 case LTTNG_LIVE_STREAM_QUIESCENT:
70 return "QUIESCENT";
71 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
72 return "ACTIVE_DATA";
73 case LTTNG_LIVE_STREAM_EOF:
74 return "EOF";
75 default:
76 return "ERROR";
77 }
78}
7cdc2bab
MD
79
80#define print_stream_state(stream) \
81 print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
82 bt_port_get_name(bt_port_from_private_port(stream->port)), \
83 print_state(stream), stream->last_returned_inactivity_timestamp, \
84 stream->current_inactivity_timestamp)
85
7cdc2bab
MD
86BT_HIDDEN
87int lttng_live_add_port(struct lttng_live_component *lttng_live,
88 struct lttng_live_stream_iterator *stream_iter)
89{
90 int ret;
91 struct bt_private_port *private_port;
92 char name[STREAM_NAME_MAX_LEN];
93
94 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
95 assert(ret > 0);
96 strcpy(stream_iter->name, name);
147337a3
PP
97 ret = bt_private_component_source_add_output_private_port(
98 lttng_live->private_component, name, stream_iter,
99 &private_port);
100 if (ret) {
7cdc2bab
MD
101 return -1;
102 }
087bc060 103 BT_LOGI("Added port %s", name);
7cdc2bab
MD
104
105 if (lttng_live->no_stream_port) {
106 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
107 if (ret) {
108 return -1;
109 }
110 BT_PUT(lttng_live->no_stream_port);
111 lttng_live->no_stream_iter->port = NULL;
112 }
113 stream_iter->port = private_port;
114 return 0;
115}
116
117BT_HIDDEN
118int lttng_live_remove_port(struct lttng_live_component *lttng_live,
119 struct bt_private_port *port)
120{
121 struct bt_component *component;
122 int64_t nr_ports;
123 int ret;
124
125 component = bt_component_from_private_component(lttng_live->private_component);
126 nr_ports = bt_component_source_get_output_port_count(component);
127 if (nr_ports < 0) {
128 return -1;
129 }
130 BT_PUT(component);
131 if (nr_ports == 1) {
132 assert(!lttng_live->no_stream_port);
147337a3
PP
133 ret = bt_private_component_source_add_output_private_port(lttng_live->private_component,
134 "no-stream", lttng_live->no_stream_iter,
135 &lttng_live->no_stream_port);
136 if (ret) {
7cdc2bab
MD
137 return -1;
138 }
139 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
140 }
141 ret = bt_private_port_remove_from_component(port);
142 if (ret) {
143 return -1;
144 }
145 return 0;
146}
147
148static
149struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
150 uint64_t trace_id)
d3e4dcd8 151{
7cdc2bab
MD
152 struct lttng_live_trace *trace;
153
154 bt_list_for_each_entry(trace, &session->traces, node) {
155 if (trace->id == trace_id) {
156 return trace;
157 }
158 }
d3eb6e8f
PP
159 return NULL;
160}
161
7cdc2bab
MD
162static
163void lttng_live_destroy_trace(struct bt_object *obj)
164{
165 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
5bd230f4 166 int retval;
7cdc2bab 167
087bc060 168 BT_LOGI("Destroy trace");
7cdc2bab
MD
169 assert(bt_list_empty(&trace->streams));
170 bt_list_del(&trace->node);
5bd230f4
MD
171
172 retval = bt_ctf_trace_set_is_static(trace->trace);
173 assert(!retval);
174
7cdc2bab
MD
175 lttng_live_metadata_fini(trace);
176 BT_PUT(trace->cc_prio_map);
177 g_free(trace);
178}
179
180static
181struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
182 uint64_t trace_id)
183{
184 struct lttng_live_trace *trace = NULL;
185
186 trace = g_new0(struct lttng_live_trace, 1);
187 if (!trace) {
188 goto error;
189 }
190 trace->session = session;
191 trace->id = trace_id;
192 BT_INIT_LIST_HEAD(&trace->streams);
193 trace->new_metadata_needed = true;
194 bt_list_add(&trace->node, &session->traces);
195 bt_object_init(&trace->obj, lttng_live_destroy_trace);
087bc060 196 BT_LOGI("Create trace");
7cdc2bab
MD
197 goto end;
198error:
199 g_free(trace);
200 trace = NULL;
201end:
202 return trace;
203}
204
205BT_HIDDEN
206struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
207 uint64_t trace_id)
208{
209 struct lttng_live_trace *trace;
210
211 trace = lttng_live_find_trace(session, trace_id);
212 if (trace) {
213 bt_get(trace);
214 return trace;
215 }
216 return lttng_live_create_trace(session, trace_id);
217}
218
219BT_HIDDEN
220void lttng_live_unref_trace(struct lttng_live_trace *trace)
221{
222 bt_put(trace);
223}
224
225static
226void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
227{
228 struct lttng_live_stream_iterator *stream, *s;
229
230 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
231 lttng_live_stream_iterator_destroy(stream);
232 }
233 lttng_live_metadata_fini(trace);
234}
235
236BT_HIDDEN
06994c71
MD
237int lttng_live_add_session(struct lttng_live_component *lttng_live,
238 uint64_t session_id, const char *hostname,
239 const char *session_name)
7cdc2bab
MD
240{
241 int ret = 0;
242 struct lttng_live_session *s;
243
244 s = g_new0(struct lttng_live_session, 1);
245 if (!s) {
246 goto error;
247 }
248
249 s->id = session_id;
250 BT_INIT_LIST_HEAD(&s->traces);
251 s->lttng_live = lttng_live;
252 s->new_streams_needed = true;
06994c71
MD
253 s->hostname = g_string_new(hostname);
254 s->session_name = g_string_new(session_name);
7cdc2bab 255
06994c71
MD
256 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
257 s->id, hostname, session_name);
7cdc2bab
MD
258 bt_list_add(&s->node, &lttng_live->sessions);
259 goto end;
260error:
087bc060 261 BT_LOGE("Error adding session");
7cdc2bab
MD
262 g_free(s);
263 ret = -1;
264end:
265 return ret;
266}
267
268static
269void lttng_live_destroy_session(struct lttng_live_session *session)
270{
271 struct lttng_live_trace *trace, *t;
272
087bc060 273 BT_LOGI("Destroy session");
7cdc2bab
MD
274 if (session->id != -1ULL) {
275 if (lttng_live_detach_session(session)) {
4c66436f
MD
276 if (!bt_graph_is_canceled(session->lttng_live->graph)) {
277 /* Old relayd cannot detach sessions. */
278 BT_LOGD("Unable to detach session %" PRIu64,
279 session->id);
280 }
7cdc2bab
MD
281 }
282 session->id = -1ULL;
283 }
284 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
285 lttng_live_close_trace_streams(trace);
286 }
287 bt_list_del(&session->node);
06994c71
MD
288 if (session->hostname) {
289 g_string_free(session->hostname, TRUE);
290 }
291 if (session->session_name) {
292 g_string_free(session->session_name, TRUE);
293 }
7cdc2bab
MD
294 g_free(session);
295}
296
297BT_HIDDEN
298void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
299{
300 struct lttng_live_stream_iterator_generic *s =
301 bt_private_notification_iterator_get_user_data(it);
302
303 switch (s->type) {
304 case LIVE_STREAM_TYPE_NO_STREAM:
305 {
306 /* Leave no_stream_iter in place when port is removed. */
307 break;
308 }
309 case LIVE_STREAM_TYPE_STREAM:
310 {
311 struct lttng_live_stream_iterator *stream_iter =
312 container_of(s, struct lttng_live_stream_iterator, p);
313
314 lttng_live_stream_iterator_destroy(stream_iter);
315 break;
316 }
317 }
318}
319
320static
321enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
322 struct lttng_live_component *lttng_live,
323 struct lttng_live_stream_iterator *lttng_live_stream)
324{
325 switch (lttng_live_stream->state) {
326 case LTTNG_LIVE_STREAM_QUIESCENT:
327 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
328 break;
329 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
330 /* Invalid state. */
087bc060
MD
331 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
332 abort();
7cdc2bab
MD
333 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
334 /* Invalid state. */
087bc060
MD
335 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
336 abort();
7cdc2bab
MD
337 case LTTNG_LIVE_STREAM_EOF:
338 break;
339 }
340 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
341}
342
343/*
344 * For active no data stream, fetch next data. It can be either:
345 * - quiescent: need to put it in the prio heap at quiescent end
346 * timestamp,
347 * - have data: need to wire up first event into the prio heap,
348 * - have no data on this stream at this point: need to retry (AGAIN) or
349 * return EOF.
350 */
351static
352enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
353 struct lttng_live_component *lttng_live,
354 struct lttng_live_stream_iterator *lttng_live_stream)
355{
356 enum bt_ctf_lttng_live_iterator_status ret =
357 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
358 struct packet_index index;
359 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
360
361 if (lttng_live_stream->trace->new_metadata_needed) {
362 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
363 goto end;
364 }
365 if (lttng_live_stream->trace->session->new_streams_needed) {
366 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
367 goto end;
368 }
369 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
370 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
371 goto end;
372 }
373 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
374 if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
375 goto end;
376 }
377 assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
378 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
379 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
380 && lttng_live_stream->last_returned_inactivity_timestamp ==
381 lttng_live_stream->current_inactivity_timestamp) {
382 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
383 print_stream_state(lttng_live_stream);
384 } else {
385 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
386 }
387 goto end;
388 }
389 lttng_live_stream->base_offset = index.offset;
390 lttng_live_stream->offset = index.offset;
391 lttng_live_stream->len = index.packet_size / CHAR_BIT;
392end:
393 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
394 ret = lttng_live_iterator_next_check_stream_state(
395 lttng_live, lttng_live_stream);
396 }
397 return ret;
398}
399
400/*
401 * Creation of the notification requires the ctf trace to be created
402 * beforehand, but the live protocol gives us all streams (including
403 * metadata) at once. So we split it in three steps: getting streams,
404 * getting metadata (which creates the ctf trace), and then creating the
405 * per-stream notifications.
406 */
407static
408enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
409 struct lttng_live_component *lttng_live,
410 struct lttng_live_session *session)
411{
412 enum bt_ctf_lttng_live_iterator_status status;
413 struct lttng_live_trace *trace, *t;
414
415 if (lttng_live_attach_session(session)) {
4c66436f
MD
416 if (bt_graph_is_canceled(lttng_live->graph)) {
417 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
418 } else {
419 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
420 }
7cdc2bab
MD
421 }
422 status = lttng_live_get_new_streams(session);
423 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
424 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
425 return status;
426 }
427 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
428 status = lttng_live_metadata_update(trace);
5bd230f4
MD
429 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
430 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
7cdc2bab
MD
431 return status;
432 }
433 }
434 return lttng_live_lazy_notif_init(session);
435}
436
437BT_HIDDEN
438void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
439{
440 struct lttng_live_session *session;
441
442 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
443 session->new_streams_needed = true;
444 }
445}
446
447static
448void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
449{
450 struct lttng_live_session *session;
451
452 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
453 struct lttng_live_trace *trace;
454
455 session->new_streams_needed = true;
456 bt_list_for_each_entry(trace, &session->traces, node) {
457 trace->new_metadata_needed = true;
458 }
459 }
460}
461
462static
3cdf4234 463enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
7cdc2bab
MD
464 struct lttng_live_component *lttng_live)
465{
466 enum bt_ctf_lttng_live_iterator_status ret =
467 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
468 unsigned int nr_sessions_opened = 0;
469 struct lttng_live_session *session, *s;
470
471 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
472 if (session->closed && bt_list_empty(&session->traces)) {
473 lttng_live_destroy_session(session);
474 }
475 }
476 /*
477 * Currently, when there are no sessions, we quit immediately.
478 * We may want to add a component parameter to keep trying until
479 * we get data in the future.
480 * Also, in a remotely distant future, we could add a "new
481 * session" flag to the protocol, which would tell us that we
482 * need to query for new sessions even though we have sessions
483 * currently ongoing.
484 */
485 if (bt_list_empty(&lttng_live->sessions)) {
486 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
487 goto end;
488 }
489 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
490 ret = lttng_live_get_session(lttng_live, session);
491 switch (ret) {
492 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
493 break;
494 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
495 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
496 break;
497 default:
498 goto end;
499 }
500 if (!session->closed) {
501 nr_sessions_opened++;
502 }
503 }
504end:
505 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
506 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
507 }
508 return ret;
509}
510
511static
512enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
513 struct lttng_live_component *lttng_live,
514 struct lttng_live_stream_iterator *lttng_live_stream,
515 struct bt_notification **notification,
516 uint64_t timestamp)
517{
518 enum bt_ctf_lttng_live_iterator_status ret =
519 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
520 struct lttng_live_trace *trace;
521 struct bt_ctf_clock_class *clock_class = NULL;
522 struct bt_ctf_clock_value *clock_value = NULL;
523 struct bt_notification *notif = NULL;
524 int retval;
525
526 trace = lttng_live_stream->trace;
527 if (!trace) {
528 goto error;
529 }
530 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
531 if (!clock_class) {
532 goto error;
533 }
534 clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
535 if (!clock_value) {
536 goto error;
537 }
538 notif = bt_notification_inactivity_create(trace->cc_prio_map);
539 if (!notif) {
540 goto error;
541 }
542 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
543 if (retval) {
544 goto error;
545 }
546 *notification = notif;
547end:
548 bt_put(clock_value);
549 bt_put(clock_class);
550 return ret;
551
552error:
553 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
554 bt_put(notif);
555 goto end;
556}
557
558static
559enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
560 struct lttng_live_component *lttng_live,
561 struct lttng_live_stream_iterator *lttng_live_stream,
562 struct bt_notification **notification)
563{
564 enum bt_ctf_lttng_live_iterator_status ret =
565 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
566 struct bt_ctf_clock_class *clock_class = NULL;
567 struct bt_ctf_clock_value *clock_value = NULL;
568
569 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
570 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
571 }
572
573 if (lttng_live_stream->current_inactivity_timestamp ==
574 lttng_live_stream->last_returned_inactivity_timestamp) {
575 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
576 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
577 goto end;
578 }
579
580 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
581 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
582
583 lttng_live_stream->last_returned_inactivity_timestamp =
584 lttng_live_stream->current_inactivity_timestamp;
585end:
586 bt_put(clock_value);
587 bt_put(clock_class);
588 return ret;
589}
590
591static
592enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
593 struct lttng_live_component *lttng_live,
594 struct lttng_live_stream_iterator *lttng_live_stream,
595 struct bt_notification **notification)
596{
597 enum bt_ctf_lttng_live_iterator_status ret =
598 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
599 enum bt_ctf_notif_iter_status status;
600 struct lttng_live_session *session;
601
602 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
603 struct lttng_live_trace *trace;
604
605 if (session->new_streams_needed) {
606 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
607 }
608 bt_list_for_each_entry(trace, &session->traces, node) {
609 if (trace->new_metadata_needed) {
610 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
611 }
612 }
613 }
614
615 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
616 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
617 }
618 if (lttng_live_stream->packet_end_notif_queue) {
619 *notification = lttng_live_stream->packet_end_notif_queue;
620 lttng_live_stream->packet_end_notif_queue = NULL;
621 status = BT_CTF_NOTIF_ITER_STATUS_OK;
622 } else {
623 status = bt_ctf_notif_iter_get_next_notification(
624 lttng_live_stream->notif_iter,
625 lttng_live_stream->trace->cc_prio_map,
626 notification);
627 if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
628 /*
629 * Consider empty packets as inactivity.
630 */
631 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
632 lttng_live_stream->packet_end_notif_queue = *notification;
633 *notification = NULL;
634 return emit_inactivity_notification(lttng_live,
635 lttng_live_stream, notification,
636 lttng_live_stream->current_packet_end_timestamp);
637 }
638 }
639 }
640 switch (status) {
641 case BT_CTF_NOTIF_ITER_STATUS_EOF:
642 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
643 break;
644 case BT_CTF_NOTIF_ITER_STATUS_OK:
645 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
646 break;
647 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
648 /*
649 * Continue immediately (end of packet). The next
650 * get_index may return AGAIN to delay the following
651 * attempt.
652 */
653 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
654 break;
655 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
656 /* No argument provided by the user, so don't return INVAL. */
657 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
658 default:
659 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
660 break;
661 }
662 return ret;
663}
664
665/*
666 * helper function:
667 * handle_no_data_streams()
668 * retry:
669 * - for each ACTIVE_NO_DATA stream:
670 * - query relayd for stream data, or quiescence info.
671 * - if need metadata, get metadata, goto retry.
672 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
673 * - if quiescent, move to QUIESCENT streams
674 * - if fetched data, move to ACTIVE_DATA streams
675 * (at this point each stream either has data, or is quiescent)
676 *
677 *
678 * iterator_next:
679 * handle_new_streams_and_metadata()
680 * - query relayd for known streams, add them as ACTIVE_NO_DATA
681 * - query relayd for metadata
682 *
683 * call handle_active_no_data_streams()
684 *
685 * handle_quiescent_streams()
686 * - if at least one stream is ACTIVE_DATA:
687 * - peek stream event with lowest timestamp -> next_ts
688 * - for each quiescent stream
689 * - if next_ts >= quiescent end
690 * - set state to ACTIVE_NO_DATA
691 * - else
692 * - for each quiescent stream
693 * - set state to ACTIVE_NO_DATA
694 *
695 * call handle_active_no_data_streams()
696 *
697 * handle_active_data_streams()
698 * - if at least one stream is ACTIVE_DATA:
699 * - get stream event with lowest timestamp from heap
700 * - make that stream event the current notification.
701 * - move this stream heap position to its next event
702 * - if we need to fetch data from relayd, move
703 * stream to ACTIVE_NO_DATA.
704 * - return OK
705 * - return AGAIN
706 *
707 * end criterion: ctrl-c on client. If relayd exits or the session
708 * closes on the relay daemon side, we keep on waiting for streams.
709 * Eventually handle --end timestamp (also an end criterion).
710 *
711 * When disconnected from relayd: try to re-connect endlessly.
712 */
713static
714struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
715 struct bt_private_notification_iterator *iterator,
716 struct lttng_live_stream_iterator *stream_iter)
717{
718 enum bt_ctf_lttng_live_iterator_status status;
719 struct bt_notification_iterator_next_return next_return;
720 struct lttng_live_component *lttng_live;
721
722 lttng_live = stream_iter->trace->session->lttng_live;
723retry:
724 print_stream_state(stream_iter);
725 next_return.notification = NULL;
726 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
727 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
728 goto end;
729 }
730 status = lttng_live_iterator_next_handle_one_no_data_stream(
731 lttng_live, stream_iter);
732 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
733 goto end;
734 }
735 status = lttng_live_iterator_next_handle_one_quiescent_stream(
736 lttng_live, stream_iter, &next_return.notification);
737 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
738 assert(next_return.notification == NULL);
739 goto end;
740 }
741 if (next_return.notification) {
742 goto end;
743 }
744 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
745 stream_iter, &next_return.notification);
746 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
747 assert(next_return.notification == NULL);
748 }
749
750end:
751 switch (status) {
752 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
753 print_dbg("continue");
754 goto retry;
755 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
756 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
757 print_dbg("again");
758 break;
759 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
760 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
761 print_dbg("end");
762 break;
763 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
764 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
765 print_dbg("ok");
766 break;
767 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
768 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
769 break;
770 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
771 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
772 break;
773 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
774 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
775 break;
776 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
777 default: /* fall-through */
778 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
779 break;
780 }
781 return next_return;
782}
783
784static
785struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
786 struct bt_private_notification_iterator *iterator,
787 struct lttng_live_no_stream_iterator *no_stream_iter)
788{
789 enum bt_ctf_lttng_live_iterator_status status;
790 struct bt_notification_iterator_next_return next_return;
791 struct lttng_live_component *lttng_live;
792
793 lttng_live = no_stream_iter->lttng_live;
794retry:
795 lttng_live_force_new_streams_and_metadata(lttng_live);
796 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
797 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
798 goto end;
799 }
800 if (no_stream_iter->port) {
801 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
802 } else {
803 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
804 }
805end:
806 switch (status) {
807 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
808 goto retry;
809 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
810 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
811 break;
812 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
813 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
814 break;
815 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
816 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
817 break;
818 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
819 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
820 break;
821 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
822 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
823 break;
824 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
825 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
826 break;
827 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
828 default: /* fall-through */
829 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
830 break;
831 }
832 return next_return;
833}
834
d3eb6e8f 835BT_HIDDEN
41a2b7ae 836struct bt_notification_iterator_next_return lttng_live_iterator_next(
890882ef 837 struct bt_private_notification_iterator *iterator)
d3eb6e8f 838{
7cdc2bab
MD
839 struct lttng_live_stream_iterator_generic *s =
840 bt_private_notification_iterator_get_user_data(iterator);
841 struct bt_notification_iterator_next_return next_return;
842
843 switch (s->type) {
844 case LIVE_STREAM_TYPE_NO_STREAM:
845 next_return = lttng_live_iterator_next_no_stream(iterator,
846 container_of(s, struct lttng_live_no_stream_iterator, p));
847 break;
848 case LIVE_STREAM_TYPE_STREAM:
849 next_return = lttng_live_iterator_next_stream(iterator,
850 container_of(s, struct lttng_live_stream_iterator, p));
851 break;
852 default:
853 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
854 break;
855 }
856 return next_return;
857}
41a2b7ae 858
7cdc2bab
MD
859BT_HIDDEN
860enum bt_notification_iterator_status lttng_live_iterator_init(
861 struct bt_private_notification_iterator *it,
862 struct bt_private_port *port)
863{
864 enum bt_notification_iterator_status ret =
865 BT_NOTIFICATION_ITERATOR_STATUS_OK;
866 struct lttng_live_stream_iterator_generic *s;
7cdc2bab
MD
867
868 assert(it);
869
870 s = bt_private_port_get_user_data(port);
871 assert(s);
872 switch (s->type) {
873 case LIVE_STREAM_TYPE_NO_STREAM:
874 {
875 struct lttng_live_no_stream_iterator *no_stream_iter =
876 container_of(s, struct lttng_live_no_stream_iterator, p);
7cdc2bab
MD
877 ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
878 if (ret) {
879 goto error;
880 }
881 break;
882 }
883 case LIVE_STREAM_TYPE_STREAM:
884 {
885 struct lttng_live_stream_iterator *stream_iter =
886 container_of(s, struct lttng_live_stream_iterator, p);
7cdc2bab
MD
887 ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
888 if (ret) {
889 goto error;
890 }
891 break;
892 }
893 default:
894 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
895 goto end;
896 }
897
898end:
41a2b7ae 899 return ret;
7cdc2bab
MD
900error:
901 if (bt_private_notification_iterator_set_user_data(it, NULL)
902 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
087bc060 903 BT_LOGE("Error setting private data to NULL");
7cdc2bab
MD
904 }
905 goto end;
906}
907
908static
909struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
910 struct bt_value *params)
911{
912 struct bt_value *url_value = NULL;
913 struct bt_value *results = NULL;
914 const char *url;
915 struct bt_live_viewer_connection *viewer_connection = NULL;
7cdc2bab
MD
916
917 url_value = bt_value_map_get(params, "url");
918 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
087bc060 919 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
920 goto error;
921 }
922
3cdf4234 923 if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
087bc060 924 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
925 goto error;
926 }
927
4c66436f 928 viewer_connection = bt_live_viewer_connection_create(url, NULL);
7cdc2bab 929 if (!viewer_connection) {
7cdc2bab
MD
930 goto error;
931 }
932
933 results = bt_live_viewer_connection_list_sessions(viewer_connection);
934 goto end;
935error:
936 BT_PUT(results);
937end:
938 if (viewer_connection) {
939 bt_live_viewer_connection_destroy(viewer_connection);
940 }
941 BT_PUT(url_value);
942 return results;
943}
944
945BT_HIDDEN
946struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
947 const char *object, struct bt_value *params)
948{
949 if (strcmp(object, "sessions") == 0) {
950 return lttng_live_query_list_sessions(comp_class,
951 params);
952 }
087bc060 953 BT_LOGW("Unknown query object `%s`", object);
7cdc2bab
MD
954 return NULL;
955}
956
957static
958void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
959{
960 int ret;
961 struct lttng_live_session *session, *s;
962
963 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
964 lttng_live_destroy_session(session);
965 }
966 BT_PUT(lttng_live->viewer_connection);
967 if (lttng_live->url) {
968 g_string_free(lttng_live->url, TRUE);
969 }
970 if (lttng_live->no_stream_port) {
971 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
972 assert(!ret);
973 BT_PUT(lttng_live->no_stream_port);
974 }
975 if (lttng_live->no_stream_iter) {
976 g_free(lttng_live->no_stream_iter);
977 }
978 g_free(lttng_live);
979}
980
981BT_HIDDEN
982void lttng_live_component_finalize(struct bt_private_component *component)
983{
984 void *data = bt_private_component_get_user_data(component);
985
986 if (!data) {
987 return;
988 }
989 lttng_live_component_destroy_data(data);
990}
991
992static
993struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
3cdf4234
MD
994 struct bt_private_component *private_component,
995 struct bt_graph *graph)
7cdc2bab
MD
996{
997 struct lttng_live_component *lttng_live;
998 struct bt_value *value = NULL;
999 const char *url;
1000 enum bt_value_status ret;
1001
1002 lttng_live = g_new0(struct lttng_live_component, 1);
1003 if (!lttng_live) {
1004 goto end;
1005 }
7cdc2bab
MD
1006 /* TODO: make this an overridable parameter. */
1007 lttng_live->max_query_size = MAX_QUERY_SIZE;
1008 BT_INIT_LIST_HEAD(&lttng_live->sessions);
1009 value = bt_value_map_get(params, "url");
1010 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
087bc060 1011 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
1012 goto error;
1013 }
1014 ret = bt_value_string_get(value, &url);
1015 if (ret != BT_VALUE_STATUS_OK) {
087bc060 1016 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
1017 goto error;
1018 }
1019 lttng_live->url = g_string_new(url);
1020 if (!lttng_live->url) {
1021 goto error;
1022 }
1023 lttng_live->viewer_connection =
4c66436f 1024 bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
7cdc2bab 1025 if (!lttng_live->viewer_connection) {
7cdc2bab
MD
1026 goto error;
1027 }
1028 if (lttng_live_create_viewer_session(lttng_live)) {
7cdc2bab
MD
1029 goto error;
1030 }
1031 lttng_live->private_component = private_component;
3cdf4234 1032 lttng_live->graph = graph;
4c66436f 1033
7cdc2bab
MD
1034 goto end;
1035
1036error:
1037 lttng_live_component_destroy_data(lttng_live);
1038 lttng_live = NULL;
1039end:
1040 return lttng_live;
d3e4dcd8
PP
1041}
1042
f3bc2010 1043BT_HIDDEN
3cdf4234
MD
1044enum bt_component_status lttng_live_component_init(
1045 struct bt_private_component *private_component,
7cdc2bab 1046 struct bt_value *params, void *init_method_data)
f3bc2010 1047{
7cdc2bab
MD
1048 struct lttng_live_component *lttng_live;
1049 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
3cdf4234
MD
1050 struct bt_component *component;
1051 struct bt_graph *graph;
1052
1053 component = bt_component_from_private_component(private_component);
1054 graph = bt_component_get_graph(component);
1055 bt_put(graph); /* weak */
1056 bt_put(component);
7cdc2bab 1057
7cdc2bab 1058 /* Passes ownership of iter ref to lttng_live_component_create. */
3cdf4234
MD
1059 lttng_live = lttng_live_component_create(params, private_component,
1060 graph);
7cdc2bab 1061 if (!lttng_live) {
3cdf4234
MD
1062 if (bt_graph_is_canceled(graph)) {
1063 ret = BT_COMPONENT_STATUS_AGAIN;
1064 } else {
1065 ret = BT_COMPONENT_STATUS_NOMEM;
1066 }
7cdc2bab
MD
1067 goto end;
1068 }
1069
1070 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1071 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1072 lttng_live->no_stream_iter->lttng_live = lttng_live;
147337a3 1073 ret = bt_private_component_source_add_output_private_port(
7cdc2bab 1074 lttng_live->private_component, "no-stream",
147337a3
PP
1075 lttng_live->no_stream_iter,
1076 &lttng_live->no_stream_port);
1077 if (ret != BT_COMPONENT_STATUS_OK) {
1078 goto end;
1079 }
1080
7cdc2bab
MD
1081 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1082
3cdf4234 1083 ret = bt_private_component_set_user_data(private_component, lttng_live);
7cdc2bab
MD
1084 if (ret != BT_COMPONENT_STATUS_OK) {
1085 goto error;
1086 }
1087
1088end:
1089 return ret;
1090error:
3cdf4234 1091 (void) bt_private_component_set_user_data(private_component, NULL);
7cdc2bab
MD
1092 lttng_live_component_destroy_data(lttng_live);
1093 return ret;
f3bc2010 1094}
087bc060 1095
d85ef162
MD
1096BT_HIDDEN
1097enum bt_component_status lttng_live_accept_port_connection(
1098 struct bt_private_component *private_component,
1099 struct bt_private_port *self_private_port,
1100 struct bt_port *other_port)
1101{
1102 struct lttng_live_component *lttng_live =
1103 bt_private_component_get_user_data(private_component);
1104 struct bt_component *other_component;
1105 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
1106 struct bt_port *self_port = bt_port_from_private_port(self_private_port);
1107
1108 other_component = bt_port_get_component(other_port);
1109 bt_put(other_component); /* weak */
1110
1111 if (!lttng_live->downstream_component) {
1112 lttng_live->downstream_component = other_component;
1113 goto end;
1114 }
1115
1116 /*
1117 * Compare prior component to ensure we are connected to the
1118 * same downstream component as prior ports.
1119 */
1120 if (lttng_live->downstream_component != other_component) {
1121 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1122 bt_port_get_name(self_port),
1123 bt_component_get_name(other_component),
1124 bt_component_get_name(lttng_live->downstream_component));
1125 status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
1126 goto end;
1127 }
1128end:
1129 bt_put(self_port);
1130 return status;
1131}
This page took 0.076133 seconds and 4 git commands to generate.