plugins/ctf/common/btr: use standard logging files and macros
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
7cdc2bab 30#include <babeltrace/ctf-ir/packet.h>
b2e0c907 31#include <babeltrace/graph/component-source.h>
7cdc2bab
MD
32#include <babeltrace/graph/private-port.h>
33#include <babeltrace/graph/port.h>
34#include <babeltrace/graph/private-component.h>
35#include <babeltrace/graph/private-component-source.h>
36#include <babeltrace/graph/private-notification-iterator.h>
37#include <babeltrace/graph/notification-stream.h>
38#include <babeltrace/graph/notification-packet.h>
39#include <babeltrace/graph/notification-event.h>
40#include <babeltrace/graph/notification-heap.h>
41#include <babeltrace/graph/notification-iterator.h>
42#include <babeltrace/graph/notification-inactivity.h>
4c66436f 43#include <babeltrace/graph/graph.h>
7cdc2bab
MD
44#include <babeltrace/compiler-internal.h>
45#include <inttypes.h>
46#include <glib.h>
47#include <assert.h>
48#include <unistd.h>
7d61fa8e 49#include <plugins-common.h>
f3bc2010 50
087bc060 51#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE"
047358f9 52#define BT_LOGLEVEL_NAME "BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_LOG_LEVEL"
087bc060 53
7cdc2bab
MD
54#include "data-stream.h"
55#include "metadata.h"
0f5e83e5 56#include "lttng-live-internal.h"
7cdc2bab 57
7cdc2bab 58#define MAX_QUERY_SIZE (256*1024)
7cdc2bab 59
087bc060 60#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab
MD
61
62static const char *print_state(struct lttng_live_stream_iterator *s)
63{
64 switch (s->state) {
65 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
66 return "ACTIVE_NO_DATA";
67 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
68 return "QUIESCENT_NO_DATA";
69 case LTTNG_LIVE_STREAM_QUIESCENT:
70 return "QUIESCENT";
71 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
72 return "ACTIVE_DATA";
73 case LTTNG_LIVE_STREAM_EOF:
74 return "EOF";
75 default:
76 return "ERROR";
77 }
78}
7cdc2bab
MD
79
/*
 * Log, at debug level, the port name, state and inactivity timestamps of
 * a stream iterator.
 *
 * The port obtained from bt_port_from_private_port() is released once the
 * message has been emitted, in the same way this file releases the
 * component it gets from bt_component_from_private_component().
 * Expands to a single statement; `stream` is evaluated more than once.
 */
#define print_stream_state(stream) \
	do { \
		struct bt_port *pss_port = \
			bt_port_from_private_port((stream)->port); \
		\
		print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
			bt_port_get_name(pss_port), \
			print_state(stream), \
			(stream)->last_returned_inactivity_timestamp, \
			(stream)->current_inactivity_timestamp); \
		bt_put(pss_port); \
	} while (0)
