a1e7d45e7bad2c9f6e80ccffd19f7eaf594806d1
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
1 /*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
30 #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
31 #include "logging.h"
32
33 #include <babeltrace/ctf-ir/packet.h>
34 #include <babeltrace/graph/component-source.h>
35 #include <babeltrace/graph/private-port.h>
36 #include <babeltrace/graph/port.h>
37 #include <babeltrace/graph/private-component.h>
38 #include <babeltrace/graph/private-component-source.h>
39 #include <babeltrace/graph/private-notification-iterator.h>
40 #include <babeltrace/graph/notification-stream.h>
41 #include <babeltrace/graph/notification-packet.h>
42 #include <babeltrace/graph/notification-event.h>
43 #include <babeltrace/graph/notification-heap.h>
44 #include <babeltrace/graph/notification-iterator.h>
45 #include <babeltrace/graph/notification-inactivity.h>
46 #include <babeltrace/graph/graph.h>
47 #include <babeltrace/compiler-internal.h>
48 #include <babeltrace/types.h>
49 #include <inttypes.h>
50 #include <glib.h>
51 #include <assert.h>
52 #include <unistd.h>
53 #include <plugins-common.h>
54
55 #include "data-stream.h"
56 #include "metadata.h"
57 #include "lttng-live-internal.h"
58
59 #define MAX_QUERY_SIZE (256*1024)
60
61 #define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
62
63 static const char *print_state(struct lttng_live_stream_iterator *s)
64 {
65 switch (s->state) {
66 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
67 return "ACTIVE_NO_DATA";
68 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
69 return "QUIESCENT_NO_DATA";
70 case LTTNG_LIVE_STREAM_QUIESCENT:
71 return "QUIESCENT";
72 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
73 return "ACTIVE_DATA";
74 case LTTNG_LIVE_STREAM_EOF:
75 return "EOF";
76 default:
77 return "ERROR";
78 }
79 }
80
81 static
82 void print_stream_state(struct lttng_live_stream_iterator *stream)
83 {
84 struct bt_port *port;
85
86 port = bt_port_from_private_port(stream->port);
87 print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
88 bt_port_get_name(port),
89 print_state(stream),
90 stream->last_returned_inactivity_timestamp,
91 stream->current_inactivity_timestamp);
92 bt_put(port);
93 }
94
95 BT_HIDDEN
96 bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
97 {
98 struct bt_component *component;
99 struct bt_graph *graph;
100 bt_bool ret;
101
102 if (!lttng_live) {
103 return BT_FALSE;
104 }
105
106 component = bt_component_from_private_component(lttng_live->private_component);
107 graph = bt_component_get_graph(component);
108 ret = bt_graph_is_canceled(graph);
109 bt_put(graph);
110 bt_put(component);
111 return ret;
112 }
113
114 BT_HIDDEN
115 int lttng_live_add_port(struct lttng_live_component *lttng_live,
116 struct lttng_live_stream_iterator *stream_iter)
117 {
118 int ret;
119 struct bt_private_port *private_port;
120 char name[STREAM_NAME_MAX_LEN];
121 enum bt_component_status status;
122
123 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
124 assert(ret > 0);
125 strcpy(stream_iter->name, name);
126 if (lttng_live_is_canceled(lttng_live)) {
127 return 0;
128 }
129 status = bt_private_component_source_add_output_private_port(
130 lttng_live->private_component, name, stream_iter,
131 &private_port);
132 switch (status) {
133 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
134 return 0;
135 case BT_COMPONENT_STATUS_OK:
136 break;
137 default:
138 return -1;
139 }
140 bt_put(private_port); /* weak */
141 BT_LOGI("Added port %s", name);
142
143 if (lttng_live->no_stream_port) {
144 bt_get(lttng_live->no_stream_port);
145 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
146 bt_put(lttng_live->no_stream_port);
147 if (ret) {
148 return -1;
149 }
150 lttng_live->no_stream_port = NULL;
151 lttng_live->no_stream_iter->port = NULL;
152 }
153 stream_iter->port = private_port;
154 return 0;
155 }
156
157 BT_HIDDEN
158 int lttng_live_remove_port(struct lttng_live_component *lttng_live,
159 struct bt_private_port *port)
160 {
161 struct bt_component *component;
162 int64_t nr_ports;
163 int ret;
164
165 component = bt_component_from_private_component(lttng_live->private_component);
166 nr_ports = bt_component_source_get_output_port_count(component);
167 if (nr_ports < 0) {
168 return -1;
169 }
170 BT_PUT(component);
171 if (nr_ports == 1) {
172 enum bt_component_status status;
173
174 assert(!lttng_live->no_stream_port);
175
176 if (lttng_live_is_canceled(lttng_live)) {
177 return 0;
178 }
179 status = bt_private_component_source_add_output_private_port(lttng_live->private_component,
180 "no-stream", lttng_live->no_stream_iter,
181 &lttng_live->no_stream_port);
182 switch (status) {
183 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
184 return 0;
185 case BT_COMPONENT_STATUS_OK:
186 break;
187 default:
188 return -1;
189 }
190 bt_put(lttng_live->no_stream_port); /* weak */
191 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
192 }
193 bt_get(port);
194 ret = bt_private_port_remove_from_component(port);
195 bt_put(port);
196 if (ret) {
197 return -1;
198 }
199 return 0;
200 }
201
202 static
203 struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
204 uint64_t trace_id)
205 {
206 struct lttng_live_trace *trace;
207
208 bt_list_for_each_entry(trace, &session->traces, node) {
209 if (trace->id == trace_id) {
210 return trace;
211 }
212 }
213 return NULL;
214 }
215
216 static
217 void lttng_live_destroy_trace(struct bt_object *obj)
218 {
219 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
220
221 BT_LOGI("Destroy trace");
222 assert(bt_list_empty(&trace->streams));
223 bt_list_del(&trace->node);
224
225 if (trace->trace) {
226 int retval;
227
228 retval = bt_ctf_trace_set_is_static(trace->trace);
229 assert(!retval);
230 BT_PUT(trace->trace);
231 }
232 lttng_live_metadata_fini(trace);
233 BT_PUT(trace->cc_prio_map);
234 g_free(trace);
235 }
236
237 static
238 struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
239 uint64_t trace_id)
240 {
241 struct lttng_live_trace *trace = NULL;
242
243 trace = g_new0(struct lttng_live_trace, 1);
244 if (!trace) {
245 goto error;
246 }
247 trace->session = session;
248 trace->id = trace_id;
249 BT_INIT_LIST_HEAD(&trace->streams);
250 trace->new_metadata_needed = true;
251 bt_list_add(&trace->node, &session->traces);
252 bt_object_init(&trace->obj, lttng_live_destroy_trace);
253 BT_LOGI("Create trace");
254 goto end;
255 error:
256 g_free(trace);
257 trace = NULL;
258 end:
259 return trace;
260 }
261
262 BT_HIDDEN
263 struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
264 uint64_t trace_id)
265 {
266 struct lttng_live_trace *trace;
267
268 trace = lttng_live_find_trace(session, trace_id);
269 if (trace) {
270 bt_get(trace);
271 return trace;
272 }
273 return lttng_live_create_trace(session, trace_id);
274 }
275
276 BT_HIDDEN
277 void lttng_live_unref_trace(struct lttng_live_trace *trace)
278 {
279 bt_put(trace);
280 }
281
282 static
283 void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
284 {
285 struct lttng_live_stream_iterator *stream, *s;
286
287 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
288 lttng_live_stream_iterator_destroy(stream);
289 }
290 lttng_live_metadata_fini(trace);
291 }
292
293 BT_HIDDEN
294 int lttng_live_add_session(struct lttng_live_component *lttng_live,
295 uint64_t session_id, const char *hostname,
296 const char *session_name)
297 {
298 int ret = 0;
299 struct lttng_live_session *s;
300
301 s = g_new0(struct lttng_live_session, 1);
302 if (!s) {
303 goto error;
304 }
305
306 s->id = session_id;
307 BT_INIT_LIST_HEAD(&s->traces);
308 s->lttng_live = lttng_live;
309 s->new_streams_needed = true;
310 s->hostname = g_string_new(hostname);
311 s->session_name = g_string_new(session_name);
312
313 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
314 s->id, hostname, session_name);
315 bt_list_add(&s->node, &lttng_live->sessions);
316 goto end;
317 error:
318 BT_LOGE("Error adding session");
319 g_free(s);
320 ret = -1;
321 end:
322 return ret;
323 }
324
325 static
326 void lttng_live_destroy_session(struct lttng_live_session *session)
327 {
328 struct lttng_live_trace *trace, *t;
329
330 BT_LOGI("Destroy session");
331 if (session->id != -1ULL) {
332 if (lttng_live_detach_session(session)) {
333 if (!lttng_live_is_canceled(session->lttng_live)) {
334 /* Old relayd cannot detach sessions. */
335 BT_LOGD("Unable to detach session %" PRIu64,
336 session->id);
337 }
338 }
339 session->id = -1ULL;
340 }
341 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
342 lttng_live_close_trace_streams(trace);
343 }
344 bt_list_del(&session->node);
345 if (session->hostname) {
346 g_string_free(session->hostname, TRUE);
347 }
348 if (session->session_name) {
349 g_string_free(session->session_name, TRUE);
350 }
351 g_free(session);
352 }
353
354 BT_HIDDEN
355 void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
356 {
357 struct lttng_live_stream_iterator_generic *s =
358 bt_private_notification_iterator_get_user_data(it);
359
360 switch (s->type) {
361 case LIVE_STREAM_TYPE_NO_STREAM:
362 {
363 /* Leave no_stream_iter in place when port is removed. */
364 break;
365 }
366 case LIVE_STREAM_TYPE_STREAM:
367 {
368 struct lttng_live_stream_iterator *stream_iter =
369 container_of(s, struct lttng_live_stream_iterator, p);
370
371 lttng_live_stream_iterator_destroy(stream_iter);
372 break;
373 }
374 }
375 }
376
377 static
378 enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
379 struct lttng_live_component *lttng_live,
380 struct lttng_live_stream_iterator *lttng_live_stream)
381 {
382 switch (lttng_live_stream->state) {
383 case LTTNG_LIVE_STREAM_QUIESCENT:
384 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
385 break;
386 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
387 /* Invalid state. */
388 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
389 abort();
390 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
391 /* Invalid state. */
392 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
393 abort();
394 case LTTNG_LIVE_STREAM_EOF:
395 break;
396 }
397 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
398 }
399
400 /*
401 * For active no data stream, fetch next data. It can be either:
402 * - quiescent: need to put it in the prio heap at quiescent end
403 * timestamp,
404 * - have data: need to wire up first event into the prio heap,
405 * - have no data on this stream at this point: need to retry (AGAIN) or
406 * return EOF.
407 */
408 static
409 enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
410 struct lttng_live_component *lttng_live,
411 struct lttng_live_stream_iterator *lttng_live_stream)
412 {
413 enum bt_ctf_lttng_live_iterator_status ret =
414 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
415 struct packet_index index;
416 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
417
418 if (lttng_live_stream->trace->new_metadata_needed) {
419 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
420 goto end;
421 }
422 if (lttng_live_stream->trace->session->new_streams_needed) {
423 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
424 goto end;
425 }
426 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
427 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
428 goto end;
429 }
430 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
431 if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
432 goto end;
433 }
434 assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
435 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
436 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
437 && lttng_live_stream->last_returned_inactivity_timestamp ==
438 lttng_live_stream->current_inactivity_timestamp) {
439 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
440 print_stream_state(lttng_live_stream);
441 } else {
442 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
443 }
444 goto end;
445 }
446 lttng_live_stream->base_offset = index.offset;
447 lttng_live_stream->offset = index.offset;
448 lttng_live_stream->len = index.packet_size / CHAR_BIT;
449 end:
450 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
451 ret = lttng_live_iterator_next_check_stream_state(
452 lttng_live, lttng_live_stream);
453 }
454 return ret;
455 }
456
457 /*
458 * Creation of the notification requires the ctf trace to be created
459 * beforehand, but the live protocol gives us all streams (including
460 * metadata) at once. So we split it in three steps: getting streams,
461 * getting metadata (which creates the ctf trace), and then creating the
462 * per-stream notifications.
463 */
464 static
465 enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
466 struct lttng_live_component *lttng_live,
467 struct lttng_live_session *session)
468 {
469 enum bt_ctf_lttng_live_iterator_status status;
470 struct lttng_live_trace *trace, *t;
471
472 if (lttng_live_attach_session(session)) {
473 if (lttng_live_is_canceled(lttng_live)) {
474 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
475 } else {
476 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
477 }
478 }
479 status = lttng_live_get_new_streams(session);
480 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
481 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
482 return status;
483 }
484 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
485 status = lttng_live_metadata_update(trace);
486 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
487 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
488 return status;
489 }
490 }
491 return lttng_live_lazy_notif_init(session);
492 }
493
494 BT_HIDDEN
495 void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
496 {
497 struct lttng_live_session *session;
498
499 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
500 session->new_streams_needed = true;
501 }
502 }
503
504 static
505 void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
506 {
507 struct lttng_live_session *session;
508
509 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
510 struct lttng_live_trace *trace;
511
512 session->new_streams_needed = true;
513 bt_list_for_each_entry(trace, &session->traces, node) {
514 trace->new_metadata_needed = true;
515 }
516 }
517 }
518
519 static
520 enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
521 struct lttng_live_component *lttng_live)
522 {
523 enum bt_ctf_lttng_live_iterator_status ret =
524 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
525 unsigned int nr_sessions_opened = 0;
526 struct lttng_live_session *session, *s;
527
528 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
529 if (session->closed && bt_list_empty(&session->traces)) {
530 lttng_live_destroy_session(session);
531 }
532 }
533 /*
534 * Currently, when there are no sessions, we quit immediately.
535 * We may want to add a component parameter to keep trying until
536 * we get data in the future.
537 * Also, in a remotely distant future, we could add a "new
538 * session" flag to the protocol, which would tell us that we
539 * need to query for new sessions even though we have sessions
540 * currently ongoing.
541 */
542 if (bt_list_empty(&lttng_live->sessions)) {
543 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
544 goto end;
545 }
546 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
547 ret = lttng_live_get_session(lttng_live, session);
548 switch (ret) {
549 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
550 break;
551 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
552 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
553 break;
554 default:
555 goto end;
556 }
557 if (!session->closed) {
558 nr_sessions_opened++;
559 }
560 }
561 end:
562 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
563 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
564 }
565 return ret;
566 }
567
568 static
569 enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
570 struct lttng_live_component *lttng_live,
571 struct lttng_live_stream_iterator *lttng_live_stream,
572 struct bt_notification **notification,
573 uint64_t timestamp)
574 {
575 enum bt_ctf_lttng_live_iterator_status ret =
576 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
577 struct lttng_live_trace *trace;
578 struct bt_ctf_clock_class *clock_class = NULL;
579 struct bt_ctf_clock_value *clock_value = NULL;
580 struct bt_notification *notif = NULL;
581 int retval;
582
583 trace = lttng_live_stream->trace;
584 if (!trace) {
585 goto error;
586 }
587 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
588 if (!clock_class) {
589 goto error;
590 }
591 clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
592 if (!clock_value) {
593 goto error;
594 }
595 notif = bt_notification_inactivity_create(trace->cc_prio_map);
596 if (!notif) {
597 goto error;
598 }
599 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
600 if (retval) {
601 goto error;
602 }
603 *notification = notif;
604 end:
605 bt_put(clock_value);
606 bt_put(clock_class);
607 return ret;
608
609 error:
610 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
611 bt_put(notif);
612 goto end;
613 }
614
615 static
616 enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
617 struct lttng_live_component *lttng_live,
618 struct lttng_live_stream_iterator *lttng_live_stream,
619 struct bt_notification **notification)
620 {
621 enum bt_ctf_lttng_live_iterator_status ret =
622 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
623 struct bt_ctf_clock_class *clock_class = NULL;
624 struct bt_ctf_clock_value *clock_value = NULL;
625
626 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
627 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
628 }
629
630 if (lttng_live_stream->current_inactivity_timestamp ==
631 lttng_live_stream->last_returned_inactivity_timestamp) {
632 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
633 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
634 goto end;
635 }
636
637 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
638 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
639
640 lttng_live_stream->last_returned_inactivity_timestamp =
641 lttng_live_stream->current_inactivity_timestamp;
642 end:
643 bt_put(clock_value);
644 bt_put(clock_class);
645 return ret;
646 }
647
648 static
649 enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
650 struct lttng_live_component *lttng_live,
651 struct lttng_live_stream_iterator *lttng_live_stream,
652 struct bt_notification **notification)
653 {
654 enum bt_ctf_lttng_live_iterator_status ret =
655 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
656 enum bt_ctf_notif_iter_status status;
657 struct lttng_live_session *session;
658
659 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
660 struct lttng_live_trace *trace;
661
662 if (session->new_streams_needed) {
663 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
664 }
665 bt_list_for_each_entry(trace, &session->traces, node) {
666 if (trace->new_metadata_needed) {
667 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
668 }
669 }
670 }
671
672 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
673 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
674 }
675 if (lttng_live_stream->packet_end_notif_queue) {
676 *notification = lttng_live_stream->packet_end_notif_queue;
677 lttng_live_stream->packet_end_notif_queue = NULL;
678 status = BT_CTF_NOTIF_ITER_STATUS_OK;
679 } else {
680 status = bt_ctf_notif_iter_get_next_notification(
681 lttng_live_stream->notif_iter,
682 lttng_live_stream->trace->cc_prio_map,
683 notification);
684 if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
685 /*
686 * Consider empty packets as inactivity.
687 */
688 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
689 lttng_live_stream->packet_end_notif_queue = *notification;
690 *notification = NULL;
691 return emit_inactivity_notification(lttng_live,
692 lttng_live_stream, notification,
693 lttng_live_stream->current_packet_end_timestamp);
694 }
695 }
696 }
697 switch (status) {
698 case BT_CTF_NOTIF_ITER_STATUS_EOF:
699 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
700 break;
701 case BT_CTF_NOTIF_ITER_STATUS_OK:
702 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
703 break;
704 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
705 /*
706 * Continue immediately (end of packet). The next
707 * get_index may return AGAIN to delay the following
708 * attempt.
709 */
710 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
711 break;
712 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
713 /* No argument provided by the user, so don't return INVAL. */
714 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
715 default:
716 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
717 break;
718 }
719 return ret;
720 }
721
722 /*
723 * helper function:
724 * handle_no_data_streams()
725 * retry:
726 * - for each ACTIVE_NO_DATA stream:
727 * - query relayd for stream data, or quiescence info.
728 * - if need metadata, get metadata, goto retry.
729 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
730 * - if quiescent, move to QUIESCENT streams
731 * - if fetched data, move to ACTIVE_DATA streams
732 * (at this point each stream either has data, or is quiescent)
733 *
734 *
735 * iterator_next:
736 * handle_new_streams_and_metadata()
737 * - query relayd for known streams, add them as ACTIVE_NO_DATA
738 * - query relayd for metadata
739 *
740 * call handle_active_no_data_streams()
741 *
742 * handle_quiescent_streams()
743 * - if at least one stream is ACTIVE_DATA:
744 * - peek stream event with lowest timestamp -> next_ts
745 * - for each quiescent stream
746 * - if next_ts >= quiescent end
747 * - set state to ACTIVE_NO_DATA
748 * - else
749 * - for each quiescent stream
750 * - set state to ACTIVE_NO_DATA
751 *
752 * call handle_active_no_data_streams()
753 *
754 * handle_active_data_streams()
755 * - if at least one stream is ACTIVE_DATA:
756 * - get stream event with lowest timestamp from heap
757 * - make that stream event the current notification.
758 * - move this stream heap position to its next event
759 * - if we need to fetch data from relayd, move
760 * stream to ACTIVE_NO_DATA.
761 * - return OK
762 * - return AGAIN
763 *
764 * end criterion: ctrl-c on client. If relayd exits or the session
765 * closes on the relay daemon side, we keep on waiting for streams.
766 * Eventually handle --end timestamp (also an end criterion).
767 *
768 * When disconnected from relayd: try to re-connect endlessly.
769 */
770 static
771 struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
772 struct bt_private_notification_iterator *iterator,
773 struct lttng_live_stream_iterator *stream_iter)
774 {
775 enum bt_ctf_lttng_live_iterator_status status;
776 struct bt_notification_iterator_next_return next_return;
777 struct lttng_live_component *lttng_live;
778
779 lttng_live = stream_iter->trace->session->lttng_live;
780 retry:
781 print_stream_state(stream_iter);
782 next_return.notification = NULL;
783 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
784 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
785 goto end;
786 }
787 status = lttng_live_iterator_next_handle_one_no_data_stream(
788 lttng_live, stream_iter);
789 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
790 goto end;
791 }
792 status = lttng_live_iterator_next_handle_one_quiescent_stream(
793 lttng_live, stream_iter, &next_return.notification);
794 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
795 assert(next_return.notification == NULL);
796 goto end;
797 }
798 if (next_return.notification) {
799 goto end;
800 }
801 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
802 stream_iter, &next_return.notification);
803 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
804 assert(next_return.notification == NULL);
805 }
806
807 end:
808 switch (status) {
809 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
810 print_dbg("continue");
811 goto retry;
812 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
813 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
814 print_dbg("again");
815 break;
816 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
817 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
818 print_dbg("end");
819 break;
820 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
821 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
822 print_dbg("ok");
823 break;
824 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
825 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
826 break;
827 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
828 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
829 break;
830 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
831 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
832 break;
833 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
834 default: /* fall-through */
835 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
836 break;
837 }
838 return next_return;
839 }
840
841 static
842 struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
843 struct bt_private_notification_iterator *iterator,
844 struct lttng_live_no_stream_iterator *no_stream_iter)
845 {
846 enum bt_ctf_lttng_live_iterator_status status;
847 struct bt_notification_iterator_next_return next_return;
848 struct lttng_live_component *lttng_live;
849
850 lttng_live = no_stream_iter->lttng_live;
851 retry:
852 lttng_live_force_new_streams_and_metadata(lttng_live);
853 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
854 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
855 goto end;
856 }
857 if (no_stream_iter->port) {
858 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
859 } else {
860 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
861 }
862 end:
863 switch (status) {
864 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
865 goto retry;
866 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
867 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
868 break;
869 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
870 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
871 break;
872 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
873 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
874 break;
875 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
876 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
877 break;
878 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
879 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
880 break;
881 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
882 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
883 break;
884 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
885 default: /* fall-through */
886 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
887 break;
888 }
889 return next_return;
890 }
891
892 BT_HIDDEN
893 struct bt_notification_iterator_next_return lttng_live_iterator_next(
894 struct bt_private_notification_iterator *iterator)
895 {
896 struct lttng_live_stream_iterator_generic *s =
897 bt_private_notification_iterator_get_user_data(iterator);
898 struct bt_notification_iterator_next_return next_return;
899
900 switch (s->type) {
901 case LIVE_STREAM_TYPE_NO_STREAM:
902 next_return = lttng_live_iterator_next_no_stream(iterator,
903 container_of(s, struct lttng_live_no_stream_iterator, p));
904 break;
905 case LIVE_STREAM_TYPE_STREAM:
906 next_return = lttng_live_iterator_next_stream(iterator,
907 container_of(s, struct lttng_live_stream_iterator, p));
908 break;
909 default:
910 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
911 break;
912 }
913 return next_return;
914 }
915
916 BT_HIDDEN
917 enum bt_notification_iterator_status lttng_live_iterator_init(
918 struct bt_private_notification_iterator *it,
919 struct bt_private_port *port)
920 {
921 enum bt_notification_iterator_status ret =
922 BT_NOTIFICATION_ITERATOR_STATUS_OK;
923 struct lttng_live_stream_iterator_generic *s;
924
925 assert(it);
926
927 s = bt_private_port_get_user_data(port);
928 assert(s);
929 switch (s->type) {
930 case LIVE_STREAM_TYPE_NO_STREAM:
931 {
932 struct lttng_live_no_stream_iterator *no_stream_iter =
933 container_of(s, struct lttng_live_no_stream_iterator, p);
934 ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
935 if (ret) {
936 goto error;
937 }
938 break;
939 }
940 case LIVE_STREAM_TYPE_STREAM:
941 {
942 struct lttng_live_stream_iterator *stream_iter =
943 container_of(s, struct lttng_live_stream_iterator, p);
944 ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
945 if (ret) {
946 goto error;
947 }
948 break;
949 }
950 default:
951 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
952 goto end;
953 }
954
955 end:
956 return ret;
957 error:
958 if (bt_private_notification_iterator_set_user_data(it, NULL)
959 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
960 BT_LOGE("Error setting private data to NULL");
961 }
962 goto end;
963 }
964
965 static
966 struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
967 struct bt_value *params)
968 {
969 struct bt_value *url_value = NULL;
970 struct bt_value *results = NULL;
971 const char *url;
972 struct bt_live_viewer_connection *viewer_connection = NULL;
973
974 url_value = bt_value_map_get(params, "url");
975 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
976 BT_LOGW("Mandatory \"url\" parameter missing");
977 goto error;
978 }
979
980 if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
981 BT_LOGW("\"url\" parameter is required to be a string value");
982 goto error;
983 }
984
985 viewer_connection = bt_live_viewer_connection_create(url, NULL);
986 if (!viewer_connection) {
987 goto error;
988 }
989
990 results = bt_live_viewer_connection_list_sessions(viewer_connection);
991 goto end;
992 error:
993 BT_PUT(results);
994 end:
995 if (viewer_connection) {
996 bt_live_viewer_connection_destroy(viewer_connection);
997 }
998 BT_PUT(url_value);
999 return results;
1000 }
1001
1002 BT_HIDDEN
1003 struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
1004 const char *object, struct bt_value *params)
1005 {
1006 if (strcmp(object, "sessions") == 0) {
1007 return lttng_live_query_list_sessions(comp_class,
1008 params);
1009 }
1010 BT_LOGW("Unknown query object `%s`", object);
1011 return NULL;
1012 }
1013
1014 static
1015 void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1016 {
1017 int ret;
1018 struct lttng_live_session *session, *s;
1019
1020 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
1021 lttng_live_destroy_session(session);
1022 }
1023 BT_PUT(lttng_live->viewer_connection);
1024 if (lttng_live->url) {
1025 g_string_free(lttng_live->url, TRUE);
1026 }
1027 if (lttng_live->no_stream_port) {
1028 bt_get(lttng_live->no_stream_port);
1029 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
1030 bt_put(lttng_live->no_stream_port);
1031 assert(!ret);
1032 }
1033 if (lttng_live->no_stream_iter) {
1034 g_free(lttng_live->no_stream_iter);
1035 }
1036 g_free(lttng_live);
1037 }
1038
1039 BT_HIDDEN
1040 void lttng_live_component_finalize(struct bt_private_component *component)
1041 {
1042 void *data = bt_private_component_get_user_data(component);
1043
1044 if (!data) {
1045 return;
1046 }
1047 lttng_live_component_destroy_data(data);
1048 }
1049
1050 static
1051 struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
1052 struct bt_private_component *private_component)
1053 {
1054 struct lttng_live_component *lttng_live;
1055 struct bt_value *value = NULL;
1056 const char *url;
1057 enum bt_value_status ret;
1058
1059 lttng_live = g_new0(struct lttng_live_component, 1);
1060 if (!lttng_live) {
1061 goto end;
1062 }
1063 /* TODO: make this an overridable parameter. */
1064 lttng_live->max_query_size = MAX_QUERY_SIZE;
1065 BT_INIT_LIST_HEAD(&lttng_live->sessions);
1066 value = bt_value_map_get(params, "url");
1067 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
1068 BT_LOGW("Mandatory \"url\" parameter missing");
1069 goto error;
1070 }
1071 ret = bt_value_string_get(value, &url);
1072 if (ret != BT_VALUE_STATUS_OK) {
1073 BT_LOGW("\"url\" parameter is required to be a string value");
1074 goto error;
1075 }
1076 lttng_live->url = g_string_new(url);
1077 if (!lttng_live->url) {
1078 goto error;
1079 }
1080 BT_PUT(value);
1081 lttng_live->viewer_connection =
1082 bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
1083 if (!lttng_live->viewer_connection) {
1084 goto error;
1085 }
1086 if (lttng_live_create_viewer_session(lttng_live)) {
1087 goto error;
1088 }
1089 lttng_live->private_component = private_component;
1090
1091 goto end;
1092
1093 error:
1094 lttng_live_component_destroy_data(lttng_live);
1095 lttng_live = NULL;
1096 end:
1097 return lttng_live;
1098 }
1099
1100 BT_HIDDEN
1101 enum bt_component_status lttng_live_component_init(
1102 struct bt_private_component *private_component,
1103 struct bt_value *params, void *init_method_data)
1104 {
1105 struct lttng_live_component *lttng_live;
1106 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
1107
1108 /* Passes ownership of iter ref to lttng_live_component_create. */
1109 lttng_live = lttng_live_component_create(params, private_component);
1110 if (!lttng_live) {
1111 //TODO : we need access to the application cancel state
1112 //because we are not part of a graph yet.
1113 ret = BT_COMPONENT_STATUS_NOMEM;
1114 goto end;
1115 }
1116
1117 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1118 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1119 lttng_live->no_stream_iter->lttng_live = lttng_live;
1120 if (lttng_live_is_canceled(lttng_live)) {
1121 goto end;
1122 }
1123 ret = bt_private_component_source_add_output_private_port(
1124 lttng_live->private_component, "no-stream",
1125 lttng_live->no_stream_iter,
1126 &lttng_live->no_stream_port);
1127 if (ret != BT_COMPONENT_STATUS_OK) {
1128 goto end;
1129 }
1130 bt_put(lttng_live->no_stream_port); /* weak */
1131 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1132
1133 ret = bt_private_component_set_user_data(private_component, lttng_live);
1134 if (ret != BT_COMPONENT_STATUS_OK) {
1135 goto error;
1136 }
1137
1138 end:
1139 return ret;
1140 error:
1141 (void) bt_private_component_set_user_data(private_component, NULL);
1142 lttng_live_component_destroy_data(lttng_live);
1143 return ret;
1144 }
1145
1146 BT_HIDDEN
1147 enum bt_component_status lttng_live_accept_port_connection(
1148 struct bt_private_component *private_component,
1149 struct bt_private_port *self_private_port,
1150 struct bt_port *other_port)
1151 {
1152 struct lttng_live_component *lttng_live =
1153 bt_private_component_get_user_data(private_component);
1154 struct bt_component *other_component;
1155 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
1156 struct bt_port *self_port = bt_port_from_private_port(self_private_port);
1157
1158 other_component = bt_port_get_component(other_port);
1159 bt_put(other_component); /* weak */
1160
1161 if (!lttng_live->downstream_component) {
1162 lttng_live->downstream_component = other_component;
1163 goto end;
1164 }
1165
1166 /*
1167 * Compare prior component to ensure we are connected to the
1168 * same downstream component as prior ports.
1169 */
1170 if (lttng_live->downstream_component != other_component) {
1171 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1172 bt_port_get_name(self_port),
1173 bt_component_get_name(other_component),
1174 bt_component_get_name(lttng_live->downstream_component));
1175 status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
1176 goto end;
1177 }
1178 end:
1179 bt_put(self_port);
1180 return status;
1181 }
This page took 0.086186 seconds and 3 git commands to generate.