Fix: invoke bt_ctf_trace_set_is_static only when trace is destroyed
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
7cdc2bab 30#include <babeltrace/ctf-ir/packet.h>
b2e0c907 31#include <babeltrace/graph/component-source.h>
7cdc2bab
MD
32#include <babeltrace/graph/private-port.h>
33#include <babeltrace/graph/port.h>
34#include <babeltrace/graph/private-component.h>
35#include <babeltrace/graph/private-component-source.h>
36#include <babeltrace/graph/private-notification-iterator.h>
37#include <babeltrace/graph/notification-stream.h>
38#include <babeltrace/graph/notification-packet.h>
39#include <babeltrace/graph/notification-event.h>
40#include <babeltrace/graph/notification-heap.h>
41#include <babeltrace/graph/notification-iterator.h>
42#include <babeltrace/graph/notification-inactivity.h>
43#include <babeltrace/compiler-internal.h>
44#include <inttypes.h>
45#include <glib.h>
46#include <assert.h>
47#include <unistd.h>
7d61fa8e 48#include <plugins-common.h>
f3bc2010 49
087bc060 50#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE"
047358f9 51#define BT_LOGLEVEL_NAME "BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_LOG_LEVEL"
087bc060 52
7cdc2bab
MD
53#include "data-stream.h"
54#include "metadata.h"
0f5e83e5 55#include "lttng-live-internal.h"
7cdc2bab 56
7cdc2bab 57#define MAX_QUERY_SIZE (256*1024)
7cdc2bab 58
087bc060 59#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab
MD
60
61static const char *print_state(struct lttng_live_stream_iterator *s)
62{
63 switch (s->state) {
64 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
65 return "ACTIVE_NO_DATA";
66 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
67 return "QUIESCENT_NO_DATA";
68 case LTTNG_LIVE_STREAM_QUIESCENT:
69 return "QUIESCENT";
70 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
71 return "ACTIVE_DATA";
72 case LTTNG_LIVE_STREAM_EOF:
73 return "EOF";
74 default:
75 return "ERROR";
76 }
77}
7cdc2bab
MD
78
79#define print_stream_state(stream) \
80 print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
81 bt_port_get_name(bt_port_from_private_port(stream->port)), \
82 print_state(stream), stream->last_returned_inactivity_timestamp, \
83 stream->current_inactivity_timestamp)
84
d3e4dcd8 85BT_HIDDEN
087bc060 86int bt_lttng_live_log_level = BT_LOG_NONE;
7cdc2bab
MD
87
88BT_HIDDEN
89int lttng_live_add_port(struct lttng_live_component *lttng_live,
90 struct lttng_live_stream_iterator *stream_iter)
91{
92 int ret;
93 struct bt_private_port *private_port;
94 char name[STREAM_NAME_MAX_LEN];
95
96 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
97 assert(ret > 0);
98 strcpy(stream_iter->name, name);
99 private_port = bt_private_component_source_add_output_private_port(
100 lttng_live->private_component, name, stream_iter);
101 if (!private_port) {
102 return -1;
103 }
087bc060 104 BT_LOGI("Added port %s", name);
7cdc2bab
MD
105
106 if (lttng_live->no_stream_port) {
107 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
108 if (ret) {
109 return -1;
110 }
111 BT_PUT(lttng_live->no_stream_port);
112 lttng_live->no_stream_iter->port = NULL;
113 }
114 stream_iter->port = private_port;
115 return 0;
116}
117
118BT_HIDDEN
119int lttng_live_remove_port(struct lttng_live_component *lttng_live,
120 struct bt_private_port *port)
121{
122 struct bt_component *component;
123 int64_t nr_ports;
124 int ret;
125
126 component = bt_component_from_private_component(lttng_live->private_component);
127 nr_ports = bt_component_source_get_output_port_count(component);
128 if (nr_ports < 0) {
129 return -1;
130 }
131 BT_PUT(component);
132 if (nr_ports == 1) {
133 assert(!lttng_live->no_stream_port);
134 lttng_live->no_stream_port =
135 bt_private_component_source_add_output_private_port(lttng_live->private_component,
136 "no-stream", lttng_live->no_stream_iter);
137 if (!lttng_live->no_stream_port) {
138 return -1;
139 }
140 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
141 }
142 ret = bt_private_port_remove_from_component(port);
143 if (ret) {
144 return -1;
145 }
146 return 0;
147}
148
149static
150struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
151 uint64_t trace_id)
d3e4dcd8 152{
7cdc2bab
MD
153 struct lttng_live_trace *trace;
154
155 bt_list_for_each_entry(trace, &session->traces, node) {
156 if (trace->id == trace_id) {
157 return trace;
158 }
159 }
d3eb6e8f
PP
160 return NULL;
161}
162
7cdc2bab
MD
163static
164void lttng_live_destroy_trace(struct bt_object *obj)
165{
166 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
5bd230f4 167 int retval;
7cdc2bab 168
087bc060 169 BT_LOGI("Destroy trace");
7cdc2bab
MD
170 assert(bt_list_empty(&trace->streams));
171 bt_list_del(&trace->node);
5bd230f4
MD
172
173 retval = bt_ctf_trace_set_is_static(trace->trace);
174 assert(!retval);
175
7cdc2bab
MD
176 lttng_live_metadata_fini(trace);
177 BT_PUT(trace->cc_prio_map);
178 g_free(trace);
179}
180
181static
182struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
183 uint64_t trace_id)
184{
185 struct lttng_live_trace *trace = NULL;
186
187 trace = g_new0(struct lttng_live_trace, 1);
188 if (!trace) {
189 goto error;
190 }
191 trace->session = session;
192 trace->id = trace_id;
193 BT_INIT_LIST_HEAD(&trace->streams);
194 trace->new_metadata_needed = true;
195 bt_list_add(&trace->node, &session->traces);
196 bt_object_init(&trace->obj, lttng_live_destroy_trace);
087bc060 197 BT_LOGI("Create trace");
7cdc2bab
MD
198 goto end;
199error:
200 g_free(trace);
201 trace = NULL;
202end:
203 return trace;
204}
205
206BT_HIDDEN
207struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
208 uint64_t trace_id)
209{
210 struct lttng_live_trace *trace;
211
212 trace = lttng_live_find_trace(session, trace_id);
213 if (trace) {
214 bt_get(trace);
215 return trace;
216 }
217 return lttng_live_create_trace(session, trace_id);
218}
219
220BT_HIDDEN
221void lttng_live_unref_trace(struct lttng_live_trace *trace)
222{
223 bt_put(trace);
224}
225
226static
227void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
228{
229 struct lttng_live_stream_iterator *stream, *s;
230
231 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
232 lttng_live_stream_iterator_destroy(stream);
233 }
234 lttng_live_metadata_fini(trace);
235}
236
237BT_HIDDEN
238int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id)
239{
240 int ret = 0;
241 struct lttng_live_session *s;
242
243 s = g_new0(struct lttng_live_session, 1);
244 if (!s) {
245 goto error;
246 }
247
248 s->id = session_id;
249 BT_INIT_LIST_HEAD(&s->traces);
250 s->lttng_live = lttng_live;
251 s->new_streams_needed = true;
252
087bc060 253 BT_LOGI("Reading from session %" PRIu64, s->id);
7cdc2bab
MD
254 bt_list_add(&s->node, &lttng_live->sessions);
255 goto end;
256error:
087bc060 257 BT_LOGE("Error adding session");
7cdc2bab
MD
258 g_free(s);
259 ret = -1;
260end:
261 return ret;
262}
263
264static
265void lttng_live_destroy_session(struct lttng_live_session *session)
266{
267 struct lttng_live_trace *trace, *t;
268
087bc060 269 BT_LOGI("Destroy session");
7cdc2bab
MD
270 if (session->id != -1ULL) {
271 if (lttng_live_detach_session(session)) {
272 /* Old relayd cannot detach sessions. */
087bc060 273 BT_LOGD("Unable to detach session %" PRIu64,
7cdc2bab
MD
274 session->id);
275 }
276 session->id = -1ULL;
277 }
278 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
279 lttng_live_close_trace_streams(trace);
280 }
281 bt_list_del(&session->node);
282 g_free(session);
283}
284
285BT_HIDDEN
286void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
287{
288 struct lttng_live_stream_iterator_generic *s =
289 bt_private_notification_iterator_get_user_data(it);
290
291 switch (s->type) {
292 case LIVE_STREAM_TYPE_NO_STREAM:
293 {
294 /* Leave no_stream_iter in place when port is removed. */
295 break;
296 }
297 case LIVE_STREAM_TYPE_STREAM:
298 {
299 struct lttng_live_stream_iterator *stream_iter =
300 container_of(s, struct lttng_live_stream_iterator, p);
301
302 lttng_live_stream_iterator_destroy(stream_iter);
303 break;
304 }
305 }
306}
307
308static
309enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
310 struct lttng_live_component *lttng_live,
311 struct lttng_live_stream_iterator *lttng_live_stream)
312{
313 switch (lttng_live_stream->state) {
314 case LTTNG_LIVE_STREAM_QUIESCENT:
315 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
316 break;
317 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
318 /* Invalid state. */
087bc060
MD
319 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
320 abort();
7cdc2bab
MD
321 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
322 /* Invalid state. */
087bc060
MD
323 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
324 abort();
7cdc2bab
MD
325 case LTTNG_LIVE_STREAM_EOF:
326 break;
327 }
328 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
329}
330
331/*
332 * For active no data stream, fetch next data. It can be either:
333 * - quiescent: need to put it in the prio heap at quiescent end
334 * timestamp,
335 * - have data: need to wire up first event into the prio heap,
336 * - have no data on this stream at this point: need to retry (AGAIN) or
337 * return EOF.
338 */
339static
340enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
341 struct lttng_live_component *lttng_live,
342 struct lttng_live_stream_iterator *lttng_live_stream)
343{
344 enum bt_ctf_lttng_live_iterator_status ret =
345 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
346 struct packet_index index;
347 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
348
349 if (lttng_live_stream->trace->new_metadata_needed) {
350 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
351 goto end;
352 }
353 if (lttng_live_stream->trace->session->new_streams_needed) {
354 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
355 goto end;
356 }
357 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
358 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
359 goto end;
360 }
361 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
362 if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
363 goto end;
364 }
365 assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
366 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
367 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
368 && lttng_live_stream->last_returned_inactivity_timestamp ==
369 lttng_live_stream->current_inactivity_timestamp) {
370 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
371 print_stream_state(lttng_live_stream);
372 } else {
373 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
374 }
375 goto end;
376 }
377 lttng_live_stream->base_offset = index.offset;
378 lttng_live_stream->offset = index.offset;
379 lttng_live_stream->len = index.packet_size / CHAR_BIT;
380end:
381 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
382 ret = lttng_live_iterator_next_check_stream_state(
383 lttng_live, lttng_live_stream);
384 }
385 return ret;
386}
387
388/*
389 * Creation of the notification requires the ctf trace to be created
390 * beforehand, but the live protocol gives us all streams (including
391 * metadata) at once. So we split it in three steps: getting streams,
392 * getting metadata (which creates the ctf trace), and then creating the
393 * per-stream notifications.
394 */
395static
396enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
397 struct lttng_live_component *lttng_live,
398 struct lttng_live_session *session)
399{
400 enum bt_ctf_lttng_live_iterator_status status;
401 struct lttng_live_trace *trace, *t;
402
403 if (lttng_live_attach_session(session)) {
404 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
405 }
406 status = lttng_live_get_new_streams(session);
407 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
408 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
409 return status;
410 }
411 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
412 status = lttng_live_metadata_update(trace);
5bd230f4
MD
413 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
414 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
7cdc2bab
MD
415 return status;
416 }
417 }
418 return lttng_live_lazy_notif_init(session);
419}
420
421BT_HIDDEN
422void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
423{
424 struct lttng_live_session *session;
425
426 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
427 session->new_streams_needed = true;
428 }
429}
430
431static
432void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
433{
434 struct lttng_live_session *session;
435
436 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
437 struct lttng_live_trace *trace;
438
439 session->new_streams_needed = true;
440 bt_list_for_each_entry(trace, &session->traces, node) {
441 trace->new_metadata_needed = true;
442 }
443 }
444}
445
446static
447enum bt_notification_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
448 struct lttng_live_component *lttng_live)
449{
450 enum bt_ctf_lttng_live_iterator_status ret =
451 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
452 unsigned int nr_sessions_opened = 0;
453 struct lttng_live_session *session, *s;
454
455 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
456 if (session->closed && bt_list_empty(&session->traces)) {
457 lttng_live_destroy_session(session);
458 }
459 }
460 /*
461 * Currently, when there are no sessions, we quit immediately.
462 * We may want to add a component parameter to keep trying until
463 * we get data in the future.
464 * Also, in a remotely distant future, we could add a "new
465 * session" flag to the protocol, which would tell us that we
466 * need to query for new sessions even though we have sessions
467 * currently ongoing.
468 */
469 if (bt_list_empty(&lttng_live->sessions)) {
470 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
471 goto end;
472 }
473 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
474 ret = lttng_live_get_session(lttng_live, session);
475 switch (ret) {
476 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
477 break;
478 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
479 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
480 break;
481 default:
482 goto end;
483 }
484 if (!session->closed) {
485 nr_sessions_opened++;
486 }
487 }
488end:
489 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
490 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
491 }
492 return ret;
493}
494
495static
496enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
497 struct lttng_live_component *lttng_live,
498 struct lttng_live_stream_iterator *lttng_live_stream,
499 struct bt_notification **notification,
500 uint64_t timestamp)
501{
502 enum bt_ctf_lttng_live_iterator_status ret =
503 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
504 struct lttng_live_trace *trace;
505 struct bt_ctf_clock_class *clock_class = NULL;
506 struct bt_ctf_clock_value *clock_value = NULL;
507 struct bt_notification *notif = NULL;
508 int retval;
509
510 trace = lttng_live_stream->trace;
511 if (!trace) {
512 goto error;
513 }
514 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
515 if (!clock_class) {
516 goto error;
517 }
518 clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
519 if (!clock_value) {
520 goto error;
521 }
522 notif = bt_notification_inactivity_create(trace->cc_prio_map);
523 if (!notif) {
524 goto error;
525 }
526 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
527 if (retval) {
528 goto error;
529 }
530 *notification = notif;
531end:
532 bt_put(clock_value);
533 bt_put(clock_class);
534 return ret;
535
536error:
537 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
538 bt_put(notif);
539 goto end;
540}
541
542static
543enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
544 struct lttng_live_component *lttng_live,
545 struct lttng_live_stream_iterator *lttng_live_stream,
546 struct bt_notification **notification)
547{
548 enum bt_ctf_lttng_live_iterator_status ret =
549 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
550 struct bt_ctf_clock_class *clock_class = NULL;
551 struct bt_ctf_clock_value *clock_value = NULL;
552
553 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
554 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
555 }
556
557 if (lttng_live_stream->current_inactivity_timestamp ==
558 lttng_live_stream->last_returned_inactivity_timestamp) {
559 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
560 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
561 goto end;
562 }
563
564 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
565 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
566
567 lttng_live_stream->last_returned_inactivity_timestamp =
568 lttng_live_stream->current_inactivity_timestamp;
569end:
570 bt_put(clock_value);
571 bt_put(clock_class);
572 return ret;
573}
574
575static
576enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
577 struct lttng_live_component *lttng_live,
578 struct lttng_live_stream_iterator *lttng_live_stream,
579 struct bt_notification **notification)
580{
581 enum bt_ctf_lttng_live_iterator_status ret =
582 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
583 enum bt_ctf_notif_iter_status status;
584 struct lttng_live_session *session;
585
586 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
587 struct lttng_live_trace *trace;
588
589 if (session->new_streams_needed) {
590 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
591 }
592 bt_list_for_each_entry(trace, &session->traces, node) {
593 if (trace->new_metadata_needed) {
594 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
595 }
596 }
597 }
598
599 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
600 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
601 }
602 if (lttng_live_stream->packet_end_notif_queue) {
603 *notification = lttng_live_stream->packet_end_notif_queue;
604 lttng_live_stream->packet_end_notif_queue = NULL;
605 status = BT_CTF_NOTIF_ITER_STATUS_OK;
606 } else {
607 status = bt_ctf_notif_iter_get_next_notification(
608 lttng_live_stream->notif_iter,
609 lttng_live_stream->trace->cc_prio_map,
610 notification);
611 if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
612 /*
613 * Consider empty packets as inactivity.
614 */
615 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
616 lttng_live_stream->packet_end_notif_queue = *notification;
617 *notification = NULL;
618 return emit_inactivity_notification(lttng_live,
619 lttng_live_stream, notification,
620 lttng_live_stream->current_packet_end_timestamp);
621 }
622 }
623 }
624 switch (status) {
625 case BT_CTF_NOTIF_ITER_STATUS_EOF:
626 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
627 break;
628 case BT_CTF_NOTIF_ITER_STATUS_OK:
629 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
630 break;
631 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
632 /*
633 * Continue immediately (end of packet). The next
634 * get_index may return AGAIN to delay the following
635 * attempt.
636 */
637 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
638 break;
639 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
640 /* No argument provided by the user, so don't return INVAL. */
641 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
642 default:
643 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
644 break;
645 }
646 return ret;
647}
648
649/*
650 * helper function:
651 * handle_no_data_streams()
652 * retry:
653 * - for each ACTIVE_NO_DATA stream:
654 * - query relayd for stream data, or quiescence info.
655 * - if need metadata, get metadata, goto retry.
656 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
657 * - if quiescent, move to QUIESCENT streams
658 * - if fetched data, move to ACTIVE_DATA streams
659 * (at this point each stream either has data, or is quiescent)
660 *
661 *
662 * iterator_next:
663 * handle_new_streams_and_metadata()
664 * - query relayd for known streams, add them as ACTIVE_NO_DATA
665 * - query relayd for metadata
666 *
667 * call handle_active_no_data_streams()
668 *
669 * handle_quiescent_streams()
670 * - if at least one stream is ACTIVE_DATA:
671 * - peek stream event with lowest timestamp -> next_ts
672 * - for each quiescent stream
673 * - if next_ts >= quiescent end
674 * - set state to ACTIVE_NO_DATA
675 * - else
676 * - for each quiescent stream
677 * - set state to ACTIVE_NO_DATA
678 *
679 * call handle_active_no_data_streams()
680 *
681 * handle_active_data_streams()
682 * - if at least one stream is ACTIVE_DATA:
683 * - get stream event with lowest timestamp from heap
684 * - make that stream event the current notification.
685 * - move this stream heap position to its next event
686 * - if we need to fetch data from relayd, move
687 * stream to ACTIVE_NO_DATA.
688 * - return OK
689 * - return AGAIN
690 *
691 * end criterion: ctrl-c on client. If relayd exits or the session
692 * closes on the relay daemon side, we keep on waiting for streams.
693 * Eventually handle --end timestamp (also an end criterion).
694 *
695 * When disconnected from relayd: try to re-connect endlessly.
696 */
697static
698struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
699 struct bt_private_notification_iterator *iterator,
700 struct lttng_live_stream_iterator *stream_iter)
701{
702 enum bt_ctf_lttng_live_iterator_status status;
703 struct bt_notification_iterator_next_return next_return;
704 struct lttng_live_component *lttng_live;
705
706 lttng_live = stream_iter->trace->session->lttng_live;
707retry:
708 print_stream_state(stream_iter);
709 next_return.notification = NULL;
710 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
711 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
712 goto end;
713 }
714 status = lttng_live_iterator_next_handle_one_no_data_stream(
715 lttng_live, stream_iter);
716 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
717 goto end;
718 }
719 status = lttng_live_iterator_next_handle_one_quiescent_stream(
720 lttng_live, stream_iter, &next_return.notification);
721 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
722 assert(next_return.notification == NULL);
723 goto end;
724 }
725 if (next_return.notification) {
726 goto end;
727 }
728 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
729 stream_iter, &next_return.notification);
730 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
731 assert(next_return.notification == NULL);
732 }
733
734end:
735 switch (status) {
736 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
737 print_dbg("continue");
738 goto retry;
739 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
740 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
741 print_dbg("again");
742 break;
743 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
744 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
745 print_dbg("end");
746 break;
747 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
748 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
749 print_dbg("ok");
750 break;
751 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
752 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
753 break;
754 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
755 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
756 break;
757 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
758 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
759 break;
760 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
761 default: /* fall-through */
762 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
763 break;
764 }
765 return next_return;
766}
767
768static
769struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
770 struct bt_private_notification_iterator *iterator,
771 struct lttng_live_no_stream_iterator *no_stream_iter)
772{
773 enum bt_ctf_lttng_live_iterator_status status;
774 struct bt_notification_iterator_next_return next_return;
775 struct lttng_live_component *lttng_live;
776
777 lttng_live = no_stream_iter->lttng_live;
778retry:
779 lttng_live_force_new_streams_and_metadata(lttng_live);
780 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
781 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
782 goto end;
783 }
784 if (no_stream_iter->port) {
785 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
786 } else {
787 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
788 }
789end:
790 switch (status) {
791 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
792 goto retry;
793 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
794 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
795 break;
796 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
797 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
798 break;
799 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
800 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
801 break;
802 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
803 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
804 break;
805 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
806 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
807 break;
808 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
809 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
810 break;
811 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
812 default: /* fall-through */
813 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
814 break;
815 }
816 return next_return;
817}
818
d3eb6e8f 819BT_HIDDEN
41a2b7ae 820struct bt_notification_iterator_next_return lttng_live_iterator_next(
890882ef 821 struct bt_private_notification_iterator *iterator)
d3eb6e8f 822{
7cdc2bab
MD
823 struct lttng_live_stream_iterator_generic *s =
824 bt_private_notification_iterator_get_user_data(iterator);
825 struct bt_notification_iterator_next_return next_return;
826
827 switch (s->type) {
828 case LIVE_STREAM_TYPE_NO_STREAM:
829 next_return = lttng_live_iterator_next_no_stream(iterator,
830 container_of(s, struct lttng_live_no_stream_iterator, p));
831 break;
832 case LIVE_STREAM_TYPE_STREAM:
833 next_return = lttng_live_iterator_next_stream(iterator,
834 container_of(s, struct lttng_live_stream_iterator, p));
835 break;
836 default:
837 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
838 break;
839 }
840 return next_return;
841}
41a2b7ae 842
7cdc2bab
MD
843BT_HIDDEN
844enum bt_notification_iterator_status lttng_live_iterator_init(
845 struct bt_private_notification_iterator *it,
846 struct bt_private_port *port)
847{
848 enum bt_notification_iterator_status ret =
849 BT_NOTIFICATION_ITERATOR_STATUS_OK;
850 struct lttng_live_stream_iterator_generic *s;
7cdc2bab
MD
851
852 assert(it);
853
854 s = bt_private_port_get_user_data(port);
855 assert(s);
856 switch (s->type) {
857 case LIVE_STREAM_TYPE_NO_STREAM:
858 {
859 struct lttng_live_no_stream_iterator *no_stream_iter =
860 container_of(s, struct lttng_live_no_stream_iterator, p);
7cdc2bab
MD
861 ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
862 if (ret) {
863 goto error;
864 }
865 break;
866 }
867 case LIVE_STREAM_TYPE_STREAM:
868 {
869 struct lttng_live_stream_iterator *stream_iter =
870 container_of(s, struct lttng_live_stream_iterator, p);
7cdc2bab
MD
871 ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
872 if (ret) {
873 goto error;
874 }
875 break;
876 }
877 default:
878 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
879 goto end;
880 }
881
882end:
41a2b7ae 883 return ret;
7cdc2bab
MD
884error:
885 if (bt_private_notification_iterator_set_user_data(it, NULL)
886 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
087bc060 887 BT_LOGE("Error setting private data to NULL");
7cdc2bab
MD
888 }
889 goto end;
890}
891
892static
893struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
894 struct bt_value *params)
895{
896 struct bt_value *url_value = NULL;
897 struct bt_value *results = NULL;
898 const char *url;
899 struct bt_live_viewer_connection *viewer_connection = NULL;
900 enum bt_value_status ret;
901
902 url_value = bt_value_map_get(params, "url");
903 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
087bc060 904 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
905 goto error;
906 }
907
908 ret = bt_value_string_get(url_value, &url);
909 if (ret != BT_VALUE_STATUS_OK) {
087bc060 910 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
911 goto error;
912 }
913
914 viewer_connection = bt_live_viewer_connection_create(url, stderr);
915 if (!viewer_connection) {
916 ret = BT_COMPONENT_STATUS_NOMEM;
917 goto error;
918 }
919
920 results = bt_live_viewer_connection_list_sessions(viewer_connection);
921 goto end;
922error:
923 BT_PUT(results);
924end:
925 if (viewer_connection) {
926 bt_live_viewer_connection_destroy(viewer_connection);
927 }
928 BT_PUT(url_value);
929 return results;
930}
931
932BT_HIDDEN
933struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
934 const char *object, struct bt_value *params)
935{
936 if (strcmp(object, "sessions") == 0) {
937 return lttng_live_query_list_sessions(comp_class,
938 params);
939 }
087bc060 940 BT_LOGW("Unknown query object `%s`", object);
7cdc2bab
MD
941 return NULL;
942}
943
944static
945void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
946{
947 int ret;
948 struct lttng_live_session *session, *s;
949
950 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
951 lttng_live_destroy_session(session);
952 }
953 BT_PUT(lttng_live->viewer_connection);
954 if (lttng_live->url) {
955 g_string_free(lttng_live->url, TRUE);
956 }
957 if (lttng_live->no_stream_port) {
958 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
959 assert(!ret);
960 BT_PUT(lttng_live->no_stream_port);
961 }
962 if (lttng_live->no_stream_iter) {
963 g_free(lttng_live->no_stream_iter);
964 }
965 g_free(lttng_live);
966}
967
968BT_HIDDEN
969void lttng_live_component_finalize(struct bt_private_component *component)
970{
971 void *data = bt_private_component_get_user_data(component);
972
973 if (!data) {
974 return;
975 }
976 lttng_live_component_destroy_data(data);
977}
978
979static
980struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
981 struct bt_private_component *private_component)
982{
983 struct lttng_live_component *lttng_live;
984 struct bt_value *value = NULL;
985 const char *url;
986 enum bt_value_status ret;
987
988 lttng_live = g_new0(struct lttng_live_component, 1);
989 if (!lttng_live) {
990 goto end;
991 }
7cdc2bab
MD
992 /* TODO: make this an overridable parameter. */
993 lttng_live->max_query_size = MAX_QUERY_SIZE;
994 BT_INIT_LIST_HEAD(&lttng_live->sessions);
995 value = bt_value_map_get(params, "url");
996 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
087bc060 997 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
998 goto error;
999 }
1000 ret = bt_value_string_get(value, &url);
1001 if (ret != BT_VALUE_STATUS_OK) {
087bc060 1002 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
1003 goto error;
1004 }
1005 lttng_live->url = g_string_new(url);
1006 if (!lttng_live->url) {
1007 goto error;
1008 }
1009 lttng_live->viewer_connection =
1010 bt_live_viewer_connection_create(lttng_live->url->str,
1011 stderr);
1012 if (!lttng_live->viewer_connection) {
1013 ret = BT_COMPONENT_STATUS_NOMEM;
1014 goto error;
1015 }
1016 if (lttng_live_create_viewer_session(lttng_live)) {
1017 ret = BT_COMPONENT_STATUS_ERROR;
1018 goto error;
1019 }
1020 lttng_live->private_component = private_component;
1021
1022 goto end;
1023
1024error:
1025 lttng_live_component_destroy_data(lttng_live);
1026 lttng_live = NULL;
1027end:
1028 return lttng_live;
d3e4dcd8
PP
1029}
1030
f3bc2010 1031BT_HIDDEN
7cdc2bab
MD
1032enum bt_component_status lttng_live_component_init(struct bt_private_component *component,
1033 struct bt_value *params, void *init_method_data)
f3bc2010 1034{
7cdc2bab
MD
1035 struct lttng_live_component *lttng_live;
1036 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
1037
7cdc2bab
MD
1038 /* Passes ownership of iter ref to lttng_live_component_create. */
1039 lttng_live = lttng_live_component_create(params, component);
1040 if (!lttng_live) {
1041 ret = BT_COMPONENT_STATUS_NOMEM;
1042 goto end;
1043 }
1044
1045 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1046 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1047 lttng_live->no_stream_iter->lttng_live = lttng_live;
1048
1049 lttng_live->no_stream_port =
1050 bt_private_component_source_add_output_private_port(
1051 lttng_live->private_component, "no-stream",
1052 lttng_live->no_stream_iter);
1053 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1054
1055 ret = bt_private_component_set_user_data(component, lttng_live);
1056 if (ret != BT_COMPONENT_STATUS_OK) {
1057 goto error;
1058 }
1059
1060end:
1061 return ret;
1062error:
1063 (void) bt_private_component_set_user_data(component, NULL);
1064 lttng_live_component_destroy_data(lttng_live);
1065 return ret;
f3bc2010 1066}
087bc060 1067
d85ef162
MD
1068BT_HIDDEN
1069enum bt_component_status lttng_live_accept_port_connection(
1070 struct bt_private_component *private_component,
1071 struct bt_private_port *self_private_port,
1072 struct bt_port *other_port)
1073{
1074 struct lttng_live_component *lttng_live =
1075 bt_private_component_get_user_data(private_component);
1076 struct bt_component *other_component;
1077 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
1078 struct bt_port *self_port = bt_port_from_private_port(self_private_port);
1079
1080 other_component = bt_port_get_component(other_port);
1081 bt_put(other_component); /* weak */
1082
1083 if (!lttng_live->downstream_component) {
1084 lttng_live->downstream_component = other_component;
1085 goto end;
1086 }
1087
1088 /*
1089 * Compare prior component to ensure we are connected to the
1090 * same downstream component as prior ports.
1091 */
1092 if (lttng_live->downstream_component != other_component) {
1093 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1094 bt_port_get_name(self_port),
1095 bt_component_get_name(other_component),
1096 bt_component_get_name(lttng_live->downstream_component));
1097 status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
1098 goto end;
1099 }
1100end:
1101 bt_put(self_port);
1102 return status;
1103}
1104
087bc060
MD
1105static
1106void __attribute__((constructor)) bt_lttng_live_logging_ctor(void)
1107{
1108 enum bt_logging_level log_level = BT_LOG_NONE;
047358f9 1109 const char *log_level_env = getenv(BT_LOGLEVEL_NAME);
087bc060
MD
1110
1111 if (!log_level_env) {
1112 return;
1113 }
1114
1115 if (strcmp(log_level_env, "VERBOSE") == 0) {
1116 log_level = BT_LOGGING_LEVEL_VERBOSE;
1117 } else if (strcmp(log_level_env, "DEBUG") == 0) {
1118 log_level = BT_LOGGING_LEVEL_DEBUG;
1119 } else if (strcmp(log_level_env, "INFO") == 0) {
1120 log_level = BT_LOGGING_LEVEL_INFO;
1121 } else if (strcmp(log_level_env, "WARN") == 0) {
1122 log_level = BT_LOGGING_LEVEL_WARN;
1123 } else if (strcmp(log_level_env, "ERROR") == 0) {
1124 log_level = BT_LOGGING_LEVEL_ERROR;
1125 } else if (strcmp(log_level_env, "FATAL") == 0) {
1126 log_level = BT_LOGGING_LEVEL_FATAL;
d85ef162
MD
1127 } else {
1128 bt_lttng_live_log_level = BT_LOGGING_LEVEL_FATAL;
047358f9
MD
1129 BT_LOGF("Incorrect log level specified in %s",
1130 BT_LOGLEVEL_NAME);
d85ef162 1131 abort();
087bc060
MD
1132 }
1133
1134 bt_lttng_live_log_level = log_level;
1135}
This page took 0.072464 seconds and 4 git commands to generate.