Implement logging in lttng-live component
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
7cdc2bab 30#include <babeltrace/ctf-ir/packet.h>
b2e0c907 31#include <babeltrace/graph/component-source.h>
7cdc2bab
MD
32#include <babeltrace/graph/private-port.h>
33#include <babeltrace/graph/port.h>
34#include <babeltrace/graph/private-component.h>
35#include <babeltrace/graph/private-component-source.h>
36#include <babeltrace/graph/private-notification-iterator.h>
37#include <babeltrace/graph/notification-stream.h>
38#include <babeltrace/graph/notification-packet.h>
39#include <babeltrace/graph/notification-event.h>
40#include <babeltrace/graph/notification-heap.h>
41#include <babeltrace/graph/notification-iterator.h>
42#include <babeltrace/graph/notification-inactivity.h>
43#include <babeltrace/compiler-internal.h>
44#include <inttypes.h>
45#include <glib.h>
46#include <assert.h>
47#include <unistd.h>
7d61fa8e 48#include <plugins-common.h>
f3bc2010 49
087bc060
MD
50#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE"
51
7cdc2bab
MD
52#include "lttng-live-internal.h"
53#include "data-stream.h"
54#include "metadata.h"
55
7cdc2bab 56#define MAX_QUERY_SIZE (256*1024)
7cdc2bab 57
087bc060 58#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab
MD
59
60static const char *print_state(struct lttng_live_stream_iterator *s)
61{
62 switch (s->state) {
63 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
64 return "ACTIVE_NO_DATA";
65 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
66 return "QUIESCENT_NO_DATA";
67 case LTTNG_LIVE_STREAM_QUIESCENT:
68 return "QUIESCENT";
69 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
70 return "ACTIVE_DATA";
71 case LTTNG_LIVE_STREAM_EOF:
72 return "EOF";
73 default:
74 return "ERROR";
75 }
76}
7cdc2bab
MD
77
/*
 * Debug-logs the port name, state and inactivity timestamps of a stream
 * iterator. The argument is parenthesized so that any pointer
 * expression can be passed; note that it is evaluated several times.
 *
 * NOTE(review): bt_port_from_private_port() may return a new reference
 * which is never released here -- confirm against the port API.
 */
#define print_stream_state(stream) \
	print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
		bt_port_get_name(bt_port_from_private_port((stream)->port)), \
		print_state(stream), (stream)->last_returned_inactivity_timestamp, \
		(stream)->current_inactivity_timestamp)