85
d3e4dcd8 86BT_HIDDEN
087bc060 87int bt_lttng_live_log_level = BT_LOG_NONE;
7cdc2bab
MD
88
89BT_HIDDEN
90int lttng_live_add_port(struct lttng_live_component *lttng_live,
91 struct lttng_live_stream_iterator *stream_iter)
92{
93 int ret;
94 struct bt_private_port *private_port;
95 char name[STREAM_NAME_MAX_LEN];
96
97 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
98 assert(ret > 0);
99 strcpy(stream_iter->name, name);
100 private_port = bt_private_component_source_add_output_private_port(
101 lttng_live->private_component, name, stream_iter);
102 if (!private_port) {
103 return -1;
104 }
087bc060 105 BT_LOGI("Added port %s", name);
7cdc2bab
MD
106
107 if (lttng_live->no_stream_port) {
108 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
109 if (ret) {
110 return -1;
111 }
112 BT_PUT(lttng_live->no_stream_port);
113 lttng_live->no_stream_iter->port = NULL;
114 }
115 stream_iter->port = private_port;
116 return 0;
117}
118
119BT_HIDDEN
120int lttng_live_remove_port(struct lttng_live_component *lttng_live,
121 struct bt_private_port *port)
122{
123 struct bt_component *component;
124 int64_t nr_ports;
125 int ret;
126
127 component = bt_component_from_private_component(lttng_live->private_component);
128 nr_ports = bt_component_source_get_output_port_count(component);
129 if (nr_ports < 0) {
130 return -1;
131 }
132 BT_PUT(component);
133 if (nr_ports == 1) {
134 assert(!lttng_live->no_stream_port);
135 lttng_live->no_stream_port =
136 bt_private_component_source_add_output_private_port(lttng_live->private_component,
137 "no-stream", lttng_live->no_stream_iter);
138 if (!lttng_live->no_stream_port) {
139 return -1;
140 }
141 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
142 }
143 ret = bt_private_port_remove_from_component(port);
144 if (ret) {
145 return -1;
146 }
147 return 0;
148}
149
150static
151struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
152 uint64_t trace_id)
d3e4dcd8 153{
7cdc2bab
MD
154 struct lttng_live_trace *trace;
155
156 bt_list_for_each_entry(trace, &session->traces, node) {
157 if (trace->id == trace_id) {
158 return trace;
159 }
160 }
d3eb6e8f
PP
161 return NULL;
162}
163
7cdc2bab
MD
164static
165void lttng_live_destroy_trace(struct bt_object *obj)
166{
167 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
5bd230f4 168 int retval;
7cdc2bab 169
087bc060 170 BT_LOGI("Destroy trace");
7cdc2bab
MD
171 assert(bt_list_empty(&trace->streams));
172 bt_list_del(&trace->node);
5bd230f4
MD
173
174 retval = bt_ctf_trace_set_is_static(trace->trace);
175 assert(!retval);
176
7cdc2bab
MD
177 lttng_live_metadata_fini(trace);
178 BT_PUT(trace->cc_prio_map);
179 g_free(trace);
180}
181
182static
183struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
184 uint64_t trace_id)
185{
186 struct lttng_live_trace *trace = NULL;
187
188 trace = g_new0(struct lttng_live_trace, 1);
189 if (!trace) {
190 goto error;
191 }
192 trace->session = session;
193 trace->id = trace_id;
194 BT_INIT_LIST_HEAD(&trace->streams);
195 trace->new_metadata_needed = true;
196 bt_list_add(&trace->node, &session->traces);
197 bt_object_init(&trace->obj, lttng_live_destroy_trace);
087bc060 198 BT_LOGI("Create trace");
7cdc2bab
MD
199 goto end;
200error:
201 g_free(trace);
202 trace = NULL;
203end:
204 return trace;
205}
206
207BT_HIDDEN
208struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
209 uint64_t trace_id)
210{
211 struct lttng_live_trace *trace;
212
213 trace = lttng_live_find_trace(session, trace_id);
214 if (trace) {
215 bt_get(trace);
216 return trace;
217 }
218 return lttng_live_create_trace(session, trace_id);
219}
220
221BT_HIDDEN
222void lttng_live_unref_trace(struct lttng_live_trace *trace)
223{
224 bt_put(trace);
225}
226
227static
228void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
229{
230 struct lttng_live_stream_iterator *stream, *s;
231
232 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
233 lttng_live_stream_iterator_destroy(stream);
234 }
235 lttng_live_metadata_fini(trace);
236}
237
238BT_HIDDEN
239int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id)
240{
241 int ret = 0;
242 struct lttng_live_session *s;
243
244 s = g_new0(struct lttng_live_session, 1);
245 if (!s) {
246 goto error;
247 }
248
249 s->id = session_id;
250 BT_INIT_LIST_HEAD(&s->traces);
251 s->lttng_live = lttng_live;
252 s->new_streams_needed = true;
253
087bc060 254 BT_LOGI("Reading from session %" PRIu64, s->id);
7cdc2bab
MD
255 bt_list_add(&s->node, &lttng_live->sessions);
256 goto end;
257error:
087bc060 258 BT_LOGE("Error adding session");
7cdc2bab
MD
259 g_free(s);
260 ret = -1;
261end:
262 return ret;
263}
264
265static
266void lttng_live_destroy_session(struct lttng_live_session *session)
267{
268 struct lttng_live_trace *trace, *t;
269
087bc060 270 BT_LOGI("Destroy session");
7cdc2bab
MD
271 if (session->id != -1ULL) {
272 if (lttng_live_detach_session(session)) {
4c66436f
MD
273 if (!bt_graph_is_canceled(session->lttng_live->graph)) {
274 /* Old relayd cannot detach sessions. */
275 BT_LOGD("Unable to detach session %" PRIu64,
276 session->id);
277 }
7cdc2bab
MD
278 }
279 session->id = -1ULL;
280 }
281 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
282 lttng_live_close_trace_streams(trace);
283 }
284 bt_list_del(&session->node);
285 g_free(session);
286}
287
288BT_HIDDEN
289void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
290{
291 struct lttng_live_stream_iterator_generic *s =
292 bt_private_notification_iterator_get_user_data(it);
293
294 switch (s->type) {
295 case LIVE_STREAM_TYPE_NO_STREAM:
296 {
297 /* Leave no_stream_iter in place when port is removed. */
298 break;
299 }
300 case LIVE_STREAM_TYPE_STREAM:
301 {
302 struct lttng_live_stream_iterator *stream_iter =
303 container_of(s, struct lttng_live_stream_iterator, p);
304
305 lttng_live_stream_iterator_destroy(stream_iter);
306 break;
307 }
308 }
309}
310
311static
312enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
313 struct lttng_live_component *lttng_live,
314 struct lttng_live_stream_iterator *lttng_live_stream)
315{
316 switch (lttng_live_stream->state) {
317 case LTTNG_LIVE_STREAM_QUIESCENT:
318 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
319 break;
320 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
321 /* Invalid state. */
087bc060
MD
322 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
323 abort();
7cdc2bab
MD
324 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
325 /* Invalid state. */
087bc060
MD
326 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
327 abort();
7cdc2bab
MD
328 case LTTNG_LIVE_STREAM_EOF:
329 break;
330 }
331 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
332}
333
334/*
335 * For active no data stream, fetch next data. It can be either:
336 * - quiescent: need to put it in the prio heap at quiescent end
337 * timestamp,
338 * - have data: need to wire up first event into the prio heap,
339 * - have no data on this stream at this point: need to retry (AGAIN) or
340 * return EOF.
341 */
342static
343enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
344 struct lttng_live_component *lttng_live,
345 struct lttng_live_stream_iterator *lttng_live_stream)
346{
347 enum bt_ctf_lttng_live_iterator_status ret =
348 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
349 struct packet_index index;
350 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
351
352 if (lttng_live_stream->trace->new_metadata_needed) {
353 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
354 goto end;
355 }
356 if (lttng_live_stream->trace->session->new_streams_needed) {
357 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
358 goto end;
359 }
360 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
361 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
362 goto end;
363 }
364 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
365 if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
366 goto end;
367 }
368 assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
369 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
370 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
371 && lttng_live_stream->last_returned_inactivity_timestamp ==
372 lttng_live_stream->current_inactivity_timestamp) {
373 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
374 print_stream_state(lttng_live_stream);
375 } else {
376 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
377 }
378 goto end;
379 }
380 lttng_live_stream->base_offset = index.offset;
381 lttng_live_stream->offset = index.offset;
382 lttng_live_stream->len = index.packet_size / CHAR_BIT;
383end:
384 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
385 ret = lttng_live_iterator_next_check_stream_state(
386 lttng_live, lttng_live_stream);
387 }
388 return ret;
389}
390
391/*
392 * Creation of the notification requires the ctf trace to be created
393 * beforehand, but the live protocol gives us all streams (including
394 * metadata) at once. So we split it in three steps: getting streams,
395 * getting metadata (which creates the ctf trace), and then creating the
396 * per-stream notifications.
397 */
398static
399enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
400 struct lttng_live_component *lttng_live,
401 struct lttng_live_session *session)
402{
403 enum bt_ctf_lttng_live_iterator_status status;
404 struct lttng_live_trace *trace, *t;
405
406 if (lttng_live_attach_session(session)) {
4c66436f
MD
407 if (bt_graph_is_canceled(lttng_live->graph)) {
408 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
409 } else {
410 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
411 }
7cdc2bab
MD
412 }
413 status = lttng_live_get_new_streams(session);
414 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
415 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
416 return status;
417 }
418 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
419 status = lttng_live_metadata_update(trace);
5bd230f4
MD
420 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
421 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
7cdc2bab
MD
422 return status;
423 }
424 }
425 return lttng_live_lazy_notif_init(session);
426}
427
428BT_HIDDEN
429void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
430{
431 struct lttng_live_session *session;
432
433 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
434 session->new_streams_needed = true;
435 }
436}
437
438static
439void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
440{
441 struct lttng_live_session *session;
442
443 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
444 struct lttng_live_trace *trace;
445
446 session->new_streams_needed = true;
447 bt_list_for_each_entry(trace, &session->traces, node) {
448 trace->new_metadata_needed = true;
449 }
450 }
451}
452
453static
3cdf4234 454enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
7cdc2bab
MD
455 struct lttng_live_component *lttng_live)
456{
457 enum bt_ctf_lttng_live_iterator_status ret =
458 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
459 unsigned int nr_sessions_opened = 0;
460 struct lttng_live_session *session, *s;
461
462 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
463 if (session->closed && bt_list_empty(&session->traces)) {
464 lttng_live_destroy_session(session);
465 }
466 }
467 /*
468 * Currently, when there are no sessions, we quit immediately.
469 * We may want to add a component parameter to keep trying until
470 * we get data in the future.
471 * Also, in a remotely distant future, we could add a "new
472 * session" flag to the protocol, which would tell us that we
473 * need to query for new sessions even though we have sessions
474 * currently ongoing.
475 */
476 if (bt_list_empty(&lttng_live->sessions)) {
477 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
478 goto end;
479 }
480 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
481 ret = lttng_live_get_session(lttng_live, session);
482 switch (ret) {
483 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
484 break;
485 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
486 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
487 break;
488 default:
489 goto end;
490 }
491 if (!session->closed) {
492 nr_sessions_opened++;
493 }
494 }
495end:
496 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
497 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
498 }
499 return ret;
500}
501
502static
503enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
504 struct lttng_live_component *lttng_live,
505 struct lttng_live_stream_iterator *lttng_live_stream,
506 struct bt_notification **notification,
507 uint64_t timestamp)
508{
509 enum bt_ctf_lttng_live_iterator_status ret =
510 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
511 struct lttng_live_trace *trace;
512 struct bt_ctf_clock_class *clock_class = NULL;
513 struct bt_ctf_clock_value *clock_value = NULL;
514 struct bt_notification *notif = NULL;
515 int retval;
516
517 trace = lttng_live_stream->trace;
518 if (!trace) {
519 goto error;
520 }
521 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
522 if (!clock_class) {
523 goto error;
524 }
525 clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
526 if (!clock_value) {
527 goto error;
528 }
529 notif = bt_notification_inactivity_create(trace->cc_prio_map);
530 if (!notif) {
531 goto error;
532 }
533 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
534 if (retval) {
535 goto error;
536 }
537 *notification = notif;
538end:
539 bt_put(clock_value);
540 bt_put(clock_class);
541 return ret;
542
543error:
544 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
545 bt_put(notif);
546 goto end;
547}
548
549static
550enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
551 struct lttng_live_component *lttng_live,
552 struct lttng_live_stream_iterator *lttng_live_stream,
553 struct bt_notification **notification)
554{
555 enum bt_ctf_lttng_live_iterator_status ret =
556 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
557 struct bt_ctf_clock_class *clock_class = NULL;
558 struct bt_ctf_clock_value *clock_value = NULL;
559
560 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
561 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
562 }
563
564 if (lttng_live_stream->current_inactivity_timestamp ==
565 lttng_live_stream->last_returned_inactivity_timestamp) {
566 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
567 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
568 goto end;
569 }
570
571 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
572 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
573
574 lttng_live_stream->last_returned_inactivity_timestamp =
575 lttng_live_stream->current_inactivity_timestamp;
576end:
577 bt_put(clock_value);
578 bt_put(clock_class);
579 return ret;
580}
581
582static
583enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
584 struct lttng_live_component *lttng_live,
585 struct lttng_live_stream_iterator *lttng_live_stream,
586 struct bt_notification **notification)
587{
588 enum bt_ctf_lttng_live_iterator_status ret =
589 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
590 enum bt_ctf_notif_iter_status status;
591 struct lttng_live_session *session;
592
593 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
594 struct lttng_live_trace *trace;
595
596 if (session->new_streams_needed) {
597 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
598 }
599 bt_list_for_each_entry(trace, &session->traces, node) {
600 if (trace->new_metadata_needed) {
601 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
602 }
603 }
604 }
605
606 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
607 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
608 }
609 if (lttng_live_stream->packet_end_notif_queue) {
610 *notification = lttng_live_stream->packet_end_notif_queue;
611 lttng_live_stream->packet_end_notif_queue = NULL;
612 status = BT_CTF_NOTIF_ITER_STATUS_OK;
613 } else {
614 status = bt_ctf_notif_iter_get_next_notification(
615 lttng_live_stream->notif_iter,
616 lttng_live_stream->trace->cc_prio_map,
617 notification);
618 if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
619 /*
620 * Consider empty packets as inactivity.
621 */
622 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
623 lttng_live_stream->packet_end_notif_queue = *notification;
624 *notification = NULL;
625 return emit_inactivity_notification(lttng_live,
626 lttng_live_stream, notification,
627 lttng_live_stream->current_packet_end_timestamp);
628 }
629 }
630 }
631 switch (status) {
632 case BT_CTF_NOTIF_ITER_STATUS_EOF:
633 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
634 break;
635 case BT_CTF_NOTIF_ITER_STATUS_OK:
636 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
637 break;
638 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
639 /*
640 * Continue immediately (end of packet). The next
641 * get_index may return AGAIN to delay the following
642 * attempt.
643 */
644 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
645 break;
646 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
647 /* No argument provided by the user, so don't return INVAL. */
648 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
649 default:
650 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
651 break;
652 }
653 return ret;
654}
655
656/*
657 * helper function:
658 * handle_no_data_streams()
659 * retry:
660 * - for each ACTIVE_NO_DATA stream:
661 * - query relayd for stream data, or quiescence info.
662 * - if need metadata, get metadata, goto retry.
663 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
664 * - if quiescent, move to QUIESCENT streams
665 * - if fetched data, move to ACTIVE_DATA streams
666 * (at this point each stream either has data, or is quiescent)
667 *
668 *
669 * iterator_next:
670 * handle_new_streams_and_metadata()
671 * - query relayd for known streams, add them as ACTIVE_NO_DATA
672 * - query relayd for metadata
673 *
674 * call handle_active_no_data_streams()
675 *
676 * handle_quiescent_streams()
677 * - if at least one stream is ACTIVE_DATA:
678 * - peek stream event with lowest timestamp -> next_ts
679 * - for each quiescent stream
680 * - if next_ts >= quiescent end
681 * - set state to ACTIVE_NO_DATA
682 * - else
683 * - for each quiescent stream
684 * - set state to ACTIVE_NO_DATA
685 *
686 * call handle_active_no_data_streams()
687 *
688 * handle_active_data_streams()
689 * - if at least one stream is ACTIVE_DATA:
690 * - get stream event with lowest timestamp from heap
691 * - make that stream event the current notification.
692 * - move this stream heap position to its next event
693 * - if we need to fetch data from relayd, move
694 * stream to ACTIVE_NO_DATA.
695 * - return OK
696 * - return AGAIN
697 *
698 * end criterion: ctrl-c on client. If relayd exits or the session
699 * closes on the relay daemon side, we keep on waiting for streams.
700 * Eventually handle --end timestamp (also an end criterion).
701 *
702 * When disconnected from relayd: try to re-connect endlessly.
703 */
704static
705struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
706 struct bt_private_notification_iterator *iterator,
707 struct lttng_live_stream_iterator *stream_iter)
708{
709 enum bt_ctf_lttng_live_iterator_status status;
710 struct bt_notification_iterator_next_return next_return;
711 struct lttng_live_component *lttng_live;
712
713 lttng_live = stream_iter->trace->session->lttng_live;
714retry:
715 print_stream_state(stream_iter);
716 next_return.notification = NULL;
717 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
718 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
719 goto end;
720 }
721 status = lttng_live_iterator_next_handle_one_no_data_stream(
722 lttng_live, stream_iter);
723 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
724 goto end;
725 }
726 status = lttng_live_iterator_next_handle_one_quiescent_stream(
727 lttng_live, stream_iter, &next_return.notification);
728 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
729 assert(next_return.notification == NULL);
730 goto end;
731 }
732 if (next_return.notification) {
733 goto end;
734 }
735 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
736 stream_iter, &next_return.notification);
737 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
738 assert(next_return.notification == NULL);
739 }
740
741end:
742 switch (status) {
743 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
744 print_dbg("continue");
745 goto retry;
746 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
747 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
748 print_dbg("again");
749 break;
750 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
751 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
752 print_dbg("end");
753 break;
754 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
755 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
756 print_dbg("ok");
757 break;
758 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
759 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
760 break;
761 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
762 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
763 break;
764 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
765 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
766 break;
767 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
768 default: /* fall-through */
769 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
770 break;
771 }
772 return next_return;
773}
774
775static
776struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
777 struct bt_private_notification_iterator *iterator,
778 struct lttng_live_no_stream_iterator *no_stream_iter)
779{
780 enum bt_ctf_lttng_live_iterator_status status;
781 struct bt_notification_iterator_next_return next_return;
782 struct lttng_live_component *lttng_live;
783
784 lttng_live = no_stream_iter->lttng_live;
785retry:
786 lttng_live_force_new_streams_and_metadata(lttng_live);
787 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
788 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
789 goto end;
790 }
791 if (no_stream_iter->port) {
792 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
793 } else {
794 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
795 }
796end:
797 switch (status) {
798 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
799 goto retry;
800 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
801 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
802 break;
803 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
804 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
805 break;
806 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
807 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
808 break;
809 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
810 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
811 break;
812 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
813 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
814 break;
815 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
816 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
817 break;
818 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
819 default: /* fall-through */
820 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
821 break;
822 }
823 return next_return;
824}
825
d3eb6e8f 826BT_HIDDEN
41a2b7ae 827struct bt_notification_iterator_next_return lttng_live_iterator_next(
890882ef 828 struct bt_private_notification_iterator *iterator)
d3eb6e8f 829{
7cdc2bab
MD
830 struct lttng_live_stream_iterator_generic *s =
831 bt_private_notification_iterator_get_user_data(iterator);
832 struct bt_notification_iterator_next_return next_return;
833
834 switch (s->type) {
835 case LIVE_STREAM_TYPE_NO_STREAM:
836 next_return = lttng_live_iterator_next_no_stream(iterator,
837 container_of(s, struct lttng_live_no_stream_iterator, p));
838 break;
839 case LIVE_STREAM_TYPE_STREAM:
840 next_return = lttng_live_iterator_next_stream(iterator,
841 container_of(s, struct lttng_live_stream_iterator, p));
842 break;
843 default:
844 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
845 break;
846 }
847 return next_return;
848}
41a2b7ae 849
7cdc2bab
MD
850BT_HIDDEN
851enum bt_notification_iterator_status lttng_live_iterator_init(
852 struct bt_private_notification_iterator *it,
853 struct bt_private_port *port)
854{
855 enum bt_notification_iterator_status ret =
856 BT_NOTIFICATION_ITERATOR_STATUS_OK;
857 struct lttng_live_stream_iterator_generic *s;
7cdc2bab
MD
858
859 assert(it);
860
861 s = bt_private_port_get_user_data(port);
862 assert(s);
863 switch (s->type) {
864 case LIVE_STREAM_TYPE_NO_STREAM:
865 {
866 struct lttng_live_no_stream_iterator *no_stream_iter =
867 container_of(s, struct lttng_live_no_stream_iterator, p);
7cdc2bab
MD
868 ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
869 if (ret) {
870 goto error;
871 }
872 break;
873 }
874 case LIVE_STREAM_TYPE_STREAM:
875 {
876 struct lttng_live_stream_iterator *stream_iter =
877 container_of(s, struct lttng_live_stream_iterator, p);
7cdc2bab
MD
878 ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
879 if (ret) {
880 goto error;
881 }
882 break;
883 }
884 default:
885 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
886 goto end;
887 }
888
889end:
41a2b7ae 890 return ret;
7cdc2bab
MD
891error:
892 if (bt_private_notification_iterator_set_user_data(it, NULL)
893 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
087bc060 894 BT_LOGE("Error setting private data to NULL");
7cdc2bab
MD
895 }
896 goto end;
897}
898
899static
900struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
901 struct bt_value *params)
902{
903 struct bt_value *url_value = NULL;
904 struct bt_value *results = NULL;
905 const char *url;
906 struct bt_live_viewer_connection *viewer_connection = NULL;
7cdc2bab
MD
907
908 url_value = bt_value_map_get(params, "url");
909 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
087bc060 910 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
911 goto error;
912 }
913
3cdf4234 914 if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
087bc060 915 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
916 goto error;
917 }
918
4c66436f 919 viewer_connection = bt_live_viewer_connection_create(url, NULL);
7cdc2bab 920 if (!viewer_connection) {
7cdc2bab
MD
921 goto error;
922 }
923
924 results = bt_live_viewer_connection_list_sessions(viewer_connection);
925 goto end;
926error:
927 BT_PUT(results);
928end:
929 if (viewer_connection) {
930 bt_live_viewer_connection_destroy(viewer_connection);
931 }
932 BT_PUT(url_value);
933 return results;
934}
935
936BT_HIDDEN
937struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
938 const char *object, struct bt_value *params)
939{
940 if (strcmp(object, "sessions") == 0) {
941 return lttng_live_query_list_sessions(comp_class,
942 params);
943 }
087bc060 944 BT_LOGW("Unknown query object `%s`", object);
7cdc2bab
MD
945 return NULL;
946}
947
948static
949void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
950{
951 int ret;
952 struct lttng_live_session *session, *s;
953
954 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
955 lttng_live_destroy_session(session);
956 }
957 BT_PUT(lttng_live->viewer_connection);
958 if (lttng_live->url) {
959 g_string_free(lttng_live->url, TRUE);
960 }
961 if (lttng_live->no_stream_port) {
962 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
963 assert(!ret);
964 BT_PUT(lttng_live->no_stream_port);
965 }
966 if (lttng_live->no_stream_iter) {
967 g_free(lttng_live->no_stream_iter);
968 }
969 g_free(lttng_live);
970}
971
972BT_HIDDEN
973void lttng_live_component_finalize(struct bt_private_component *component)
974{
975 void *data = bt_private_component_get_user_data(component);
976
977 if (!data) {
978 return;
979 }
980 lttng_live_component_destroy_data(data);
981}
982
983static
984struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
3cdf4234
MD
985 struct bt_private_component *private_component,
986 struct bt_graph *graph)
7cdc2bab
MD
987{
988 struct lttng_live_component *lttng_live;
989 struct bt_value *value = NULL;
990 const char *url;
991 enum bt_value_status ret;
992
993 lttng_live = g_new0(struct lttng_live_component, 1);
994 if (!lttng_live) {
995 goto end;
996 }
7cdc2bab
MD
997 /* TODO: make this an overridable parameter. */
998 lttng_live->max_query_size = MAX_QUERY_SIZE;
999 BT_INIT_LIST_HEAD(&lttng_live->sessions);
1000 value = bt_value_map_get(params, "url");
1001 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
087bc060 1002 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
1003 goto error;
1004 }
1005 ret = bt_value_string_get(value, &url);
1006 if (ret != BT_VALUE_STATUS_OK) {
087bc060 1007 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
1008 goto error;
1009 }
1010 lttng_live->url = g_string_new(url);
1011 if (!lttng_live->url) {
1012 goto error;
1013 }
1014 lttng_live->viewer_connection =
4c66436f 1015 bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
7cdc2bab 1016 if (!lttng_live->viewer_connection) {
7cdc2bab
MD
1017 goto error;
1018 }
1019 if (lttng_live_create_viewer_session(lttng_live)) {
7cdc2bab
MD
1020 goto error;
1021 }
1022 lttng_live->private_component = private_component;
3cdf4234 1023 lttng_live->graph = graph;
4c66436f 1024
7cdc2bab
MD
1025 goto end;
1026
1027error:
1028 lttng_live_component_destroy_data(lttng_live);
1029 lttng_live = NULL;
1030end:
1031 return lttng_live;
d3e4dcd8
PP
1032}
1033
f3bc2010 1034BT_HIDDEN
3cdf4234
MD
1035enum bt_component_status lttng_live_component_init(
1036 struct bt_private_component *private_component,
7cdc2bab 1037 struct bt_value *params, void *init_method_data)
f3bc2010 1038{
7cdc2bab
MD
1039 struct lttng_live_component *lttng_live;
1040 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
3cdf4234
MD
1041 struct bt_component *component;
1042 struct bt_graph *graph;
1043
1044 component = bt_component_from_private_component(private_component);
1045 graph = bt_component_get_graph(component);
1046 bt_put(graph); /* weak */
1047 bt_put(component);
7cdc2bab 1048
7cdc2bab 1049 /* Passes ownership of iter ref to lttng_live_component_create. */
3cdf4234
MD
1050 lttng_live = lttng_live_component_create(params, private_component,
1051 graph);
7cdc2bab 1052 if (!lttng_live) {
3cdf4234
MD
1053 if (bt_graph_is_canceled(graph)) {
1054 ret = BT_COMPONENT_STATUS_AGAIN;
1055 } else {
1056 ret = BT_COMPONENT_STATUS_NOMEM;
1057 }
7cdc2bab
MD
1058 goto end;
1059 }
1060
1061 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1062 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1063 lttng_live->no_stream_iter->lttng_live = lttng_live;
1064
1065 lttng_live->no_stream_port =
1066 bt_private_component_source_add_output_private_port(
1067 lttng_live->private_component, "no-stream",
1068 lttng_live->no_stream_iter);
1069 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1070
3cdf4234 1071 ret = bt_private_component_set_user_data(private_component, lttng_live);
7cdc2bab
MD
1072 if (ret != BT_COMPONENT_STATUS_OK) {
1073 goto error;
1074 }
1075
1076end:
1077 return ret;
1078error:
3cdf4234 1079 (void) bt_private_component_set_user_data(private_component, NULL);
7cdc2bab
MD
1080 lttng_live_component_destroy_data(lttng_live);
1081 return ret;
f3bc2010 1082}
087bc060 1083
d85ef162
MD
1084BT_HIDDEN
1085enum bt_component_status lttng_live_accept_port_connection(
1086 struct bt_private_component *private_component,
1087 struct bt_private_port *self_private_port,
1088 struct bt_port *other_port)
1089{
1090 struct lttng_live_component *lttng_live =
1091 bt_private_component_get_user_data(private_component);
1092 struct bt_component *other_component;
1093 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
1094 struct bt_port *self_port = bt_port_from_private_port(self_private_port);
1095
1096 other_component = bt_port_get_component(other_port);
1097 bt_put(other_component); /* weak */
1098
1099 if (!lttng_live->downstream_component) {
1100 lttng_live->downstream_component = other_component;
1101 goto end;
1102 }
1103
1104 /*
1105 * Compare prior component to ensure we are connected to the
1106 * same downstream component as prior ports.
1107 */
1108 if (lttng_live->downstream_component != other_component) {
1109 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1110 bt_port_get_name(self_port),
1111 bt_component_get_name(other_component),
1112 bt_component_get_name(lttng_live->downstream_component));
1113 status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
1114 goto end;
1115 }
1116end:
1117 bt_put(self_port);
1118 return status;
1119}
1120
087bc060
MD
1121static
1122void __attribute__((constructor)) bt_lttng_live_logging_ctor(void)
1123{
373c938b 1124 bt_lttng_live_log_level = bt_log_get_level_from_env(BT_LOGLEVEL_NAME);
087bc060 1125}
This page took 0.075919 seconds and 4 git commands to generate.