Fix: uninitialized return value member
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
1 /*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
30 #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
31 #include "logging.h"
32
33 #include <babeltrace/ctf-ir/packet.h>
34 #include <babeltrace/graph/component-source.h>
35 #include <babeltrace/graph/private-port.h>
36 #include <babeltrace/graph/port.h>
37 #include <babeltrace/graph/private-component.h>
38 #include <babeltrace/graph/private-component-source.h>
39 #include <babeltrace/graph/private-notification-iterator.h>
40 #include <babeltrace/graph/notification-stream.h>
41 #include <babeltrace/graph/notification-packet.h>
42 #include <babeltrace/graph/notification-event.h>
43 #include <babeltrace/graph/notification-heap.h>
44 #include <babeltrace/graph/notification-iterator.h>
45 #include <babeltrace/graph/notification-inactivity.h>
46 #include <babeltrace/graph/graph.h>
47 #include <babeltrace/compiler-internal.h>
48 #include <babeltrace/types.h>
49 #include <inttypes.h>
50 #include <glib.h>
51 #include <assert.h>
52 #include <unistd.h>
53 #include <plugins-common.h>
54
55 #include "data-stream.h"
56 #include "metadata.h"
57 #include "lttng-live-internal.h"
58
59 #define MAX_QUERY_SIZE (256*1024)
60
61 #define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
62
63 static const char *print_state(struct lttng_live_stream_iterator *s)
64 {
65 switch (s->state) {
66 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
67 return "ACTIVE_NO_DATA";
68 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
69 return "QUIESCENT_NO_DATA";
70 case LTTNG_LIVE_STREAM_QUIESCENT:
71 return "QUIESCENT";
72 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
73 return "ACTIVE_DATA";
74 case LTTNG_LIVE_STREAM_EOF:
75 return "EOF";
76 default:
77 return "ERROR";
78 }
79 }
80
81 static
82 void print_stream_state(struct lttng_live_stream_iterator *stream)
83 {
84 struct bt_port *port;
85
86 port = bt_port_from_private_port(stream->port);
87 print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
88 bt_port_get_name(port),
89 print_state(stream),
90 stream->last_returned_inactivity_timestamp,
91 stream->current_inactivity_timestamp);
92 bt_put(port);
93 }
94
95 BT_HIDDEN
96 bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
97 {
98 struct bt_component *component;
99 struct bt_graph *graph;
100 bt_bool ret;
101
102 if (!lttng_live) {
103 return BT_FALSE;
104 }
105
106 component = bt_component_from_private_component(lttng_live->private_component);
107 graph = bt_component_get_graph(component);
108 ret = bt_graph_is_canceled(graph);
109 bt_put(graph);
110 bt_put(component);
111 return ret;
112 }
113
114 BT_HIDDEN
115 int lttng_live_add_port(struct lttng_live_component *lttng_live,
116 struct lttng_live_stream_iterator *stream_iter)
117 {
118 int ret;
119 struct bt_private_port *private_port;
120 char name[STREAM_NAME_MAX_LEN];
121 enum bt_component_status status;
122
123 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
124 assert(ret > 0);
125 strcpy(stream_iter->name, name);
126 if (lttng_live_is_canceled(lttng_live)) {
127 return 0;
128 }
129 status = bt_private_component_source_add_output_private_port(
130 lttng_live->private_component, name, stream_iter,
131 &private_port);
132 switch (status) {
133 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
134 return 0;
135 case BT_COMPONENT_STATUS_OK:
136 break;
137 default:
138 return -1;
139 }
140 bt_put(private_port); /* weak */
141 BT_LOGI("Added port %s", name);
142
143 if (lttng_live->no_stream_port) {
144 bt_get(lttng_live->no_stream_port);
145 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
146 bt_put(lttng_live->no_stream_port);
147 if (ret) {
148 return -1;
149 }
150 lttng_live->no_stream_port = NULL;
151 lttng_live->no_stream_iter->port = NULL;
152 }
153 stream_iter->port = private_port;
154 return 0;
155 }
156
157 BT_HIDDEN
158 int lttng_live_remove_port(struct lttng_live_component *lttng_live,
159 struct bt_private_port *port)
160 {
161 struct bt_component *component;
162 int64_t nr_ports;
163 int ret;
164
165 component = bt_component_from_private_component(lttng_live->private_component);
166 nr_ports = bt_component_source_get_output_port_count(component);
167 if (nr_ports < 0) {
168 return -1;
169 }
170 BT_PUT(component);
171 if (nr_ports == 1) {
172 enum bt_component_status status;
173
174 assert(!lttng_live->no_stream_port);
175
176 if (lttng_live_is_canceled(lttng_live)) {
177 return 0;
178 }
179 status = bt_private_component_source_add_output_private_port(lttng_live->private_component,
180 "no-stream", lttng_live->no_stream_iter,
181 &lttng_live->no_stream_port);
182 switch (status) {
183 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
184 return 0;
185 case BT_COMPONENT_STATUS_OK:
186 break;
187 default:
188 return -1;
189 }
190 bt_put(lttng_live->no_stream_port); /* weak */
191 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
192 }
193 bt_get(port);
194 ret = bt_private_port_remove_from_component(port);
195 bt_put(port);
196 if (ret) {
197 return -1;
198 }
199 return 0;
200 }
201
202 static
203 struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
204 uint64_t trace_id)
205 {
206 struct lttng_live_trace *trace;
207
208 bt_list_for_each_entry(trace, &session->traces, node) {
209 if (trace->id == trace_id) {
210 return trace;
211 }
212 }
213 return NULL;
214 }
215
216 static
217 void lttng_live_destroy_trace(struct bt_object *obj)
218 {
219 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
220
221 BT_LOGI("Destroy trace");
222 assert(bt_list_empty(&trace->streams));
223 bt_list_del(&trace->node);
224
225 if (trace->trace) {
226 int retval;
227
228 retval = bt_ctf_trace_set_is_static(trace->trace);
229 assert(!retval);
230 BT_PUT(trace->trace);
231 }
232 lttng_live_metadata_fini(trace);
233 BT_PUT(trace->cc_prio_map);
234 g_free(trace);
235 }
236
237 static
238 struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
239 uint64_t trace_id)
240 {
241 struct lttng_live_trace *trace = NULL;
242
243 trace = g_new0(struct lttng_live_trace, 1);
244 if (!trace) {
245 goto error;
246 }
247 trace->session = session;
248 trace->id = trace_id;
249 BT_INIT_LIST_HEAD(&trace->streams);
250 trace->new_metadata_needed = true;
251 bt_list_add(&trace->node, &session->traces);
252 bt_object_init(&trace->obj, lttng_live_destroy_trace);
253 BT_LOGI("Create trace");
254 goto end;
255 error:
256 g_free(trace);
257 trace = NULL;
258 end:
259 return trace;
260 }
261
262 BT_HIDDEN
263 struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
264 uint64_t trace_id)
265 {
266 struct lttng_live_trace *trace;
267
268 trace = lttng_live_find_trace(session, trace_id);
269 if (trace) {
270 bt_get(trace);
271 return trace;
272 }
273 return lttng_live_create_trace(session, trace_id);
274 }
275
276 BT_HIDDEN
277 void lttng_live_unref_trace(struct lttng_live_trace *trace)
278 {
279 bt_put(trace);
280 }
281
282 static
283 void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
284 {
285 struct lttng_live_stream_iterator *stream, *s;
286
287 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
288 lttng_live_stream_iterator_destroy(stream);
289 }
290 lttng_live_metadata_fini(trace);
291 }
292
293 BT_HIDDEN
294 int lttng_live_add_session(struct lttng_live_component *lttng_live,
295 uint64_t session_id, const char *hostname,
296 const char *session_name)
297 {
298 int ret = 0;
299 struct lttng_live_session *s;
300
301 s = g_new0(struct lttng_live_session, 1);
302 if (!s) {
303 goto error;
304 }
305
306 s->id = session_id;
307 BT_INIT_LIST_HEAD(&s->traces);
308 s->lttng_live = lttng_live;
309 s->new_streams_needed = true;
310 s->hostname = g_string_new(hostname);
311 s->session_name = g_string_new(session_name);
312
313 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
314 s->id, hostname, session_name);
315 bt_list_add(&s->node, &lttng_live->sessions);
316 goto end;
317 error:
318 BT_LOGE("Error adding session");
319 g_free(s);
320 ret = -1;
321 end:
322 return ret;
323 }
324
325 static
326 void lttng_live_destroy_session(struct lttng_live_session *session)
327 {
328 struct lttng_live_trace *trace, *t;
329
330 BT_LOGI("Destroy session");
331 if (session->id != -1ULL) {
332 if (lttng_live_detach_session(session)) {
333 if (!lttng_live_is_canceled(session->lttng_live)) {
334 /* Old relayd cannot detach sessions. */
335 BT_LOGD("Unable to detach session %" PRIu64,
336 session->id);
337 }
338 }
339 session->id = -1ULL;
340 }
341 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
342 lttng_live_close_trace_streams(trace);
343 }
344 bt_list_del(&session->node);
345 if (session->hostname) {
346 g_string_free(session->hostname, TRUE);
347 }
348 if (session->session_name) {
349 g_string_free(session->session_name, TRUE);
350 }
351 g_free(session);
352 }
353
354 BT_HIDDEN
355 void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
356 {
357 struct lttng_live_stream_iterator_generic *s =
358 bt_private_notification_iterator_get_user_data(it);
359
360 switch (s->type) {
361 case LIVE_STREAM_TYPE_NO_STREAM:
362 {
363 /* Leave no_stream_iter in place when port is removed. */
364 break;
365 }
366 case LIVE_STREAM_TYPE_STREAM:
367 {
368 struct lttng_live_stream_iterator *stream_iter =
369 container_of(s, struct lttng_live_stream_iterator, p);
370
371 lttng_live_stream_iterator_destroy(stream_iter);
372 break;
373 }
374 }
375 }
376
377 static
378 enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
379 struct lttng_live_component *lttng_live,
380 struct lttng_live_stream_iterator *lttng_live_stream)
381 {
382 switch (lttng_live_stream->state) {
383 case LTTNG_LIVE_STREAM_QUIESCENT:
384 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
385 break;
386 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
387 /* Invalid state. */
388 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
389 abort();
390 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
391 /* Invalid state. */
392 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
393 abort();
394 case LTTNG_LIVE_STREAM_EOF:
395 break;
396 }
397 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
398 }
399
400 /*
401 * For active no data stream, fetch next data. It can be either:
402 * - quiescent: need to put it in the prio heap at quiescent end
403 * timestamp,
404 * - have data: need to wire up first event into the prio heap,
405 * - have no data on this stream at this point: need to retry (AGAIN) or
406 * return EOF.
407 */
408 static
409 enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
410 struct lttng_live_component *lttng_live,
411 struct lttng_live_stream_iterator *lttng_live_stream)
412 {
413 enum bt_ctf_lttng_live_iterator_status ret =
414 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
415 struct packet_index index;
416 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
417
418 if (lttng_live_stream->trace->new_metadata_needed) {
419 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
420 goto end;
421 }
422 if (lttng_live_stream->trace->session->new_streams_needed) {
423 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
424 goto end;
425 }
426 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
427 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
428 goto end;
429 }
430 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
431 if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
432 goto end;
433 }
434 assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
435 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
436 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
437 && lttng_live_stream->last_returned_inactivity_timestamp ==
438 lttng_live_stream->current_inactivity_timestamp) {
439 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
440 print_stream_state(lttng_live_stream);
441 } else {
442 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
443 }
444 goto end;
445 }
446 lttng_live_stream->base_offset = index.offset;
447 lttng_live_stream->offset = index.offset;
448 lttng_live_stream->len = index.packet_size / CHAR_BIT;
449 end:
450 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
451 ret = lttng_live_iterator_next_check_stream_state(
452 lttng_live, lttng_live_stream);
453 }
454 return ret;
455 }
456
457 /*
458 * Creation of the notification requires the ctf trace to be created
459 * beforehand, but the live protocol gives us all streams (including
460 * metadata) at once. So we split it in three steps: getting streams,
461 * getting metadata (which creates the ctf trace), and then creating the
462 * per-stream notifications.
463 */
464 static
465 enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
466 struct lttng_live_component *lttng_live,
467 struct lttng_live_session *session)
468 {
469 enum bt_ctf_lttng_live_iterator_status status;
470 struct lttng_live_trace *trace, *t;
471
472 if (lttng_live_attach_session(session)) {
473 if (lttng_live_is_canceled(lttng_live)) {
474 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
475 } else {
476 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
477 }
478 }
479 status = lttng_live_get_new_streams(session);
480 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
481 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
482 return status;
483 }
484 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
485 status = lttng_live_metadata_update(trace);
486 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
487 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
488 return status;
489 }
490 }
491 return lttng_live_lazy_notif_init(session);
492 }
493
494 BT_HIDDEN
495 void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
496 {
497 struct lttng_live_session *session;
498
499 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
500 session->new_streams_needed = true;
501 }
502 }
503
504 static
505 void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
506 {
507 struct lttng_live_session *session;
508
509 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
510 struct lttng_live_trace *trace;
511
512 session->new_streams_needed = true;
513 bt_list_for_each_entry(trace, &session->traces, node) {
514 trace->new_metadata_needed = true;
515 }
516 }
517 }
518
519 static
520 enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
521 struct lttng_live_component *lttng_live)
522 {
523 enum bt_ctf_lttng_live_iterator_status ret =
524 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
525 unsigned int nr_sessions_opened = 0;
526 struct lttng_live_session *session, *s;
527
528 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
529 if (session->closed && bt_list_empty(&session->traces)) {
530 lttng_live_destroy_session(session);
531 }
532 }
533 /*
534 * Currently, when there are no sessions, we quit immediately.
535 * We may want to add a component parameter to keep trying until
536 * we get data in the future.
537 * Also, in a remotely distant future, we could add a "new
538 * session" flag to the protocol, which would tell us that we
539 * need to query for new sessions even though we have sessions
540 * currently ongoing.
541 */
542 if (bt_list_empty(&lttng_live->sessions)) {
543 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
544 goto end;
545 }
546 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
547 ret = lttng_live_get_session(lttng_live, session);
548 switch (ret) {
549 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
550 break;
551 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
552 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
553 break;
554 default:
555 goto end;
556 }
557 if (!session->closed) {
558 nr_sessions_opened++;
559 }
560 }
561 end:
562 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
563 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
564 }
565 return ret;
566 }
567
568 static
569 enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
570 struct lttng_live_component *lttng_live,
571 struct lttng_live_stream_iterator *lttng_live_stream,
572 struct bt_notification **notification,
573 uint64_t timestamp)
574 {
575 enum bt_ctf_lttng_live_iterator_status ret =
576 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
577 struct lttng_live_trace *trace;
578 struct bt_ctf_clock_class *clock_class = NULL;
579 struct bt_ctf_clock_value *clock_value = NULL;
580 struct bt_notification *notif = NULL;
581 int retval;
582
583 trace = lttng_live_stream->trace;
584 if (!trace) {
585 goto error;
586 }
587 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
588 if (!clock_class) {
589 goto error;
590 }
591 clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
592 if (!clock_value) {
593 goto error;
594 }
595 notif = bt_notification_inactivity_create(trace->cc_prio_map);
596 if (!notif) {
597 goto error;
598 }
599 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
600 if (retval) {
601 goto error;
602 }
603 *notification = notif;
604 end:
605 bt_put(clock_value);
606 bt_put(clock_class);
607 return ret;
608
609 error:
610 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
611 bt_put(notif);
612 goto end;
613 }
614
615 static
616 enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
617 struct lttng_live_component *lttng_live,
618 struct lttng_live_stream_iterator *lttng_live_stream,
619 struct bt_notification **notification)
620 {
621 enum bt_ctf_lttng_live_iterator_status ret =
622 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
623 struct bt_ctf_clock_class *clock_class = NULL;
624 struct bt_ctf_clock_value *clock_value = NULL;
625
626 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
627 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
628 }
629
630 if (lttng_live_stream->current_inactivity_timestamp ==
631 lttng_live_stream->last_returned_inactivity_timestamp) {
632 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
633 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
634 goto end;
635 }
636
637 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
638 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
639
640 lttng_live_stream->last_returned_inactivity_timestamp =
641 lttng_live_stream->current_inactivity_timestamp;
642 end:
643 bt_put(clock_value);
644 bt_put(clock_class);
645 return ret;
646 }
647
648 static
649 enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
650 struct lttng_live_component *lttng_live,
651 struct lttng_live_stream_iterator *lttng_live_stream,
652 struct bt_notification **notification)
653 {
654 enum bt_ctf_lttng_live_iterator_status ret =
655 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
656 enum bt_ctf_notif_iter_status status;
657 struct lttng_live_session *session;
658
659 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
660 struct lttng_live_trace *trace;
661
662 if (session->new_streams_needed) {
663 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
664 }
665 bt_list_for_each_entry(trace, &session->traces, node) {
666 if (trace->new_metadata_needed) {
667 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
668 }
669 }
670 }
671
672 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
673 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
674 }
675 if (lttng_live_stream->packet_end_notif_queue) {
676 *notification = lttng_live_stream->packet_end_notif_queue;
677 lttng_live_stream->packet_end_notif_queue = NULL;
678 status = BT_CTF_NOTIF_ITER_STATUS_OK;
679 } else {
680 status = bt_ctf_notif_iter_get_next_notification(
681 lttng_live_stream->notif_iter,
682 lttng_live_stream->trace->cc_prio_map,
683 notification);
684 if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
685 /*
686 * Consider empty packets as inactivity.
687 */
688 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
689 lttng_live_stream->packet_end_notif_queue = *notification;
690 *notification = NULL;
691 return emit_inactivity_notification(lttng_live,
692 lttng_live_stream, notification,
693 lttng_live_stream->current_packet_end_timestamp);
694 }
695 }
696 }
697 switch (status) {
698 case BT_CTF_NOTIF_ITER_STATUS_EOF:
699 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
700 break;
701 case BT_CTF_NOTIF_ITER_STATUS_OK:
702 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
703 break;
704 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
705 /*
706 * Continue immediately (end of packet). The next
707 * get_index may return AGAIN to delay the following
708 * attempt.
709 */
710 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
711 break;
712 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
713 /* No argument provided by the user, so don't return INVAL. */
714 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
715 default:
716 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
717 break;
718 }
719 return ret;
720 }
721
722 /*
723 * helper function:
724 * handle_no_data_streams()
725 * retry:
726 * - for each ACTIVE_NO_DATA stream:
727 * - query relayd for stream data, or quiescence info.
728 * - if need metadata, get metadata, goto retry.
729 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
730 * - if quiescent, move to QUIESCENT streams
731 * - if fetched data, move to ACTIVE_DATA streams
732 * (at this point each stream either has data, or is quiescent)
733 *
734 *
735 * iterator_next:
736 * handle_new_streams_and_metadata()
737 * - query relayd for known streams, add them as ACTIVE_NO_DATA
738 * - query relayd for metadata
739 *
740 * call handle_active_no_data_streams()
741 *
742 * handle_quiescent_streams()
743 * - if at least one stream is ACTIVE_DATA:
744 * - peek stream event with lowest timestamp -> next_ts
745 * - for each quiescent stream
746 * - if next_ts >= quiescent end
747 * - set state to ACTIVE_NO_DATA
748 * - else
749 * - for each quiescent stream
750 * - set state to ACTIVE_NO_DATA
751 *
752 * call handle_active_no_data_streams()
753 *
754 * handle_active_data_streams()
755 * - if at least one stream is ACTIVE_DATA:
756 * - get stream event with lowest timestamp from heap
757 * - make that stream event the current notification.
758 * - move this stream heap position to its next event
759 * - if we need to fetch data from relayd, move
760 * stream to ACTIVE_NO_DATA.
761 * - return OK
762 * - return AGAIN
763 *
764 * end criterion: ctrl-c on client. If relayd exits or the session
765 * closes on the relay daemon side, we keep on waiting for streams.
766 * Eventually handle --end timestamp (also an end criterion).
767 *
768 * When disconnected from relayd: try to re-connect endlessly.
769 */
770 static
771 struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
772 struct bt_private_notification_iterator *iterator,
773 struct lttng_live_stream_iterator *stream_iter)
774 {
775 enum bt_ctf_lttng_live_iterator_status status;
776 struct bt_notification_iterator_next_return next_return;
777 struct lttng_live_component *lttng_live;
778
779 lttng_live = stream_iter->trace->session->lttng_live;
780 retry:
781 print_stream_state(stream_iter);
782 next_return.notification = NULL;
783 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
784 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
785 goto end;
786 }
787 status = lttng_live_iterator_next_handle_one_no_data_stream(
788 lttng_live, stream_iter);
789 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
790 goto end;
791 }
792 status = lttng_live_iterator_next_handle_one_quiescent_stream(
793 lttng_live, stream_iter, &next_return.notification);
794 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
795 assert(next_return.notification == NULL);
796 goto end;
797 }
798 if (next_return.notification) {
799 goto end;
800 }
801 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
802 stream_iter, &next_return.notification);
803 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
804 assert(next_return.notification == NULL);
805 }
806
807 end:
808 switch (status) {
809 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
810 print_dbg("continue");
811 goto retry;
812 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
813 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
814 print_dbg("again");
815 break;
816 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
817 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
818 print_dbg("end");
819 break;
820 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
821 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
822 print_dbg("ok");
823 break;
824 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
825 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
826 break;
827 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
828 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
829 break;
830 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
831 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
832 break;
833 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
834 default: /* fall-through */
835 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
836 break;
837 }
838 return next_return;
839 }
840
841 static
842 struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
843 struct bt_private_notification_iterator *iterator,
844 struct lttng_live_no_stream_iterator *no_stream_iter)
845 {
846 enum bt_ctf_lttng_live_iterator_status status;
847 struct bt_notification_iterator_next_return next_return;
848 struct lttng_live_component *lttng_live;
849
850 lttng_live = no_stream_iter->lttng_live;
851 retry:
852 lttng_live_force_new_streams_and_metadata(lttng_live);
853 next_return.notification = NULL;
854 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
855 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
856 goto end;
857 }
858 if (no_stream_iter->port) {
859 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
860 } else {
861 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
862 }
863 end:
864 switch (status) {
865 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
866 goto retry;
867 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
868 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
869 break;
870 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
871 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
872 break;
873 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
874 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
875 break;
876 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
877 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
878 break;
879 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
880 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
881 break;
882 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
883 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
884 break;
885 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
886 default: /* fall-through */
887 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
888 break;
889 }
890 return next_return;
891 }
892
893 BT_HIDDEN
894 struct bt_notification_iterator_next_return lttng_live_iterator_next(
895 struct bt_private_notification_iterator *iterator)
896 {
897 struct lttng_live_stream_iterator_generic *s =
898 bt_private_notification_iterator_get_user_data(iterator);
899 struct bt_notification_iterator_next_return next_return;
900
901 switch (s->type) {
902 case LIVE_STREAM_TYPE_NO_STREAM:
903 next_return = lttng_live_iterator_next_no_stream(iterator,
904 container_of(s, struct lttng_live_no_stream_iterator, p));
905 break;
906 case LIVE_STREAM_TYPE_STREAM:
907 next_return = lttng_live_iterator_next_stream(iterator,
908 container_of(s, struct lttng_live_stream_iterator, p));
909 break;
910 default:
911 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
912 break;
913 }
914 return next_return;
915 }
916
917 BT_HIDDEN
918 enum bt_notification_iterator_status lttng_live_iterator_init(
919 struct bt_private_notification_iterator *it,
920 struct bt_private_port *port)
921 {
922 enum bt_notification_iterator_status ret =
923 BT_NOTIFICATION_ITERATOR_STATUS_OK;
924 struct lttng_live_stream_iterator_generic *s;
925
926 assert(it);
927
928 s = bt_private_port_get_user_data(port);
929 assert(s);
930 switch (s->type) {
931 case LIVE_STREAM_TYPE_NO_STREAM:
932 {
933 struct lttng_live_no_stream_iterator *no_stream_iter =
934 container_of(s, struct lttng_live_no_stream_iterator, p);
935 ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
936 if (ret) {
937 goto error;
938 }
939 break;
940 }
941 case LIVE_STREAM_TYPE_STREAM:
942 {
943 struct lttng_live_stream_iterator *stream_iter =
944 container_of(s, struct lttng_live_stream_iterator, p);
945 ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
946 if (ret) {
947 goto error;
948 }
949 break;
950 }
951 default:
952 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
953 goto end;
954 }
955
956 end:
957 return ret;
958 error:
959 if (bt_private_notification_iterator_set_user_data(it, NULL)
960 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
961 BT_LOGE("Error setting private data to NULL");
962 }
963 goto end;
964 }
965
966 static
967 struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
968 struct bt_value *params)
969 {
970 struct bt_value *url_value = NULL;
971 struct bt_value *results = NULL;
972 const char *url;
973 struct bt_live_viewer_connection *viewer_connection = NULL;
974
975 url_value = bt_value_map_get(params, "url");
976 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
977 BT_LOGW("Mandatory \"url\" parameter missing");
978 goto error;
979 }
980
981 if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
982 BT_LOGW("\"url\" parameter is required to be a string value");
983 goto error;
984 }
985
986 viewer_connection = bt_live_viewer_connection_create(url, NULL);
987 if (!viewer_connection) {
988 goto error;
989 }
990
991 results = bt_live_viewer_connection_list_sessions(viewer_connection);
992 goto end;
993 error:
994 BT_PUT(results);
995 end:
996 if (viewer_connection) {
997 bt_live_viewer_connection_destroy(viewer_connection);
998 }
999 BT_PUT(url_value);
1000 return results;
1001 }
1002
1003 BT_HIDDEN
1004 struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
1005 const char *object, struct bt_value *params)
1006 {
1007 if (strcmp(object, "sessions") == 0) {
1008 return lttng_live_query_list_sessions(comp_class,
1009 params);
1010 }
1011 BT_LOGW("Unknown query object `%s`", object);
1012 return NULL;
1013 }
1014
1015 static
1016 void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1017 {
1018 int ret;
1019 struct lttng_live_session *session, *s;
1020
1021 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
1022 lttng_live_destroy_session(session);
1023 }
1024 BT_PUT(lttng_live->viewer_connection);
1025 if (lttng_live->url) {
1026 g_string_free(lttng_live->url, TRUE);
1027 }
1028 if (lttng_live->no_stream_port) {
1029 bt_get(lttng_live->no_stream_port);
1030 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
1031 bt_put(lttng_live->no_stream_port);
1032 assert(!ret);
1033 }
1034 if (lttng_live->no_stream_iter) {
1035 g_free(lttng_live->no_stream_iter);
1036 }
1037 g_free(lttng_live);
1038 }
1039
1040 BT_HIDDEN
1041 void lttng_live_component_finalize(struct bt_private_component *component)
1042 {
1043 void *data = bt_private_component_get_user_data(component);
1044
1045 if (!data) {
1046 return;
1047 }
1048 lttng_live_component_destroy_data(data);
1049 }
1050
1051 static
1052 struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
1053 struct bt_private_component *private_component)
1054 {
1055 struct lttng_live_component *lttng_live;
1056 struct bt_value *value = NULL;
1057 const char *url;
1058 enum bt_value_status ret;
1059
1060 lttng_live = g_new0(struct lttng_live_component, 1);
1061 if (!lttng_live) {
1062 goto end;
1063 }
1064 /* TODO: make this an overridable parameter. */
1065 lttng_live->max_query_size = MAX_QUERY_SIZE;
1066 BT_INIT_LIST_HEAD(&lttng_live->sessions);
1067 value = bt_value_map_get(params, "url");
1068 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
1069 BT_LOGW("Mandatory \"url\" parameter missing");
1070 goto error;
1071 }
1072 ret = bt_value_string_get(value, &url);
1073 if (ret != BT_VALUE_STATUS_OK) {
1074 BT_LOGW("\"url\" parameter is required to be a string value");
1075 goto error;
1076 }
1077 lttng_live->url = g_string_new(url);
1078 if (!lttng_live->url) {
1079 goto error;
1080 }
1081 BT_PUT(value);
1082 lttng_live->viewer_connection =
1083 bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
1084 if (!lttng_live->viewer_connection) {
1085 goto error;
1086 }
1087 if (lttng_live_create_viewer_session(lttng_live)) {
1088 goto error;
1089 }
1090 lttng_live->private_component = private_component;
1091
1092 goto end;
1093
1094 error:
1095 lttng_live_component_destroy_data(lttng_live);
1096 lttng_live = NULL;
1097 end:
1098 return lttng_live;
1099 }
1100
1101 BT_HIDDEN
1102 enum bt_component_status lttng_live_component_init(
1103 struct bt_private_component *private_component,
1104 struct bt_value *params, void *init_method_data)
1105 {
1106 struct lttng_live_component *lttng_live;
1107 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
1108
1109 /* Passes ownership of iter ref to lttng_live_component_create. */
1110 lttng_live = lttng_live_component_create(params, private_component);
1111 if (!lttng_live) {
1112 //TODO : we need access to the application cancel state
1113 //because we are not part of a graph yet.
1114 ret = BT_COMPONENT_STATUS_NOMEM;
1115 goto end;
1116 }
1117
1118 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1119 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1120 lttng_live->no_stream_iter->lttng_live = lttng_live;
1121 if (lttng_live_is_canceled(lttng_live)) {
1122 goto end;
1123 }
1124 ret = bt_private_component_source_add_output_private_port(
1125 lttng_live->private_component, "no-stream",
1126 lttng_live->no_stream_iter,
1127 &lttng_live->no_stream_port);
1128 if (ret != BT_COMPONENT_STATUS_OK) {
1129 goto end;
1130 }
1131 bt_put(lttng_live->no_stream_port); /* weak */
1132 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1133
1134 ret = bt_private_component_set_user_data(private_component, lttng_live);
1135 if (ret != BT_COMPONENT_STATUS_OK) {
1136 goto error;
1137 }
1138
1139 end:
1140 return ret;
1141 error:
1142 (void) bt_private_component_set_user_data(private_component, NULL);
1143 lttng_live_component_destroy_data(lttng_live);
1144 return ret;
1145 }
1146
1147 BT_HIDDEN
1148 enum bt_component_status lttng_live_accept_port_connection(
1149 struct bt_private_component *private_component,
1150 struct bt_private_port *self_private_port,
1151 struct bt_port *other_port)
1152 {
1153 struct lttng_live_component *lttng_live =
1154 bt_private_component_get_user_data(private_component);
1155 struct bt_component *other_component;
1156 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
1157 struct bt_port *self_port = bt_port_from_private_port(self_private_port);
1158
1159 other_component = bt_port_get_component(other_port);
1160 bt_put(other_component); /* weak */
1161
1162 if (!lttng_live->downstream_component) {
1163 lttng_live->downstream_component = other_component;
1164 goto end;
1165 }
1166
1167 /*
1168 * Compare prior component to ensure we are connected to the
1169 * same downstream component as prior ports.
1170 */
1171 if (lttng_live->downstream_component != other_component) {
1172 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1173 bt_port_get_name(self_port),
1174 bt_component_get_name(other_component),
1175 bt_component_get_name(lttng_live->downstream_component));
1176 status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
1177 goto end;
1178 }
1179 end:
1180 bt_put(self_port);
1181 return status;
1182 }
This page took 0.088225 seconds and 4 git commands to generate.