lib/ctf-ir/utils.c: add logging
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
7cdc2bab 30#include <babeltrace/ctf-ir/packet.h>
b2e0c907 31#include <babeltrace/graph/component-source.h>
7cdc2bab
MD
32#include <babeltrace/graph/private-port.h>
33#include <babeltrace/graph/port.h>
34#include <babeltrace/graph/private-component.h>
35#include <babeltrace/graph/private-component-source.h>
36#include <babeltrace/graph/private-notification-iterator.h>
37#include <babeltrace/graph/notification-stream.h>
38#include <babeltrace/graph/notification-packet.h>
39#include <babeltrace/graph/notification-event.h>
40#include <babeltrace/graph/notification-heap.h>
41#include <babeltrace/graph/notification-iterator.h>
42#include <babeltrace/graph/notification-inactivity.h>
43#include <babeltrace/compiler-internal.h>
44#include <inttypes.h>
45#include <glib.h>
46#include <assert.h>
47#include <unistd.h>
7d61fa8e 48#include <plugins-common.h>
f3bc2010 49
087bc060 50#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE"
047358f9 51#define BT_LOGLEVEL_NAME "BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_LOG_LEVEL"
087bc060 52
7cdc2bab
MD
53#include "data-stream.h"
54#include "metadata.h"
0f5e83e5 55#include "lttng-live-internal.h"
7cdc2bab 56
7cdc2bab 57#define MAX_QUERY_SIZE (256*1024)
7cdc2bab 58
/* Debug-level logging shorthand: forwards its arguments unchanged to BT_LOGD(). */
#define print_dbg(fmt, ...)	BT_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab
MD
60
61static const char *print_state(struct lttng_live_stream_iterator *s)
62{
63 switch (s->state) {
64 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
65 return "ACTIVE_NO_DATA";
66 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
67 return "QUIESCENT_NO_DATA";
68 case LTTNG_LIVE_STREAM_QUIESCENT:
69 return "QUIESCENT";
70 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
71 return "ACTIVE_DATA";
72 case LTTNG_LIVE_STREAM_EOF:
73 return "EOF";
74 default:
75 return "ERROR";
76 }
77}
7cdc2bab
MD
78
/*
 * Log, at debug level, the port name, the state name and the last-returned
 * and current inactivity timestamps of `stream` (a pointer to a
 * struct lttng_live_stream_iterator).
 *
 * The argument is parenthesized in the expansion so any pointer expression
 * can be passed, but it is evaluated several times: do not pass an
 * expression with side effects.
 *
 * NOTE(review): if bt_port_from_private_port() returns a new reference,
 * it is never released here -- confirm against the port API.
 */
#define print_stream_state(stream) \
	print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, \
		bt_port_get_name(bt_port_from_private_port((stream)->port)), \
		print_state(stream), (stream)->last_returned_inactivity_timestamp, \
		(stream)->current_inactivity_timestamp)