83
d3e4dcd8 84BT_HIDDEN
087bc060 85int bt_lttng_live_log_level = BT_LOG_NONE;
7cdc2bab
MD
86
87BT_HIDDEN
88int lttng_live_add_port(struct lttng_live_component *lttng_live,
89 struct lttng_live_stream_iterator *stream_iter)
90{
91 int ret;
92 struct bt_private_port *private_port;
93 char name[STREAM_NAME_MAX_LEN];
94
95 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
96 assert(ret > 0);
97 strcpy(stream_iter->name, name);
98 private_port = bt_private_component_source_add_output_private_port(
99 lttng_live->private_component, name, stream_iter);
100 if (!private_port) {
101 return -1;
102 }
087bc060 103 BT_LOGI("Added port %s", name);
7cdc2bab
MD
104
105 if (lttng_live->no_stream_port) {
106 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
107 if (ret) {
108 return -1;
109 }
110 BT_PUT(lttng_live->no_stream_port);
111 lttng_live->no_stream_iter->port = NULL;
112 }
113 stream_iter->port = private_port;
114 return 0;
115}
116
117BT_HIDDEN
118int lttng_live_remove_port(struct lttng_live_component *lttng_live,
119 struct bt_private_port *port)
120{
121 struct bt_component *component;
122 int64_t nr_ports;
123 int ret;
124
125 component = bt_component_from_private_component(lttng_live->private_component);
126 nr_ports = bt_component_source_get_output_port_count(component);
127 if (nr_ports < 0) {
128 return -1;
129 }
130 BT_PUT(component);
131 if (nr_ports == 1) {
132 assert(!lttng_live->no_stream_port);
133 lttng_live->no_stream_port =
134 bt_private_component_source_add_output_private_port(lttng_live->private_component,
135 "no-stream", lttng_live->no_stream_iter);
136 if (!lttng_live->no_stream_port) {
137 return -1;
138 }
139 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
140 }
141 ret = bt_private_port_remove_from_component(port);
142 if (ret) {
143 return -1;
144 }
145 return 0;
146}
147
148static
149struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
150 uint64_t trace_id)
d3e4dcd8 151{
7cdc2bab
MD
152 struct lttng_live_trace *trace;
153
154 bt_list_for_each_entry(trace, &session->traces, node) {
155 if (trace->id == trace_id) {
156 return trace;
157 }
158 }
d3eb6e8f
PP
159 return NULL;
160}
161
7cdc2bab
MD
162static
163void lttng_live_destroy_trace(struct bt_object *obj)
164{
165 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
166
087bc060 167 BT_LOGI("Destroy trace");
7cdc2bab
MD
168 assert(bt_list_empty(&trace->streams));
169 bt_list_del(&trace->node);
170 lttng_live_metadata_fini(trace);
171 BT_PUT(trace->cc_prio_map);
172 g_free(trace);
173}
174
175static
176struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
177 uint64_t trace_id)
178{
179 struct lttng_live_trace *trace = NULL;
180
181 trace = g_new0(struct lttng_live_trace, 1);
182 if (!trace) {
183 goto error;
184 }
185 trace->session = session;
186 trace->id = trace_id;
187 BT_INIT_LIST_HEAD(&trace->streams);
188 trace->new_metadata_needed = true;
189 bt_list_add(&trace->node, &session->traces);
190 bt_object_init(&trace->obj, lttng_live_destroy_trace);
087bc060 191 BT_LOGI("Create trace");
7cdc2bab
MD
192 goto end;
193error:
194 g_free(trace);
195 trace = NULL;
196end:
197 return trace;
198}
199
200BT_HIDDEN
201struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
202 uint64_t trace_id)
203{
204 struct lttng_live_trace *trace;
205
206 trace = lttng_live_find_trace(session, trace_id);
207 if (trace) {
208 bt_get(trace);
209 return trace;
210 }
211 return lttng_live_create_trace(session, trace_id);
212}
213
214BT_HIDDEN
215void lttng_live_unref_trace(struct lttng_live_trace *trace)
216{
217 bt_put(trace);
218}
219
220static
221void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
222{
223 struct lttng_live_stream_iterator *stream, *s;
224
225 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
226 lttng_live_stream_iterator_destroy(stream);
227 }
228 lttng_live_metadata_fini(trace);
229}
230
231BT_HIDDEN
232int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id)
233{
234 int ret = 0;
235 struct lttng_live_session *s;
236
237 s = g_new0(struct lttng_live_session, 1);
238 if (!s) {
239 goto error;
240 }
241
242 s->id = session_id;
243 BT_INIT_LIST_HEAD(&s->traces);
244 s->lttng_live = lttng_live;
245 s->new_streams_needed = true;
246
087bc060 247 BT_LOGI("Reading from session %" PRIu64, s->id);
7cdc2bab
MD
248 bt_list_add(&s->node, &lttng_live->sessions);
249 goto end;
250error:
087bc060 251 BT_LOGE("Error adding session");
7cdc2bab
MD
252 g_free(s);
253 ret = -1;
254end:
255 return ret;
256}
257
258static
259void lttng_live_destroy_session(struct lttng_live_session *session)
260{
261 struct lttng_live_trace *trace, *t;
262
087bc060 263 BT_LOGI("Destroy session");
7cdc2bab
MD
264 if (session->id != -1ULL) {
265 if (lttng_live_detach_session(session)) {
266 /* Old relayd cannot detach sessions. */
087bc060 267 BT_LOGD("Unable to detach session %" PRIu64,
7cdc2bab
MD
268 session->id);
269 }
270 session->id = -1ULL;
271 }
272 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
273 lttng_live_close_trace_streams(trace);
274 }
275 bt_list_del(&session->node);
276 g_free(session);
277}
278
279BT_HIDDEN
280void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
281{
282 struct lttng_live_stream_iterator_generic *s =
283 bt_private_notification_iterator_get_user_data(it);
284
285 switch (s->type) {
286 case LIVE_STREAM_TYPE_NO_STREAM:
287 {
288 /* Leave no_stream_iter in place when port is removed. */
289 break;
290 }
291 case LIVE_STREAM_TYPE_STREAM:
292 {
293 struct lttng_live_stream_iterator *stream_iter =
294 container_of(s, struct lttng_live_stream_iterator, p);
295
296 lttng_live_stream_iterator_destroy(stream_iter);
297 break;
298 }
299 }
300}
301
302static
303enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
304 struct lttng_live_component *lttng_live,
305 struct lttng_live_stream_iterator *lttng_live_stream)
306{
307 switch (lttng_live_stream->state) {
308 case LTTNG_LIVE_STREAM_QUIESCENT:
309 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
310 break;
311 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
312 /* Invalid state. */
087bc060
MD
313 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
314 abort();
7cdc2bab
MD
315 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
316 /* Invalid state. */
087bc060
MD
317 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
318 abort();
7cdc2bab
MD
319 case LTTNG_LIVE_STREAM_EOF:
320 break;
321 }
322 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
323}
324
325/*
326 * For active no data stream, fetch next data. It can be either:
327 * - quiescent: need to put it in the prio heap at quiescent end
328 * timestamp,
329 * - have data: need to wire up first event into the prio heap,
330 * - have no data on this stream at this point: need to retry (AGAIN) or
331 * return EOF.
332 */
333static
334enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
335 struct lttng_live_component *lttng_live,
336 struct lttng_live_stream_iterator *lttng_live_stream)
337{
338 enum bt_ctf_lttng_live_iterator_status ret =
339 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
340 struct packet_index index;
341 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
342
343 if (lttng_live_stream->trace->new_metadata_needed) {
344 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
345 goto end;
346 }
347 if (lttng_live_stream->trace->session->new_streams_needed) {
348 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
349 goto end;
350 }
351 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
352 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
353 goto end;
354 }
355 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
356 if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
357 goto end;
358 }
359 assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
360 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
361 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
362 && lttng_live_stream->last_returned_inactivity_timestamp ==
363 lttng_live_stream->current_inactivity_timestamp) {
364 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
365 print_stream_state(lttng_live_stream);
366 } else {
367 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
368 }
369 goto end;
370 }
371 lttng_live_stream->base_offset = index.offset;
372 lttng_live_stream->offset = index.offset;
373 lttng_live_stream->len = index.packet_size / CHAR_BIT;
374end:
375 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
376 ret = lttng_live_iterator_next_check_stream_state(
377 lttng_live, lttng_live_stream);
378 }
379 return ret;
380}
381
382/*
383 * Creation of the notification requires the ctf trace to be created
384 * beforehand, but the live protocol gives us all streams (including
385 * metadata) at once. So we split it in three steps: getting streams,
386 * getting metadata (which creates the ctf trace), and then creating the
387 * per-stream notifications.
388 */
389static
390enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
391 struct lttng_live_component *lttng_live,
392 struct lttng_live_session *session)
393{
394 enum bt_ctf_lttng_live_iterator_status status;
395 struct lttng_live_trace *trace, *t;
396
397 if (lttng_live_attach_session(session)) {
398 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
399 }
400 status = lttng_live_get_new_streams(session);
401 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
402 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
403 return status;
404 }
405 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
406 status = lttng_live_metadata_update(trace);
407 if (status == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
408 int retval;
409
410 retval = bt_ctf_trace_set_is_static(trace->trace);
411 assert(!retval);
412 } else if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
413 return status;
414 }
415 }
416 return lttng_live_lazy_notif_init(session);
417}
418
419BT_HIDDEN
420void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
421{
422 struct lttng_live_session *session;
423
424 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
425 session->new_streams_needed = true;
426 }
427}
428
429static
430void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
431{
432 struct lttng_live_session *session;
433
434 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
435 struct lttng_live_trace *trace;
436
437 session->new_streams_needed = true;
438 bt_list_for_each_entry(trace, &session->traces, node) {
439 trace->new_metadata_needed = true;
440 }
441 }
442}
443
444static
445enum bt_notification_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
446 struct lttng_live_component *lttng_live)
447{
448 enum bt_ctf_lttng_live_iterator_status ret =
449 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
450 unsigned int nr_sessions_opened = 0;
451 struct lttng_live_session *session, *s;
452
453 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
454 if (session->closed && bt_list_empty(&session->traces)) {
455 lttng_live_destroy_session(session);
456 }
457 }
458 /*
459 * Currently, when there are no sessions, we quit immediately.
460 * We may want to add a component parameter to keep trying until
461 * we get data in the future.
462 * Also, in a remotely distant future, we could add a "new
463 * session" flag to the protocol, which would tell us that we
464 * need to query for new sessions even though we have sessions
465 * currently ongoing.
466 */
467 if (bt_list_empty(&lttng_live->sessions)) {
468 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
469 goto end;
470 }
471 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
472 ret = lttng_live_get_session(lttng_live, session);
473 switch (ret) {
474 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
475 break;
476 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
477 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
478 break;
479 default:
480 goto end;
481 }
482 if (!session->closed) {
483 nr_sessions_opened++;
484 }
485 }
486end:
487 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
488 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
489 }
490 return ret;
491}
492
493static
494enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
495 struct lttng_live_component *lttng_live,
496 struct lttng_live_stream_iterator *lttng_live_stream,
497 struct bt_notification **notification,
498 uint64_t timestamp)
499{
500 enum bt_ctf_lttng_live_iterator_status ret =
501 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
502 struct lttng_live_trace *trace;
503 struct bt_ctf_clock_class *clock_class = NULL;
504 struct bt_ctf_clock_value *clock_value = NULL;
505 struct bt_notification *notif = NULL;
506 int retval;
507
508 trace = lttng_live_stream->trace;
509 if (!trace) {
510 goto error;
511 }
512 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
513 if (!clock_class) {
514 goto error;
515 }
516 clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
517 if (!clock_value) {
518 goto error;
519 }
520 notif = bt_notification_inactivity_create(trace->cc_prio_map);
521 if (!notif) {
522 goto error;
523 }
524 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
525 if (retval) {
526 goto error;
527 }
528 *notification = notif;
529end:
530 bt_put(clock_value);
531 bt_put(clock_class);
532 return ret;
533
534error:
535 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
536 bt_put(notif);
537 goto end;
538}
539
540static
541enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
542 struct lttng_live_component *lttng_live,
543 struct lttng_live_stream_iterator *lttng_live_stream,
544 struct bt_notification **notification)
545{
546 enum bt_ctf_lttng_live_iterator_status ret =
547 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
548 struct bt_ctf_clock_class *clock_class = NULL;
549 struct bt_ctf_clock_value *clock_value = NULL;
550
551 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
552 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
553 }
554
555 if (lttng_live_stream->current_inactivity_timestamp ==
556 lttng_live_stream->last_returned_inactivity_timestamp) {
557 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
558 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
559 goto end;
560 }
561
562 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
563 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
564
565 lttng_live_stream->last_returned_inactivity_timestamp =
566 lttng_live_stream->current_inactivity_timestamp;
567end:
568 bt_put(clock_value);
569 bt_put(clock_class);
570 return ret;
571}
572
573static
574enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
575 struct lttng_live_component *lttng_live,
576 struct lttng_live_stream_iterator *lttng_live_stream,
577 struct bt_notification **notification)
578{
579 enum bt_ctf_lttng_live_iterator_status ret =
580 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
581 enum bt_ctf_notif_iter_status status;
582 struct lttng_live_session *session;
583
584 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
585 struct lttng_live_trace *trace;
586
587 if (session->new_streams_needed) {
588 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
589 }
590 bt_list_for_each_entry(trace, &session->traces, node) {
591 if (trace->new_metadata_needed) {
592 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
593 }
594 }
595 }
596
597 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
598 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
599 }
600 if (lttng_live_stream->packet_end_notif_queue) {
601 *notification = lttng_live_stream->packet_end_notif_queue;
602 lttng_live_stream->packet_end_notif_queue = NULL;
603 status = BT_CTF_NOTIF_ITER_STATUS_OK;
604 } else {
605 status = bt_ctf_notif_iter_get_next_notification(
606 lttng_live_stream->notif_iter,
607 lttng_live_stream->trace->cc_prio_map,
608 notification);
609 if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
610 /*
611 * Consider empty packets as inactivity.
612 */
613 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
614 lttng_live_stream->packet_end_notif_queue = *notification;
615 *notification = NULL;
616 return emit_inactivity_notification(lttng_live,
617 lttng_live_stream, notification,
618 lttng_live_stream->current_packet_end_timestamp);
619 }
620 }
621 }
622 switch (status) {
623 case BT_CTF_NOTIF_ITER_STATUS_EOF:
624 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
625 break;
626 case BT_CTF_NOTIF_ITER_STATUS_OK:
627 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
628 break;
629 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
630 /*
631 * Continue immediately (end of packet). The next
632 * get_index may return AGAIN to delay the following
633 * attempt.
634 */
635 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
636 break;
637 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
638 /* No argument provided by the user, so don't return INVAL. */
639 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
640 default:
641 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
642 break;
643 }
644 return ret;
645}
646
647/*
648 * helper function:
649 * handle_no_data_streams()
650 * retry:
651 * - for each ACTIVE_NO_DATA stream:
652 * - query relayd for stream data, or quiescence info.
653 * - if need metadata, get metadata, goto retry.
654 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
655 * - if quiescent, move to QUIESCENT streams
656 * - if fetched data, move to ACTIVE_DATA streams
657 * (at this point each stream either has data, or is quiescent)
658 *
659 *
660 * iterator_next:
661 * handle_new_streams_and_metadata()
662 * - query relayd for known streams, add them as ACTIVE_NO_DATA
663 * - query relayd for metadata
664 *
665 * call handle_active_no_data_streams()
666 *
667 * handle_quiescent_streams()
668 * - if at least one stream is ACTIVE_DATA:
669 * - peek stream event with lowest timestamp -> next_ts
670 * - for each quiescent stream
671 * - if next_ts >= quiescent end
672 * - set state to ACTIVE_NO_DATA
673 * - else
674 * - for each quiescent stream
675 * - set state to ACTIVE_NO_DATA
676 *
677 * call handle_active_no_data_streams()
678 *
679 * handle_active_data_streams()
680 * - if at least one stream is ACTIVE_DATA:
681 * - get stream event with lowest timestamp from heap
682 * - make that stream event the current notification.
683 * - move this stream heap position to its next event
684 * - if we need to fetch data from relayd, move
685 * stream to ACTIVE_NO_DATA.
686 * - return OK
687 * - return AGAIN
688 *
689 * end criterion: ctrl-c on client. If relayd exits or the session
690 * closes on the relay daemon side, we keep on waiting for streams.
691 * Eventually handle --end timestamp (also an end criterion).
692 *
693 * When disconnected from relayd: try to re-connect endlessly.
694 */
695static
696struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
697 struct bt_private_notification_iterator *iterator,
698 struct lttng_live_stream_iterator *stream_iter)
699{
700 enum bt_ctf_lttng_live_iterator_status status;
701 struct bt_notification_iterator_next_return next_return;
702 struct lttng_live_component *lttng_live;
703
704 lttng_live = stream_iter->trace->session->lttng_live;
705retry:
706 print_stream_state(stream_iter);
707 next_return.notification = NULL;
708 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
709 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
710 goto end;
711 }
712 status = lttng_live_iterator_next_handle_one_no_data_stream(
713 lttng_live, stream_iter);
714 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
715 goto end;
716 }
717 status = lttng_live_iterator_next_handle_one_quiescent_stream(
718 lttng_live, stream_iter, &next_return.notification);
719 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
720 assert(next_return.notification == NULL);
721 goto end;
722 }
723 if (next_return.notification) {
724 goto end;
725 }
726 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
727 stream_iter, &next_return.notification);
728 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
729 assert(next_return.notification == NULL);
730 }
731
732end:
733 switch (status) {
734 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
735 print_dbg("continue");
736 goto retry;
737 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
738 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
739 print_dbg("again");
740 break;
741 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
742 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
743 print_dbg("end");
744 break;
745 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
746 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
747 print_dbg("ok");
748 break;
749 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
750 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
751 break;
752 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
753 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
754 break;
755 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
756 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
757 break;
758 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
759 default: /* fall-through */
760 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
761 break;
762 }
763 return next_return;
764}
765
766static
767struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
768 struct bt_private_notification_iterator *iterator,
769 struct lttng_live_no_stream_iterator *no_stream_iter)
770{
771 enum bt_ctf_lttng_live_iterator_status status;
772 struct bt_notification_iterator_next_return next_return;
773 struct lttng_live_component *lttng_live;
774
775 lttng_live = no_stream_iter->lttng_live;
776retry:
777 lttng_live_force_new_streams_and_metadata(lttng_live);
778 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
779 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
780 goto end;
781 }
782 if (no_stream_iter->port) {
783 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
784 } else {
785 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
786 }
787end:
788 switch (status) {
789 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
790 goto retry;
791 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
792 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
793 break;
794 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
795 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
796 break;
797 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
798 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
799 break;
800 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
801 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
802 break;
803 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
804 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
805 break;
806 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
807 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
808 break;
809 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
810 default: /* fall-through */
811 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
812 break;
813 }
814 return next_return;
815}
816
d3eb6e8f 817BT_HIDDEN
41a2b7ae 818struct bt_notification_iterator_next_return lttng_live_iterator_next(
890882ef 819 struct bt_private_notification_iterator *iterator)
d3eb6e8f 820{
7cdc2bab
MD
821 struct lttng_live_stream_iterator_generic *s =
822 bt_private_notification_iterator_get_user_data(iterator);
823 struct bt_notification_iterator_next_return next_return;
824
825 switch (s->type) {
826 case LIVE_STREAM_TYPE_NO_STREAM:
827 next_return = lttng_live_iterator_next_no_stream(iterator,
828 container_of(s, struct lttng_live_no_stream_iterator, p));
829 break;
830 case LIVE_STREAM_TYPE_STREAM:
831 next_return = lttng_live_iterator_next_stream(iterator,
832 container_of(s, struct lttng_live_stream_iterator, p));
833 break;
834 default:
835 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
836 break;
837 }
838 return next_return;
839}
41a2b7ae 840
7cdc2bab
MD
841BT_HIDDEN
842enum bt_notification_iterator_status lttng_live_iterator_init(
843 struct bt_private_notification_iterator *it,
844 struct bt_private_port *port)
845{
846 enum bt_notification_iterator_status ret =
847 BT_NOTIFICATION_ITERATOR_STATUS_OK;
848 struct lttng_live_stream_iterator_generic *s;
7cdc2bab
MD
849
850 assert(it);
851
852 s = bt_private_port_get_user_data(port);
853 assert(s);
854 switch (s->type) {
855 case LIVE_STREAM_TYPE_NO_STREAM:
856 {
857 struct lttng_live_no_stream_iterator *no_stream_iter =
858 container_of(s, struct lttng_live_no_stream_iterator, p);
7cdc2bab
MD
859 ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
860 if (ret) {
861 goto error;
862 }
863 break;
864 }
865 case LIVE_STREAM_TYPE_STREAM:
866 {
867 struct lttng_live_stream_iterator *stream_iter =
868 container_of(s, struct lttng_live_stream_iterator, p);
7cdc2bab
MD
869 ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
870 if (ret) {
871 goto error;
872 }
873 break;
874 }
875 default:
876 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
877 goto end;
878 }
879
880end:
41a2b7ae 881 return ret;
7cdc2bab
MD
882error:
883 if (bt_private_notification_iterator_set_user_data(it, NULL)
884 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
087bc060 885 BT_LOGE("Error setting private data to NULL");
7cdc2bab
MD
886 }
887 goto end;
888}
889
890static
891struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
892 struct bt_value *params)
893{
894 struct bt_value *url_value = NULL;
895 struct bt_value *results = NULL;
896 const char *url;
897 struct bt_live_viewer_connection *viewer_connection = NULL;
898 enum bt_value_status ret;
899
900 url_value = bt_value_map_get(params, "url");
901 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
087bc060 902 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
903 goto error;
904 }
905
906 ret = bt_value_string_get(url_value, &url);
907 if (ret != BT_VALUE_STATUS_OK) {
087bc060 908 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
909 goto error;
910 }
911
912 viewer_connection = bt_live_viewer_connection_create(url, stderr);
913 if (!viewer_connection) {
914 ret = BT_COMPONENT_STATUS_NOMEM;
915 goto error;
916 }
917
918 results = bt_live_viewer_connection_list_sessions(viewer_connection);
919 goto end;
920error:
921 BT_PUT(results);
922end:
923 if (viewer_connection) {
924 bt_live_viewer_connection_destroy(viewer_connection);
925 }
926 BT_PUT(url_value);
927 return results;
928}
929
930BT_HIDDEN
931struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
932 const char *object, struct bt_value *params)
933{
934 if (strcmp(object, "sessions") == 0) {
935 return lttng_live_query_list_sessions(comp_class,
936 params);
937 }
087bc060 938 BT_LOGW("Unknown query object `%s`", object);
7cdc2bab
MD
939 return NULL;
940}
941
942static
943void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
944{
945 int ret;
946 struct lttng_live_session *session, *s;
947
948 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
949 lttng_live_destroy_session(session);
950 }
951 BT_PUT(lttng_live->viewer_connection);
952 if (lttng_live->url) {
953 g_string_free(lttng_live->url, TRUE);
954 }
955 if (lttng_live->no_stream_port) {
956 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
957 assert(!ret);
958 BT_PUT(lttng_live->no_stream_port);
959 }
960 if (lttng_live->no_stream_iter) {
961 g_free(lttng_live->no_stream_iter);
962 }
963 g_free(lttng_live);
964}
965
966BT_HIDDEN
967void lttng_live_component_finalize(struct bt_private_component *component)
968{
969 void *data = bt_private_component_get_user_data(component);
970
971 if (!data) {
972 return;
973 }
974 lttng_live_component_destroy_data(data);
975}
976
977static
978struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
979 struct bt_private_component *private_component)
980{
981 struct lttng_live_component *lttng_live;
982 struct bt_value *value = NULL;
983 const char *url;
984 enum bt_value_status ret;
985
986 lttng_live = g_new0(struct lttng_live_component, 1);
987 if (!lttng_live) {
988 goto end;
989 }
7cdc2bab
MD
990 /* TODO: make this an overridable parameter. */
991 lttng_live->max_query_size = MAX_QUERY_SIZE;
992 BT_INIT_LIST_HEAD(&lttng_live->sessions);
993 value = bt_value_map_get(params, "url");
994 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
087bc060 995 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
996 goto error;
997 }
998 ret = bt_value_string_get(value, &url);
999 if (ret != BT_VALUE_STATUS_OK) {
087bc060 1000 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
1001 goto error;
1002 }
1003 lttng_live->url = g_string_new(url);
1004 if (!lttng_live->url) {
1005 goto error;
1006 }
1007 lttng_live->viewer_connection =
1008 bt_live_viewer_connection_create(lttng_live->url->str,
1009 stderr);
1010 if (!lttng_live->viewer_connection) {
1011 ret = BT_COMPONENT_STATUS_NOMEM;
1012 goto error;
1013 }
1014 if (lttng_live_create_viewer_session(lttng_live)) {
1015 ret = BT_COMPONENT_STATUS_ERROR;
1016 goto error;
1017 }
1018 lttng_live->private_component = private_component;
1019
1020 goto end;
1021
1022error:
1023 lttng_live_component_destroy_data(lttng_live);
1024 lttng_live = NULL;
1025end:
1026 return lttng_live;
d3e4dcd8
PP
1027}
1028
f3bc2010 1029BT_HIDDEN
7cdc2bab
MD
1030enum bt_component_status lttng_live_component_init(struct bt_private_component *component,
1031 struct bt_value *params, void *init_method_data)
f3bc2010 1032{
7cdc2bab
MD
1033 struct lttng_live_component *lttng_live;
1034 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
1035
7cdc2bab
MD
1036 /* Passes ownership of iter ref to lttng_live_component_create. */
1037 lttng_live = lttng_live_component_create(params, component);
1038 if (!lttng_live) {
1039 ret = BT_COMPONENT_STATUS_NOMEM;
1040 goto end;
1041 }
1042
1043 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1044 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1045 lttng_live->no_stream_iter->lttng_live = lttng_live;
1046
1047 lttng_live->no_stream_port =
1048 bt_private_component_source_add_output_private_port(
1049 lttng_live->private_component, "no-stream",
1050 lttng_live->no_stream_iter);
1051 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1052
1053 ret = bt_private_component_set_user_data(component, lttng_live);
1054 if (ret != BT_COMPONENT_STATUS_OK) {
1055 goto error;
1056 }
1057
1058end:
1059 return ret;
1060error:
1061 (void) bt_private_component_set_user_data(component, NULL);
1062 lttng_live_component_destroy_data(lttng_live);
1063 return ret;
f3bc2010 1064}
087bc060
MD
1065
1066static
1067void __attribute__((constructor)) bt_lttng_live_logging_ctor(void)
1068{
1069 enum bt_logging_level log_level = BT_LOG_NONE;
1070 const char *log_level_env = getenv("BABELTRACE_PLUGIN_LTTNG_LIVE_LOG_LEVEL");
1071
1072 if (!log_level_env) {
1073 return;
1074 }
1075
1076 if (strcmp(log_level_env, "VERBOSE") == 0) {
1077 log_level = BT_LOGGING_LEVEL_VERBOSE;
1078 } else if (strcmp(log_level_env, "DEBUG") == 0) {
1079 log_level = BT_LOGGING_LEVEL_DEBUG;
1080 } else if (strcmp(log_level_env, "INFO") == 0) {
1081 log_level = BT_LOGGING_LEVEL_INFO;
1082 } else if (strcmp(log_level_env, "WARN") == 0) {
1083 log_level = BT_LOGGING_LEVEL_WARN;
1084 } else if (strcmp(log_level_env, "ERROR") == 0) {
1085 log_level = BT_LOGGING_LEVEL_ERROR;
1086 } else if (strcmp(log_level_env, "FATAL") == 0) {
1087 log_level = BT_LOGGING_LEVEL_FATAL;
1088 }
1089
1090 bt_lttng_live_log_level = log_level;
1091}
This page took 0.078877 seconds and 4 git commands to generate.