84
d3e4dcd8 85BT_HIDDEN
087bc060 86int bt_lttng_live_log_level = BT_LOG_NONE;
7cdc2bab
MD
87
88BT_HIDDEN
89int lttng_live_add_port(struct lttng_live_component *lttng_live,
90 struct lttng_live_stream_iterator *stream_iter)
91{
92 int ret;
93 struct bt_private_port *private_port;
94 char name[STREAM_NAME_MAX_LEN];
95
96 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
97 assert(ret > 0);
98 strcpy(stream_iter->name, name);
99 private_port = bt_private_component_source_add_output_private_port(
100 lttng_live->private_component, name, stream_iter);
101 if (!private_port) {
102 return -1;
103 }
087bc060 104 BT_LOGI("Added port %s", name);
7cdc2bab
MD
105
106 if (lttng_live->no_stream_port) {
107 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
108 if (ret) {
109 return -1;
110 }
111 BT_PUT(lttng_live->no_stream_port);
112 lttng_live->no_stream_iter->port = NULL;
113 }
114 stream_iter->port = private_port;
115 return 0;
116}
117
118BT_HIDDEN
119int lttng_live_remove_port(struct lttng_live_component *lttng_live,
120 struct bt_private_port *port)
121{
122 struct bt_component *component;
123 int64_t nr_ports;
124 int ret;
125
126 component = bt_component_from_private_component(lttng_live->private_component);
127 nr_ports = bt_component_source_get_output_port_count(component);
128 if (nr_ports < 0) {
129 return -1;
130 }
131 BT_PUT(component);
132 if (nr_ports == 1) {
133 assert(!lttng_live->no_stream_port);
134 lttng_live->no_stream_port =
135 bt_private_component_source_add_output_private_port(lttng_live->private_component,
136 "no-stream", lttng_live->no_stream_iter);
137 if (!lttng_live->no_stream_port) {
138 return -1;
139 }
140 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
141 }
142 ret = bt_private_port_remove_from_component(port);
143 if (ret) {
144 return -1;
145 }
146 return 0;
147}
148
149static
150struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
151 uint64_t trace_id)
d3e4dcd8 152{
7cdc2bab
MD
153 struct lttng_live_trace *trace;
154
155 bt_list_for_each_entry(trace, &session->traces, node) {
156 if (trace->id == trace_id) {
157 return trace;
158 }
159 }
d3eb6e8f
PP
160 return NULL;
161}
162
7cdc2bab
MD
163static
164void lttng_live_destroy_trace(struct bt_object *obj)
165{
166 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
167
087bc060 168 BT_LOGI("Destroy trace");
7cdc2bab
MD
169 assert(bt_list_empty(&trace->streams));
170 bt_list_del(&trace->node);
171 lttng_live_metadata_fini(trace);
172 BT_PUT(trace->cc_prio_map);
173 g_free(trace);
174}
175
176static
177struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
178 uint64_t trace_id)
179{
180 struct lttng_live_trace *trace = NULL;
181
182 trace = g_new0(struct lttng_live_trace, 1);
183 if (!trace) {
184 goto error;
185 }
186 trace->session = session;
187 trace->id = trace_id;
188 BT_INIT_LIST_HEAD(&trace->streams);
189 trace->new_metadata_needed = true;
190 bt_list_add(&trace->node, &session->traces);
191 bt_object_init(&trace->obj, lttng_live_destroy_trace);
087bc060 192 BT_LOGI("Create trace");
7cdc2bab
MD
193 goto end;
194error:
195 g_free(trace);
196 trace = NULL;
197end:
198 return trace;
199}
200
201BT_HIDDEN
202struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
203 uint64_t trace_id)
204{
205 struct lttng_live_trace *trace;
206
207 trace = lttng_live_find_trace(session, trace_id);
208 if (trace) {
209 bt_get(trace);
210 return trace;
211 }
212 return lttng_live_create_trace(session, trace_id);
213}
214
215BT_HIDDEN
216void lttng_live_unref_trace(struct lttng_live_trace *trace)
217{
218 bt_put(trace);
219}
220
221static
222void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
223{
224 struct lttng_live_stream_iterator *stream, *s;
225
226 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
227 lttng_live_stream_iterator_destroy(stream);
228 }
229 lttng_live_metadata_fini(trace);
230}
231
232BT_HIDDEN
233int lttng_live_add_session(struct lttng_live_component *lttng_live, uint64_t session_id)
234{
235 int ret = 0;
236 struct lttng_live_session *s;
237
238 s = g_new0(struct lttng_live_session, 1);
239 if (!s) {
240 goto error;
241 }
242
243 s->id = session_id;
244 BT_INIT_LIST_HEAD(&s->traces);
245 s->lttng_live = lttng_live;
246 s->new_streams_needed = true;
247
087bc060 248 BT_LOGI("Reading from session %" PRIu64, s->id);
7cdc2bab
MD
249 bt_list_add(&s->node, &lttng_live->sessions);
250 goto end;
251error:
087bc060 252 BT_LOGE("Error adding session");
7cdc2bab
MD
253 g_free(s);
254 ret = -1;
255end:
256 return ret;
257}
258
259static
260void lttng_live_destroy_session(struct lttng_live_session *session)
261{
262 struct lttng_live_trace *trace, *t;
263
087bc060 264 BT_LOGI("Destroy session");
7cdc2bab
MD
265 if (session->id != -1ULL) {
266 if (lttng_live_detach_session(session)) {
267 /* Old relayd cannot detach sessions. */
087bc060 268 BT_LOGD("Unable to detach session %" PRIu64,
7cdc2bab
MD
269 session->id);
270 }
271 session->id = -1ULL;
272 }
273 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
274 lttng_live_close_trace_streams(trace);
275 }
276 bt_list_del(&session->node);
277 g_free(session);
278}
279
280BT_HIDDEN
281void lttng_live_iterator_finalize(struct bt_private_notification_iterator *it)
282{
283 struct lttng_live_stream_iterator_generic *s =
284 bt_private_notification_iterator_get_user_data(it);
285
286 switch (s->type) {
287 case LIVE_STREAM_TYPE_NO_STREAM:
288 {
289 /* Leave no_stream_iter in place when port is removed. */
290 break;
291 }
292 case LIVE_STREAM_TYPE_STREAM:
293 {
294 struct lttng_live_stream_iterator *stream_iter =
295 container_of(s, struct lttng_live_stream_iterator, p);
296
297 lttng_live_stream_iterator_destroy(stream_iter);
298 break;
299 }
300 }
301}
302
303static
304enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
305 struct lttng_live_component *lttng_live,
306 struct lttng_live_stream_iterator *lttng_live_stream)
307{
308 switch (lttng_live_stream->state) {
309 case LTTNG_LIVE_STREAM_QUIESCENT:
310 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
311 break;
312 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
313 /* Invalid state. */
087bc060
MD
314 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
315 abort();
7cdc2bab
MD
316 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
317 /* Invalid state. */
087bc060
MD
318 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
319 abort();
7cdc2bab
MD
320 case LTTNG_LIVE_STREAM_EOF:
321 break;
322 }
323 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
324}
325
326/*
327 * For active no data stream, fetch next data. It can be either:
328 * - quiescent: need to put it in the prio heap at quiescent end
329 * timestamp,
330 * - have data: need to wire up first event into the prio heap,
331 * - have no data on this stream at this point: need to retry (AGAIN) or
332 * return EOF.
333 */
334static
335enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
336 struct lttng_live_component *lttng_live,
337 struct lttng_live_stream_iterator *lttng_live_stream)
338{
339 enum bt_ctf_lttng_live_iterator_status ret =
340 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
341 struct packet_index index;
342 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
343
344 if (lttng_live_stream->trace->new_metadata_needed) {
345 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
346 goto end;
347 }
348 if (lttng_live_stream->trace->session->new_streams_needed) {
349 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
350 goto end;
351 }
352 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
353 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
354 goto end;
355 }
356 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
357 if (ret != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
358 goto end;
359 }
360 assert(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
361 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
362 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
363 && lttng_live_stream->last_returned_inactivity_timestamp ==
364 lttng_live_stream->current_inactivity_timestamp) {
365 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
366 print_stream_state(lttng_live_stream);
367 } else {
368 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
369 }
370 goto end;
371 }
372 lttng_live_stream->base_offset = index.offset;
373 lttng_live_stream->offset = index.offset;
374 lttng_live_stream->len = index.packet_size / CHAR_BIT;
375end:
376 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
377 ret = lttng_live_iterator_next_check_stream_state(
378 lttng_live, lttng_live_stream);
379 }
380 return ret;
381}
382
383/*
384 * Creation of the notification requires the ctf trace to be created
385 * beforehand, but the live protocol gives us all streams (including
386 * metadata) at once. So we split it in three steps: getting streams,
387 * getting metadata (which creates the ctf trace), and then creating the
388 * per-stream notifications.
389 */
390static
391enum bt_ctf_lttng_live_iterator_status lttng_live_get_session(
392 struct lttng_live_component *lttng_live,
393 struct lttng_live_session *session)
394{
395 enum bt_ctf_lttng_live_iterator_status status;
396 struct lttng_live_trace *trace, *t;
397
398 if (lttng_live_attach_session(session)) {
399 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
400 }
401 status = lttng_live_get_new_streams(session);
402 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK &&
403 status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
404 return status;
405 }
406 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
407 status = lttng_live_metadata_update(trace);
408 if (status == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END) {
409 int retval;
410
411 retval = bt_ctf_trace_set_is_static(trace->trace);
412 assert(!retval);
413 } else if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
414 return status;
415 }
416 }
417 return lttng_live_lazy_notif_init(session);
418}
419
420BT_HIDDEN
421void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
422{
423 struct lttng_live_session *session;
424
425 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
426 session->new_streams_needed = true;
427 }
428}
429
430static
431void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
432{
433 struct lttng_live_session *session;
434
435 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
436 struct lttng_live_trace *trace;
437
438 session->new_streams_needed = true;
439 bt_list_for_each_entry(trace, &session->traces, node) {
440 trace->new_metadata_needed = true;
441 }
442 }
443}
444
445static
446enum bt_notification_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
447 struct lttng_live_component *lttng_live)
448{
449 enum bt_ctf_lttng_live_iterator_status ret =
450 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
451 unsigned int nr_sessions_opened = 0;
452 struct lttng_live_session *session, *s;
453
454 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
455 if (session->closed && bt_list_empty(&session->traces)) {
456 lttng_live_destroy_session(session);
457 }
458 }
459 /*
460 * Currently, when there are no sessions, we quit immediately.
461 * We may want to add a component parameter to keep trying until
462 * we get data in the future.
463 * Also, in a remotely distant future, we could add a "new
464 * session" flag to the protocol, which would tell us that we
465 * need to query for new sessions even though we have sessions
466 * currently ongoing.
467 */
468 if (bt_list_empty(&lttng_live->sessions)) {
469 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
470 goto end;
471 }
472 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
473 ret = lttng_live_get_session(lttng_live, session);
474 switch (ret) {
475 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
476 break;
477 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
478 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
479 break;
480 default:
481 goto end;
482 }
483 if (!session->closed) {
484 nr_sessions_opened++;
485 }
486 }
487end:
488 if (ret == BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
489 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
490 }
491 return ret;
492}
493
494static
495enum bt_ctf_lttng_live_iterator_status emit_inactivity_notification(
496 struct lttng_live_component *lttng_live,
497 struct lttng_live_stream_iterator *lttng_live_stream,
498 struct bt_notification **notification,
499 uint64_t timestamp)
500{
501 enum bt_ctf_lttng_live_iterator_status ret =
502 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
503 struct lttng_live_trace *trace;
504 struct bt_ctf_clock_class *clock_class = NULL;
505 struct bt_ctf_clock_value *clock_value = NULL;
506 struct bt_notification *notif = NULL;
507 int retval;
508
509 trace = lttng_live_stream->trace;
510 if (!trace) {
511 goto error;
512 }
513 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
514 if (!clock_class) {
515 goto error;
516 }
517 clock_value = bt_ctf_clock_value_create(clock_class, timestamp);
518 if (!clock_value) {
519 goto error;
520 }
521 notif = bt_notification_inactivity_create(trace->cc_prio_map);
522 if (!notif) {
523 goto error;
524 }
525 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
526 if (retval) {
527 goto error;
528 }
529 *notification = notif;
530end:
531 bt_put(clock_value);
532 bt_put(clock_class);
533 return ret;
534
535error:
536 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
537 bt_put(notif);
538 goto end;
539}
540
541static
542enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
543 struct lttng_live_component *lttng_live,
544 struct lttng_live_stream_iterator *lttng_live_stream,
545 struct bt_notification **notification)
546{
547 enum bt_ctf_lttng_live_iterator_status ret =
548 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
549 struct bt_ctf_clock_class *clock_class = NULL;
550 struct bt_ctf_clock_value *clock_value = NULL;
551
552 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
553 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
554 }
555
556 if (lttng_live_stream->current_inactivity_timestamp ==
557 lttng_live_stream->last_returned_inactivity_timestamp) {
558 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
559 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
560 goto end;
561 }
562
563 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
564 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
565
566 lttng_live_stream->last_returned_inactivity_timestamp =
567 lttng_live_stream->current_inactivity_timestamp;
568end:
569 bt_put(clock_value);
570 bt_put(clock_class);
571 return ret;
572}
573
574static
575enum bt_ctf_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
576 struct lttng_live_component *lttng_live,
577 struct lttng_live_stream_iterator *lttng_live_stream,
578 struct bt_notification **notification)
579{
580 enum bt_ctf_lttng_live_iterator_status ret =
581 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
582 enum bt_ctf_notif_iter_status status;
583 struct lttng_live_session *session;
584
585 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
586 struct lttng_live_trace *trace;
587
588 if (session->new_streams_needed) {
589 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
590 }
591 bt_list_for_each_entry(trace, &session->traces, node) {
592 if (trace->new_metadata_needed) {
593 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
594 }
595 }
596 }
597
598 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
599 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
600 }
601 if (lttng_live_stream->packet_end_notif_queue) {
602 *notification = lttng_live_stream->packet_end_notif_queue;
603 lttng_live_stream->packet_end_notif_queue = NULL;
604 status = BT_CTF_NOTIF_ITER_STATUS_OK;
605 } else {
606 status = bt_ctf_notif_iter_get_next_notification(
607 lttng_live_stream->notif_iter,
608 lttng_live_stream->trace->cc_prio_map,
609 notification);
610 if (status == BT_CTF_NOTIF_ITER_STATUS_OK) {
611 /*
612 * Consider empty packets as inactivity.
613 */
614 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
615 lttng_live_stream->packet_end_notif_queue = *notification;
616 *notification = NULL;
617 return emit_inactivity_notification(lttng_live,
618 lttng_live_stream, notification,
619 lttng_live_stream->current_packet_end_timestamp);
620 }
621 }
622 }
623 switch (status) {
624 case BT_CTF_NOTIF_ITER_STATUS_EOF:
625 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
626 break;
627 case BT_CTF_NOTIF_ITER_STATUS_OK:
628 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
629 break;
630 case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
631 /*
632 * Continue immediately (end of packet). The next
633 * get_index may return AGAIN to delay the following
634 * attempt.
635 */
636 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
637 break;
638 case BT_CTF_NOTIF_ITER_STATUS_INVAL:
639 /* No argument provided by the user, so don't return INVAL. */
640 case BT_CTF_NOTIF_ITER_STATUS_ERROR:
641 default:
642 ret = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
643 break;
644 }
645 return ret;
646}
647
648/*
649 * helper function:
650 * handle_no_data_streams()
651 * retry:
652 * - for each ACTIVE_NO_DATA stream:
653 * - query relayd for stream data, or quiescence info.
654 * - if need metadata, get metadata, goto retry.
655 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
656 * - if quiescent, move to QUIESCENT streams
657 * - if fetched data, move to ACTIVE_DATA streams
658 * (at this point each stream either has data, or is quiescent)
659 *
660 *
661 * iterator_next:
662 * handle_new_streams_and_metadata()
663 * - query relayd for known streams, add them as ACTIVE_NO_DATA
664 * - query relayd for metadata
665 *
666 * call handle_active_no_data_streams()
667 *
668 * handle_quiescent_streams()
669 * - if at least one stream is ACTIVE_DATA:
670 * - peek stream event with lowest timestamp -> next_ts
671 * - for each quiescent stream
672 * - if next_ts >= quiescent end
673 * - set state to ACTIVE_NO_DATA
674 * - else
675 * - for each quiescent stream
676 * - set state to ACTIVE_NO_DATA
677 *
678 * call handle_active_no_data_streams()
679 *
680 * handle_active_data_streams()
681 * - if at least one stream is ACTIVE_DATA:
682 * - get stream event with lowest timestamp from heap
683 * - make that stream event the current notification.
684 * - move this stream heap position to its next event
685 * - if we need to fetch data from relayd, move
686 * stream to ACTIVE_NO_DATA.
687 * - return OK
688 * - return AGAIN
689 *
690 * end criterion: ctrl-c on client. If relayd exits or the session
691 * closes on the relay daemon side, we keep on waiting for streams.
692 * Eventually handle --end timestamp (also an end criterion).
693 *
694 * When disconnected from relayd: try to re-connect endlessly.
695 */
696static
697struct bt_notification_iterator_next_return lttng_live_iterator_next_stream(
698 struct bt_private_notification_iterator *iterator,
699 struct lttng_live_stream_iterator *stream_iter)
700{
701 enum bt_ctf_lttng_live_iterator_status status;
702 struct bt_notification_iterator_next_return next_return;
703 struct lttng_live_component *lttng_live;
704
705 lttng_live = stream_iter->trace->session->lttng_live;
706retry:
707 print_stream_state(stream_iter);
708 next_return.notification = NULL;
709 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
710 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
711 goto end;
712 }
713 status = lttng_live_iterator_next_handle_one_no_data_stream(
714 lttng_live, stream_iter);
715 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
716 goto end;
717 }
718 status = lttng_live_iterator_next_handle_one_quiescent_stream(
719 lttng_live, stream_iter, &next_return.notification);
720 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
721 assert(next_return.notification == NULL);
722 goto end;
723 }
724 if (next_return.notification) {
725 goto end;
726 }
727 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
728 stream_iter, &next_return.notification);
729 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
730 assert(next_return.notification == NULL);
731 }
732
733end:
734 switch (status) {
735 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
736 print_dbg("continue");
737 goto retry;
738 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
739 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
740 print_dbg("again");
741 break;
742 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
743 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
744 print_dbg("end");
745 break;
746 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
747 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
748 print_dbg("ok");
749 break;
750 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
751 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
752 break;
753 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
754 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
755 break;
756 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
757 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
758 break;
759 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
760 default: /* fall-through */
761 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
762 break;
763 }
764 return next_return;
765}
766
767static
768struct bt_notification_iterator_next_return lttng_live_iterator_next_no_stream(
769 struct bt_private_notification_iterator *iterator,
770 struct lttng_live_no_stream_iterator *no_stream_iter)
771{
772 enum bt_ctf_lttng_live_iterator_status status;
773 struct bt_notification_iterator_next_return next_return;
774 struct lttng_live_component *lttng_live;
775
776 lttng_live = no_stream_iter->lttng_live;
777retry:
778 lttng_live_force_new_streams_and_metadata(lttng_live);
779 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
780 if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK) {
781 goto end;
782 }
783 if (no_stream_iter->port) {
784 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
785 } else {
786 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
787 }
788end:
789 switch (status) {
790 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
791 goto retry;
792 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
793 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
794 break;
795 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END:
796 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
797 break;
798 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK:
799 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
800 break;
801 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
802 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
803 break;
804 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
805 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
806 break;
807 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
808 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
809 break;
810 case BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
811 default: /* fall-through */
812 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
813 break;
814 }
815 return next_return;
816}
817
d3eb6e8f 818BT_HIDDEN
41a2b7ae 819struct bt_notification_iterator_next_return lttng_live_iterator_next(
890882ef 820 struct bt_private_notification_iterator *iterator)
d3eb6e8f 821{
7cdc2bab
MD
822 struct lttng_live_stream_iterator_generic *s =
823 bt_private_notification_iterator_get_user_data(iterator);
824 struct bt_notification_iterator_next_return next_return;
825
826 switch (s->type) {
827 case LIVE_STREAM_TYPE_NO_STREAM:
828 next_return = lttng_live_iterator_next_no_stream(iterator,
829 container_of(s, struct lttng_live_no_stream_iterator, p));
830 break;
831 case LIVE_STREAM_TYPE_STREAM:
832 next_return = lttng_live_iterator_next_stream(iterator,
833 container_of(s, struct lttng_live_stream_iterator, p));
834 break;
835 default:
836 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
837 break;
838 }
839 return next_return;
840}
41a2b7ae 841
7cdc2bab
MD
842BT_HIDDEN
843enum bt_notification_iterator_status lttng_live_iterator_init(
844 struct bt_private_notification_iterator *it,
845 struct bt_private_port *port)
846{
847 enum bt_notification_iterator_status ret =
848 BT_NOTIFICATION_ITERATOR_STATUS_OK;
849 struct lttng_live_stream_iterator_generic *s;
7cdc2bab
MD
850
851 assert(it);
852
853 s = bt_private_port_get_user_data(port);
854 assert(s);
855 switch (s->type) {
856 case LIVE_STREAM_TYPE_NO_STREAM:
857 {
858 struct lttng_live_no_stream_iterator *no_stream_iter =
859 container_of(s, struct lttng_live_no_stream_iterator, p);
7cdc2bab
MD
860 ret = bt_private_notification_iterator_set_user_data(it, no_stream_iter);
861 if (ret) {
862 goto error;
863 }
864 break;
865 }
866 case LIVE_STREAM_TYPE_STREAM:
867 {
868 struct lttng_live_stream_iterator *stream_iter =
869 container_of(s, struct lttng_live_stream_iterator, p);
7cdc2bab
MD
870 ret = bt_private_notification_iterator_set_user_data(it, stream_iter);
871 if (ret) {
872 goto error;
873 }
874 break;
875 }
876 default:
877 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
878 goto end;
879 }
880
881end:
41a2b7ae 882 return ret;
7cdc2bab
MD
883error:
884 if (bt_private_notification_iterator_set_user_data(it, NULL)
885 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
087bc060 886 BT_LOGE("Error setting private data to NULL");
7cdc2bab
MD
887 }
888 goto end;
889}
890
891static
892struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_class,
893 struct bt_value *params)
894{
895 struct bt_value *url_value = NULL;
896 struct bt_value *results = NULL;
897 const char *url;
898 struct bt_live_viewer_connection *viewer_connection = NULL;
899 enum bt_value_status ret;
900
901 url_value = bt_value_map_get(params, "url");
902 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
087bc060 903 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
904 goto error;
905 }
906
907 ret = bt_value_string_get(url_value, &url);
908 if (ret != BT_VALUE_STATUS_OK) {
087bc060 909 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
910 goto error;
911 }
912
913 viewer_connection = bt_live_viewer_connection_create(url, stderr);
914 if (!viewer_connection) {
915 ret = BT_COMPONENT_STATUS_NOMEM;
916 goto error;
917 }
918
919 results = bt_live_viewer_connection_list_sessions(viewer_connection);
920 goto end;
921error:
922 BT_PUT(results);
923end:
924 if (viewer_connection) {
925 bt_live_viewer_connection_destroy(viewer_connection);
926 }
927 BT_PUT(url_value);
928 return results;
929}
930
931BT_HIDDEN
932struct bt_value *lttng_live_query(struct bt_component_class *comp_class,
933 const char *object, struct bt_value *params)
934{
935 if (strcmp(object, "sessions") == 0) {
936 return lttng_live_query_list_sessions(comp_class,
937 params);
938 }
087bc060 939 BT_LOGW("Unknown query object `%s`", object);
7cdc2bab
MD
940 return NULL;
941}
942
943static
944void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
945{
946 int ret;
947 struct lttng_live_session *session, *s;
948
949 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
950 lttng_live_destroy_session(session);
951 }
952 BT_PUT(lttng_live->viewer_connection);
953 if (lttng_live->url) {
954 g_string_free(lttng_live->url, TRUE);
955 }
956 if (lttng_live->no_stream_port) {
957 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
958 assert(!ret);
959 BT_PUT(lttng_live->no_stream_port);
960 }
961 if (lttng_live->no_stream_iter) {
962 g_free(lttng_live->no_stream_iter);
963 }
964 g_free(lttng_live);
965}
966
967BT_HIDDEN
968void lttng_live_component_finalize(struct bt_private_component *component)
969{
970 void *data = bt_private_component_get_user_data(component);
971
972 if (!data) {
973 return;
974 }
975 lttng_live_component_destroy_data(data);
976}
977
978static
979struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
980 struct bt_private_component *private_component)
981{
982 struct lttng_live_component *lttng_live;
983 struct bt_value *value = NULL;
984 const char *url;
985 enum bt_value_status ret;
986
987 lttng_live = g_new0(struct lttng_live_component, 1);
988 if (!lttng_live) {
989 goto end;
990 }
7cdc2bab
MD
991 /* TODO: make this an overridable parameter. */
992 lttng_live->max_query_size = MAX_QUERY_SIZE;
993 BT_INIT_LIST_HEAD(&lttng_live->sessions);
994 value = bt_value_map_get(params, "url");
995 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
087bc060 996 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
997 goto error;
998 }
999 ret = bt_value_string_get(value, &url);
1000 if (ret != BT_VALUE_STATUS_OK) {
087bc060 1001 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
1002 goto error;
1003 }
1004 lttng_live->url = g_string_new(url);
1005 if (!lttng_live->url) {
1006 goto error;
1007 }
1008 lttng_live->viewer_connection =
1009 bt_live_viewer_connection_create(lttng_live->url->str,
1010 stderr);
1011 if (!lttng_live->viewer_connection) {
1012 ret = BT_COMPONENT_STATUS_NOMEM;
1013 goto error;
1014 }
1015 if (lttng_live_create_viewer_session(lttng_live)) {
1016 ret = BT_COMPONENT_STATUS_ERROR;
1017 goto error;
1018 }
1019 lttng_live->private_component = private_component;
1020
1021 goto end;
1022
1023error:
1024 lttng_live_component_destroy_data(lttng_live);
1025 lttng_live = NULL;
1026end:
1027 return lttng_live;
d3e4dcd8
PP
1028}
1029
f3bc2010 1030BT_HIDDEN
7cdc2bab
MD
1031enum bt_component_status lttng_live_component_init(struct bt_private_component *component,
1032 struct bt_value *params, void *init_method_data)
f3bc2010 1033{
7cdc2bab
MD
1034 struct lttng_live_component *lttng_live;
1035 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
1036
7cdc2bab
MD
1037 /* Passes ownership of iter ref to lttng_live_component_create. */
1038 lttng_live = lttng_live_component_create(params, component);
1039 if (!lttng_live) {
1040 ret = BT_COMPONENT_STATUS_NOMEM;
1041 goto end;
1042 }
1043
1044 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1045 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1046 lttng_live->no_stream_iter->lttng_live = lttng_live;
1047
1048 lttng_live->no_stream_port =
1049 bt_private_component_source_add_output_private_port(
1050 lttng_live->private_component, "no-stream",
1051 lttng_live->no_stream_iter);
1052 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1053
1054 ret = bt_private_component_set_user_data(component, lttng_live);
1055 if (ret != BT_COMPONENT_STATUS_OK) {
1056 goto error;
1057 }
1058
1059end:
1060 return ret;
1061error:
1062 (void) bt_private_component_set_user_data(component, NULL);
1063 lttng_live_component_destroy_data(lttng_live);
1064 return ret;
f3bc2010 1065}
087bc060 1066
d85ef162
MD
1067BT_HIDDEN
1068enum bt_component_status lttng_live_accept_port_connection(
1069 struct bt_private_component *private_component,
1070 struct bt_private_port *self_private_port,
1071 struct bt_port *other_port)
1072{
1073 struct lttng_live_component *lttng_live =
1074 bt_private_component_get_user_data(private_component);
1075 struct bt_component *other_component;
1076 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
1077 struct bt_port *self_port = bt_port_from_private_port(self_private_port);
1078
1079 other_component = bt_port_get_component(other_port);
1080 bt_put(other_component); /* weak */
1081
1082 if (!lttng_live->downstream_component) {
1083 lttng_live->downstream_component = other_component;
1084 goto end;
1085 }
1086
1087 /*
1088 * Compare prior component to ensure we are connected to the
1089 * same downstream component as prior ports.
1090 */
1091 if (lttng_live->downstream_component != other_component) {
1092 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1093 bt_port_get_name(self_port),
1094 bt_component_get_name(other_component),
1095 bt_component_get_name(lttng_live->downstream_component));
1096 status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
1097 goto end;
1098 }
1099end:
1100 bt_put(self_port);
1101 return status;
1102}
1103
087bc060
MD
1104static
1105void __attribute__((constructor)) bt_lttng_live_logging_ctor(void)
1106{
1107 enum bt_logging_level log_level = BT_LOG_NONE;
047358f9 1108 const char *log_level_env = getenv(BT_LOGLEVEL_NAME);
087bc060
MD
1109
1110 if (!log_level_env) {
1111 return;
1112 }
1113
1114 if (strcmp(log_level_env, "VERBOSE") == 0) {
1115 log_level = BT_LOGGING_LEVEL_VERBOSE;
1116 } else if (strcmp(log_level_env, "DEBUG") == 0) {
1117 log_level = BT_LOGGING_LEVEL_DEBUG;
1118 } else if (strcmp(log_level_env, "INFO") == 0) {
1119 log_level = BT_LOGGING_LEVEL_INFO;
1120 } else if (strcmp(log_level_env, "WARN") == 0) {
1121 log_level = BT_LOGGING_LEVEL_WARN;
1122 } else if (strcmp(log_level_env, "ERROR") == 0) {
1123 log_level = BT_LOGGING_LEVEL_ERROR;
1124 } else if (strcmp(log_level_env, "FATAL") == 0) {
1125 log_level = BT_LOGGING_LEVEL_FATAL;
d85ef162
MD
1126 } else {
1127 bt_lttng_live_log_level = BT_LOGGING_LEVEL_FATAL;
047358f9
MD
1128 BT_LOGF("Incorrect log level specified in %s",
1129 BT_LOGLEVEL_NAME);
d85ef162 1130 abort();
087bc060
MD
1131 }
1132
1133 bt_lttng_live_log_level = log_level;
1134}
This page took 0.078678 seconds and 4 git commands to generate.