Replace assert() -> BT_ASSERT() and some preconditions with BT_ASSERT_PRE()
[babeltrace.git] / plugins / ctf / lttng-live / lttng-live.c
CommitLineData
f3bc2010
JG
1/*
2 * lttng-live.c
3 *
4 * Babeltrace CTF LTTng-live Client Component
5 *
6 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7cdc2bab 7 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
f3bc2010
JG
8 *
9 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
020bc26f
PP
30#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC"
31#include "logging.h"
32
9d408fca 33#include <babeltrace/babeltrace.h>
7cdc2bab 34#include <babeltrace/compiler-internal.h>
6f79a7cf 35#include <babeltrace/types.h>
7cdc2bab
MD
36#include <inttypes.h>
37#include <glib.h>
f6ccaed9 38#include <babeltrace/assert-internal.h>
7cdc2bab 39#include <unistd.h>
7d61fa8e 40#include <plugins-common.h>
f3bc2010 41
7cdc2bab
MD
42#include "data-stream.h"
43#include "metadata.h"
0f5e83e5 44#include "lttng-live-internal.h"
7cdc2bab 45
7cdc2bab 46#define MAX_QUERY_SIZE (256*1024)
7cdc2bab 47
087bc060 48#define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__)
7cdc2bab
MD
49
50static const char *print_state(struct lttng_live_stream_iterator *s)
51{
52 switch (s->state) {
53 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
54 return "ACTIVE_NO_DATA";
55 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
56 return "QUIESCENT_NO_DATA";
57 case LTTNG_LIVE_STREAM_QUIESCENT:
58 return "QUIESCENT";
59 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
60 return "ACTIVE_DATA";
61 case LTTNG_LIVE_STREAM_EOF:
62 return "EOF";
63 default:
64 return "ERROR";
65 }
66}
7cdc2bab 67
6f79a7cf
MD
68static
69void print_stream_state(struct lttng_live_stream_iterator *stream)
70{
71 struct bt_port *port;
72
6d137876 73 port = bt_port_from_private(stream->port);
6f79a7cf
MD
74 print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64,
75 bt_port_get_name(port),
76 print_state(stream),
77 stream->last_returned_inactivity_timestamp,
78 stream->current_inactivity_timestamp);
79 bt_put(port);
80}
81
82BT_HIDDEN
83bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live)
84{
85 struct bt_component *component;
86 struct bt_graph *graph;
87 bt_bool ret;
88
89 if (!lttng_live) {
90 return BT_FALSE;
91 }
92
6d137876 93 component = bt_component_from_private(lttng_live->private_component);
6f79a7cf
MD
94 graph = bt_component_get_graph(component);
95 ret = bt_graph_is_canceled(graph);
96 bt_put(graph);
97 bt_put(component);
98 return ret;
99}
7cdc2bab 100
7cdc2bab
MD
101BT_HIDDEN
102int lttng_live_add_port(struct lttng_live_component *lttng_live,
103 struct lttng_live_stream_iterator *stream_iter)
104{
105 int ret;
106 struct bt_private_port *private_port;
107 char name[STREAM_NAME_MAX_LEN];
6f79a7cf 108 enum bt_component_status status;
7cdc2bab
MD
109
110 ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id);
f6ccaed9 111 BT_ASSERT(ret > 0);
7cdc2bab 112 strcpy(stream_iter->name, name);
4bf0e537
MD
113 if (lttng_live_is_canceled(lttng_live)) {
114 return 0;
115 }
6f79a7cf 116 status = bt_private_component_source_add_output_private_port(
147337a3
PP
117 lttng_live->private_component, name, stream_iter,
118 &private_port);
6f79a7cf
MD
119 switch (status) {
120 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
121 return 0;
122 case BT_COMPONENT_STATUS_OK:
123 break;
124 default:
7cdc2bab
MD
125 return -1;
126 }
6f79a7cf 127 bt_put(private_port); /* weak */
087bc060 128 BT_LOGI("Added port %s", name);
7cdc2bab
MD
129
130 if (lttng_live->no_stream_port) {
6f79a7cf 131 bt_get(lttng_live->no_stream_port);
7cdc2bab 132 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
6f79a7cf 133 bt_put(lttng_live->no_stream_port);
7cdc2bab
MD
134 if (ret) {
135 return -1;
136 }
6f79a7cf 137 lttng_live->no_stream_port = NULL;
7cdc2bab
MD
138 lttng_live->no_stream_iter->port = NULL;
139 }
140 stream_iter->port = private_port;
141 return 0;
142}
143
144BT_HIDDEN
145int lttng_live_remove_port(struct lttng_live_component *lttng_live,
146 struct bt_private_port *port)
147{
148 struct bt_component *component;
149 int64_t nr_ports;
150 int ret;
151
6d137876 152 component = bt_component_from_private(lttng_live->private_component);
7cdc2bab
MD
153 nr_ports = bt_component_source_get_output_port_count(component);
154 if (nr_ports < 0) {
155 return -1;
156 }
157 BT_PUT(component);
158 if (nr_ports == 1) {
6f79a7cf
MD
159 enum bt_component_status status;
160
f6ccaed9 161 BT_ASSERT(!lttng_live->no_stream_port);
4bf0e537
MD
162
163 if (lttng_live_is_canceled(lttng_live)) {
164 return 0;
165 }
6f79a7cf 166 status = bt_private_component_source_add_output_private_port(lttng_live->private_component,
147337a3
PP
167 "no-stream", lttng_live->no_stream_iter,
168 &lttng_live->no_stream_port);
6f79a7cf
MD
169 switch (status) {
170 case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED:
171 return 0;
172 case BT_COMPONENT_STATUS_OK:
173 break;
174 default:
7cdc2bab
MD
175 return -1;
176 }
6f79a7cf 177 bt_put(lttng_live->no_stream_port); /* weak */
7cdc2bab
MD
178 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
179 }
6f79a7cf 180 bt_get(port);
7cdc2bab 181 ret = bt_private_port_remove_from_component(port);
6f79a7cf 182 bt_put(port);
7cdc2bab
MD
183 if (ret) {
184 return -1;
185 }
186 return 0;
187}
188
189static
190struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session,
191 uint64_t trace_id)
d3e4dcd8 192{
7cdc2bab
MD
193 struct lttng_live_trace *trace;
194
195 bt_list_for_each_entry(trace, &session->traces, node) {
196 if (trace->id == trace_id) {
197 return trace;
198 }
199 }
d3eb6e8f
PP
200 return NULL;
201}
202
7cdc2bab
MD
203static
204void lttng_live_destroy_trace(struct bt_object *obj)
205{
206 struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj);
207
087bc060 208 BT_LOGI("Destroy trace");
f6ccaed9 209 BT_ASSERT(bt_list_empty(&trace->streams));
7cdc2bab 210 bt_list_del(&trace->node);
5bd230f4 211
4bf0e537
MD
212 if (trace->trace) {
213 int retval;
5bd230f4 214
50842bdc 215 retval = bt_trace_set_is_static(trace->trace);
f6ccaed9 216 BT_ASSERT(!retval);
4bf0e537
MD
217 BT_PUT(trace->trace);
218 }
7cdc2bab
MD
219 lttng_live_metadata_fini(trace);
220 BT_PUT(trace->cc_prio_map);
221 g_free(trace);
222}
223
224static
225struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session,
226 uint64_t trace_id)
227{
228 struct lttng_live_trace *trace = NULL;
229
230 trace = g_new0(struct lttng_live_trace, 1);
231 if (!trace) {
232 goto error;
233 }
234 trace->session = session;
235 trace->id = trace_id;
236 BT_INIT_LIST_HEAD(&trace->streams);
237 trace->new_metadata_needed = true;
238 bt_list_add(&trace->node, &session->traces);
239 bt_object_init(&trace->obj, lttng_live_destroy_trace);
087bc060 240 BT_LOGI("Create trace");
7cdc2bab
MD
241 goto end;
242error:
243 g_free(trace);
244 trace = NULL;
245end:
246 return trace;
247}
248
249BT_HIDDEN
250struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session,
251 uint64_t trace_id)
252{
253 struct lttng_live_trace *trace;
254
255 trace = lttng_live_find_trace(session, trace_id);
256 if (trace) {
257 bt_get(trace);
258 return trace;
259 }
260 return lttng_live_create_trace(session, trace_id);
261}
262
263BT_HIDDEN
264void lttng_live_unref_trace(struct lttng_live_trace *trace)
265{
266 bt_put(trace);
267}
268
269static
270void lttng_live_close_trace_streams(struct lttng_live_trace *trace)
271{
272 struct lttng_live_stream_iterator *stream, *s;
273
274 bt_list_for_each_entry_safe(stream, s, &trace->streams, node) {
275 lttng_live_stream_iterator_destroy(stream);
276 }
277 lttng_live_metadata_fini(trace);
278}
279
280BT_HIDDEN
06994c71
MD
281int lttng_live_add_session(struct lttng_live_component *lttng_live,
282 uint64_t session_id, const char *hostname,
283 const char *session_name)
7cdc2bab
MD
284{
285 int ret = 0;
286 struct lttng_live_session *s;
287
288 s = g_new0(struct lttng_live_session, 1);
289 if (!s) {
290 goto error;
291 }
292
293 s->id = session_id;
294 BT_INIT_LIST_HEAD(&s->traces);
295 s->lttng_live = lttng_live;
296 s->new_streams_needed = true;
06994c71
MD
297 s->hostname = g_string_new(hostname);
298 s->session_name = g_string_new(session_name);
7cdc2bab 299
06994c71
MD
300 BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s",
301 s->id, hostname, session_name);
7cdc2bab
MD
302 bt_list_add(&s->node, &lttng_live->sessions);
303 goto end;
304error:
087bc060 305 BT_LOGE("Error adding session");
7cdc2bab
MD
306 g_free(s);
307 ret = -1;
308end:
309 return ret;
310}
311
312static
313void lttng_live_destroy_session(struct lttng_live_session *session)
314{
315 struct lttng_live_trace *trace, *t;
316
087bc060 317 BT_LOGI("Destroy session");
7cdc2bab
MD
318 if (session->id != -1ULL) {
319 if (lttng_live_detach_session(session)) {
6f79a7cf 320 if (!lttng_live_is_canceled(session->lttng_live)) {
4c66436f
MD
321 /* Old relayd cannot detach sessions. */
322 BT_LOGD("Unable to detach session %" PRIu64,
323 session->id);
324 }
7cdc2bab
MD
325 }
326 session->id = -1ULL;
327 }
328 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
329 lttng_live_close_trace_streams(trace);
330 }
331 bt_list_del(&session->node);
06994c71
MD
332 if (session->hostname) {
333 g_string_free(session->hostname, TRUE);
334 }
335 if (session->session_name) {
336 g_string_free(session->session_name, TRUE);
337 }
7cdc2bab
MD
338 g_free(session);
339}
340
341BT_HIDDEN
90157d89 342void lttng_live_iterator_finalize(struct bt_private_connection_private_notification_iterator *it)
7cdc2bab
MD
343{
344 struct lttng_live_stream_iterator_generic *s =
90157d89 345 bt_private_connection_private_notification_iterator_get_user_data(it);
7cdc2bab
MD
346
347 switch (s->type) {
348 case LIVE_STREAM_TYPE_NO_STREAM:
349 {
350 /* Leave no_stream_iter in place when port is removed. */
351 break;
352 }
353 case LIVE_STREAM_TYPE_STREAM:
354 {
355 struct lttng_live_stream_iterator *stream_iter =
356 container_of(s, struct lttng_live_stream_iterator, p);
357
358 lttng_live_stream_iterator_destroy(stream_iter);
359 break;
360 }
361 }
362}
363
364static
50842bdc 365enum bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state(
7cdc2bab
MD
366 struct lttng_live_component *lttng_live,
367 struct lttng_live_stream_iterator *lttng_live_stream)
368{
369 switch (lttng_live_stream->state) {
370 case LTTNG_LIVE_STREAM_QUIESCENT:
371 case LTTNG_LIVE_STREAM_ACTIVE_DATA:
372 break;
373 case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA:
374 /* Invalid state. */
087bc060
MD
375 BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\"");
376 abort();
7cdc2bab
MD
377 case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA:
378 /* Invalid state. */
087bc060
MD
379 BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\"");
380 abort();
7cdc2bab
MD
381 case LTTNG_LIVE_STREAM_EOF:
382 break;
383 }
50842bdc 384 return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
385}
386
387/*
388 * For active no data stream, fetch next data. It can be either:
389 * - quiescent: need to put it in the prio heap at quiescent end
390 * timestamp,
391 * - have data: need to wire up first event into the prio heap,
392 * - have no data on this stream at this point: need to retry (AGAIN) or
393 * return EOF.
394 */
395static
50842bdc 396enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream(
7cdc2bab
MD
397 struct lttng_live_component *lttng_live,
398 struct lttng_live_stream_iterator *lttng_live_stream)
399{
50842bdc
PP
400 enum bt_lttng_live_iterator_status ret =
401 BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
402 struct packet_index index;
403 enum lttng_live_stream_state orig_state = lttng_live_stream->state;
404
405 if (lttng_live_stream->trace->new_metadata_needed) {
50842bdc 406 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
407 goto end;
408 }
409 if (lttng_live_stream->trace->session->new_streams_needed) {
50842bdc 410 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
411 goto end;
412 }
413 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA
414 && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) {
415 goto end;
416 }
417 ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index);
50842bdc 418 if (ret != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
419 goto end;
420 }
f6ccaed9 421 BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF);
7cdc2bab
MD
422 if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) {
423 if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA
424 && lttng_live_stream->last_returned_inactivity_timestamp ==
425 lttng_live_stream->current_inactivity_timestamp) {
50842bdc 426 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
7cdc2bab
MD
427 print_stream_state(lttng_live_stream);
428 } else {
50842bdc 429 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
430 }
431 goto end;
432 }
433 lttng_live_stream->base_offset = index.offset;
434 lttng_live_stream->offset = index.offset;
435 lttng_live_stream->len = index.packet_size / CHAR_BIT;
436end:
50842bdc 437 if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
438 ret = lttng_live_iterator_next_check_stream_state(
439 lttng_live, lttng_live_stream);
440 }
441 return ret;
442}
443
444/*
445 * Creation of the notification requires the ctf trace to be created
446 * beforehand, but the live protocol gives us all streams (including
447 * metadata) at once. So we split it in three steps: getting streams,
448 * getting metadata (which creates the ctf trace), and then creating the
449 * per-stream notifications.
450 */
451static
50842bdc 452enum bt_lttng_live_iterator_status lttng_live_get_session(
7cdc2bab
MD
453 struct lttng_live_component *lttng_live,
454 struct lttng_live_session *session)
455{
50842bdc 456 enum bt_lttng_live_iterator_status status;
7cdc2bab
MD
457 struct lttng_live_trace *trace, *t;
458
459 if (lttng_live_attach_session(session)) {
6f79a7cf 460 if (lttng_live_is_canceled(lttng_live)) {
50842bdc 461 return BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
4c66436f 462 } else {
50842bdc 463 return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4c66436f 464 }
7cdc2bab
MD
465 }
466 status = lttng_live_get_new_streams(session);
50842bdc
PP
467 if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK &&
468 status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) {
7cdc2bab
MD
469 return status;
470 }
471 bt_list_for_each_entry_safe(trace, t, &session->traces, node) {
472 status = lttng_live_metadata_update(trace);
50842bdc
PP
473 if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK &&
474 status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) {
7cdc2bab
MD
475 return status;
476 }
477 }
478 return lttng_live_lazy_notif_init(session);
479}
480
481BT_HIDDEN
482void lttng_live_need_new_streams(struct lttng_live_component *lttng_live)
483{
484 struct lttng_live_session *session;
485
486 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
487 session->new_streams_needed = true;
488 }
489}
490
491static
492void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live)
493{
494 struct lttng_live_session *session;
495
496 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
497 struct lttng_live_trace *trace;
498
499 session->new_streams_needed = true;
500 bt_list_for_each_entry(trace, &session->traces, node) {
501 trace->new_metadata_needed = true;
502 }
503 }
504}
505
506static
50842bdc 507enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata(
7cdc2bab
MD
508 struct lttng_live_component *lttng_live)
509{
50842bdc
PP
510 enum bt_lttng_live_iterator_status ret =
511 BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
512 unsigned int nr_sessions_opened = 0;
513 struct lttng_live_session *session, *s;
514
515 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
516 if (session->closed && bt_list_empty(&session->traces)) {
517 lttng_live_destroy_session(session);
518 }
519 }
520 /*
521 * Currently, when there are no sessions, we quit immediately.
522 * We may want to add a component parameter to keep trying until
523 * we get data in the future.
524 * Also, in a remotely distant future, we could add a "new
525 * session" flag to the protocol, which would tell us that we
526 * need to query for new sessions even though we have sessions
527 * currently ongoing.
528 */
529 if (bt_list_empty(&lttng_live->sessions)) {
50842bdc 530 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab
MD
531 goto end;
532 }
533 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
534 ret = lttng_live_get_session(lttng_live, session);
535 switch (ret) {
50842bdc 536 case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
7cdc2bab 537 break;
50842bdc
PP
538 case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
539 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
540 break;
541 default:
542 goto end;
543 }
544 if (!session->closed) {
545 nr_sessions_opened++;
546 }
547 }
548end:
50842bdc
PP
549 if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) {
550 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab
MD
551 }
552 return ret;
553}
554
555static
50842bdc 556enum bt_lttng_live_iterator_status emit_inactivity_notification(
7cdc2bab
MD
557 struct lttng_live_component *lttng_live,
558 struct lttng_live_stream_iterator *lttng_live_stream,
559 struct bt_notification **notification,
560 uint64_t timestamp)
561{
50842bdc
PP
562 enum bt_lttng_live_iterator_status ret =
563 BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab 564 struct lttng_live_trace *trace;
50842bdc
PP
565 struct bt_clock_class *clock_class = NULL;
566 struct bt_clock_value *clock_value = NULL;
7cdc2bab
MD
567 struct bt_notification *notif = NULL;
568 int retval;
569
570 trace = lttng_live_stream->trace;
571 if (!trace) {
572 goto error;
573 }
574 clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0);
575 if (!clock_class) {
576 goto error;
577 }
50842bdc 578 clock_value = bt_clock_value_create(clock_class, timestamp);
7cdc2bab
MD
579 if (!clock_value) {
580 goto error;
581 }
582 notif = bt_notification_inactivity_create(trace->cc_prio_map);
583 if (!notif) {
584 goto error;
585 }
586 retval = bt_notification_inactivity_set_clock_value(notif, clock_value);
587 if (retval) {
588 goto error;
589 }
590 *notification = notif;
591end:
592 bt_put(clock_value);
593 bt_put(clock_class);
594 return ret;
595
596error:
50842bdc 597 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
7cdc2bab
MD
598 bt_put(notif);
599 goto end;
600}
601
602static
50842bdc 603enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream(
7cdc2bab
MD
604 struct lttng_live_component *lttng_live,
605 struct lttng_live_stream_iterator *lttng_live_stream,
606 struct bt_notification **notification)
607{
50842bdc
PP
608 enum bt_lttng_live_iterator_status ret =
609 BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
610 struct bt_clock_class *clock_class = NULL;
611 struct bt_clock_value *clock_value = NULL;
7cdc2bab
MD
612
613 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) {
50842bdc 614 return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
615 }
616
617 if (lttng_live_stream->current_inactivity_timestamp ==
618 lttng_live_stream->last_returned_inactivity_timestamp) {
619 lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA;
50842bdc 620 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
621 goto end;
622 }
623
624 ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification,
625 (uint64_t) lttng_live_stream->current_inactivity_timestamp);
626
627 lttng_live_stream->last_returned_inactivity_timestamp =
628 lttng_live_stream->current_inactivity_timestamp;
629end:
630 bt_put(clock_value);
631 bt_put(clock_class);
632 return ret;
633}
634
635static
50842bdc 636enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream(
7cdc2bab
MD
637 struct lttng_live_component *lttng_live,
638 struct lttng_live_stream_iterator *lttng_live_stream,
639 struct bt_notification **notification)
640{
50842bdc
PP
641 enum bt_lttng_live_iterator_status ret =
642 BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
643 enum bt_notif_iter_status status;
7cdc2bab
MD
644 struct lttng_live_session *session;
645
646 bt_list_for_each_entry(session, &lttng_live->sessions, node) {
647 struct lttng_live_trace *trace;
648
649 if (session->new_streams_needed) {
50842bdc 650 return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
651 }
652 bt_list_for_each_entry(trace, &session->traces, node) {
653 if (trace->new_metadata_needed) {
50842bdc 654 return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab
MD
655 }
656 }
657 }
658
659 if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) {
50842bdc 660 return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
7cdc2bab
MD
661 }
662 if (lttng_live_stream->packet_end_notif_queue) {
663 *notification = lttng_live_stream->packet_end_notif_queue;
664 lttng_live_stream->packet_end_notif_queue = NULL;
50842bdc 665 status = BT_NOTIF_ITER_STATUS_OK;
7cdc2bab 666 } else {
50842bdc 667 status = bt_notif_iter_get_next_notification(
7cdc2bab
MD
668 lttng_live_stream->notif_iter,
669 lttng_live_stream->trace->cc_prio_map,
670 notification);
50842bdc 671 if (status == BT_NOTIF_ITER_STATUS_OK) {
7cdc2bab
MD
672 /*
673 * Consider empty packets as inactivity.
674 */
675 if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) {
676 lttng_live_stream->packet_end_notif_queue = *notification;
677 *notification = NULL;
678 return emit_inactivity_notification(lttng_live,
679 lttng_live_stream, notification,
680 lttng_live_stream->current_packet_end_timestamp);
681 }
682 }
683 }
684 switch (status) {
50842bdc
PP
685 case BT_NOTIF_ITER_STATUS_EOF:
686 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab 687 break;
50842bdc
PP
688 case BT_NOTIF_ITER_STATUS_OK:
689 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab 690 break;
50842bdc 691 case BT_NOTIF_ITER_STATUS_AGAIN:
7cdc2bab
MD
692 /*
693 * Continue immediately (end of packet). The next
694 * get_index may return AGAIN to delay the following
695 * attempt.
696 */
50842bdc 697 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE;
7cdc2bab 698 break;
50842bdc 699 case BT_NOTIF_ITER_STATUS_INVAL:
7cdc2bab 700 /* No argument provided by the user, so don't return INVAL. */
50842bdc 701 case BT_NOTIF_ITER_STATUS_ERROR:
7cdc2bab 702 default:
50842bdc 703 ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
7cdc2bab
MD
704 break;
705 }
706 return ret;
707}
708
709/*
710 * helper function:
711 * handle_no_data_streams()
712 * retry:
713 * - for each ACTIVE_NO_DATA stream:
714 * - query relayd for stream data, or quiescence info.
715 * - if need metadata, get metadata, goto retry.
716 * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry
717 * - if quiescent, move to QUIESCENT streams
718 * - if fetched data, move to ACTIVE_DATA streams
719 * (at this point each stream either has data, or is quiescent)
720 *
721 *
722 * iterator_next:
723 * handle_new_streams_and_metadata()
724 * - query relayd for known streams, add them as ACTIVE_NO_DATA
725 * - query relayd for metadata
726 *
727 * call handle_active_no_data_streams()
728 *
729 * handle_quiescent_streams()
730 * - if at least one stream is ACTIVE_DATA:
731 * - peek stream event with lowest timestamp -> next_ts
732 * - for each quiescent stream
733 * - if next_ts >= quiescent end
734 * - set state to ACTIVE_NO_DATA
735 * - else
736 * - for each quiescent stream
737 * - set state to ACTIVE_NO_DATA
738 *
739 * call handle_active_no_data_streams()
740 *
741 * handle_active_data_streams()
742 * - if at least one stream is ACTIVE_DATA:
743 * - get stream event with lowest timestamp from heap
744 * - make that stream event the current notification.
745 * - move this stream heap position to its next event
746 * - if we need to fetch data from relayd, move
747 * stream to ACTIVE_NO_DATA.
748 * - return OK
749 * - return AGAIN
750 *
751 * end criterion: ctrl-c on client. If relayd exits or the session
752 * closes on the relay daemon side, we keep on waiting for streams.
753 * Eventually handle --end timestamp (also an end criterion).
754 *
755 * When disconnected from relayd: try to re-connect endlessly.
756 */
757static
90157d89
PP
758struct bt_notification_iterator_next_method_return lttng_live_iterator_next_stream(
759 struct bt_private_connection_private_notification_iterator *iterator,
7cdc2bab
MD
760 struct lttng_live_stream_iterator *stream_iter)
761{
50842bdc 762 enum bt_lttng_live_iterator_status status;
90157d89 763 struct bt_notification_iterator_next_method_return next_return;
7cdc2bab
MD
764 struct lttng_live_component *lttng_live;
765
766 lttng_live = stream_iter->trace->session->lttng_live;
767retry:
768 print_stream_state(stream_iter);
769 next_return.notification = NULL;
770 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
50842bdc 771 if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
772 goto end;
773 }
774 status = lttng_live_iterator_next_handle_one_no_data_stream(
775 lttng_live, stream_iter);
50842bdc 776 if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
777 goto end;
778 }
779 status = lttng_live_iterator_next_handle_one_quiescent_stream(
780 lttng_live, stream_iter, &next_return.notification);
50842bdc 781 if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
f6ccaed9 782 BT_ASSERT(next_return.notification == NULL);
7cdc2bab
MD
783 goto end;
784 }
785 if (next_return.notification) {
786 goto end;
787 }
788 status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live,
789 stream_iter, &next_return.notification);
50842bdc 790 if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
f6ccaed9 791 BT_ASSERT(next_return.notification == NULL);
7cdc2bab
MD
792 }
793
794end:
795 switch (status) {
50842bdc 796 case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
7cdc2bab
MD
797 print_dbg("continue");
798 goto retry;
50842bdc 799 case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
7cdc2bab
MD
800 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
801 print_dbg("again");
802 break;
50842bdc 803 case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
7cdc2bab
MD
804 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
805 print_dbg("end");
806 break;
50842bdc 807 case BT_LTTNG_LIVE_ITERATOR_STATUS_OK:
7cdc2bab
MD
808 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
809 print_dbg("ok");
810 break;
50842bdc 811 case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
7cdc2bab
MD
812 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
813 break;
50842bdc 814 case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
7cdc2bab
MD
815 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
816 break;
50842bdc 817 case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
7cdc2bab
MD
818 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
819 break;
50842bdc 820 case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
7cdc2bab
MD
821 default: /* fall-through */
822 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
823 break;
824 }
825 return next_return;
826}
827
828static
90157d89
PP
829struct bt_notification_iterator_next_method_return lttng_live_iterator_next_no_stream(
830 struct bt_private_connection_private_notification_iterator *iterator,
7cdc2bab
MD
831 struct lttng_live_no_stream_iterator *no_stream_iter)
832{
50842bdc 833 enum bt_lttng_live_iterator_status status;
90157d89 834 struct bt_notification_iterator_next_method_return next_return;
7cdc2bab
MD
835 struct lttng_live_component *lttng_live;
836
837 lttng_live = no_stream_iter->lttng_live;
838retry:
839 lttng_live_force_new_streams_and_metadata(lttng_live);
4513a12e 840 next_return.notification = NULL;
7cdc2bab 841 status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live);
50842bdc 842 if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) {
7cdc2bab
MD
843 goto end;
844 }
845 if (no_stream_iter->port) {
50842bdc 846 status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
7cdc2bab 847 } else {
50842bdc 848 status = BT_LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab
MD
849 }
850end:
851 switch (status) {
50842bdc 852 case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE:
7cdc2bab 853 goto retry;
50842bdc 854 case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
7cdc2bab
MD
855 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
856 break;
50842bdc 857 case BT_LTTNG_LIVE_ITERATOR_STATUS_END:
7cdc2bab
MD
858 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
859 break;
50842bdc 860 case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL:
7cdc2bab
MD
861 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
862 break;
50842bdc 863 case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM:
7cdc2bab
MD
864 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
865 break;
50842bdc 866 case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED:
7cdc2bab
MD
867 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED;
868 break;
50842bdc 869 case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR:
7cdc2bab
MD
870 default: /* fall-through */
871 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
872 break;
873 }
874 return next_return;
875}
876
d3eb6e8f 877BT_HIDDEN
90157d89
PP
878struct bt_notification_iterator_next_method_return lttng_live_iterator_next(
879 struct bt_private_connection_private_notification_iterator *iterator)
d3eb6e8f 880{
7cdc2bab 881 struct lttng_live_stream_iterator_generic *s =
90157d89
PP
882 bt_private_connection_private_notification_iterator_get_user_data(iterator);
883 struct bt_notification_iterator_next_method_return next_return;
7cdc2bab
MD
884
885 switch (s->type) {
886 case LIVE_STREAM_TYPE_NO_STREAM:
887 next_return = lttng_live_iterator_next_no_stream(iterator,
888 container_of(s, struct lttng_live_no_stream_iterator, p));
889 break;
890 case LIVE_STREAM_TYPE_STREAM:
891 next_return = lttng_live_iterator_next_stream(iterator,
892 container_of(s, struct lttng_live_stream_iterator, p));
893 break;
894 default:
895 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
896 break;
897 }
898 return next_return;
899}
41a2b7ae 900
7cdc2bab
MD
901BT_HIDDEN
902enum bt_notification_iterator_status lttng_live_iterator_init(
90157d89 903 struct bt_private_connection_private_notification_iterator *it,
7cdc2bab
MD
904 struct bt_private_port *port)
905{
906 enum bt_notification_iterator_status ret =
907 BT_NOTIFICATION_ITERATOR_STATUS_OK;
908 struct lttng_live_stream_iterator_generic *s;
7cdc2bab 909
f6ccaed9 910 BT_ASSERT(it);
7cdc2bab
MD
911
912 s = bt_private_port_get_user_data(port);
f6ccaed9 913 BT_ASSERT(s);
7cdc2bab
MD
914 switch (s->type) {
915 case LIVE_STREAM_TYPE_NO_STREAM:
916 {
917 struct lttng_live_no_stream_iterator *no_stream_iter =
918 container_of(s, struct lttng_live_no_stream_iterator, p);
90157d89 919 ret = bt_private_connection_private_notification_iterator_set_user_data(it, no_stream_iter);
7cdc2bab
MD
920 if (ret) {
921 goto error;
922 }
923 break;
924 }
925 case LIVE_STREAM_TYPE_STREAM:
926 {
927 struct lttng_live_stream_iterator *stream_iter =
928 container_of(s, struct lttng_live_stream_iterator, p);
90157d89 929 ret = bt_private_connection_private_notification_iterator_set_user_data(it, stream_iter);
7cdc2bab
MD
930 if (ret) {
931 goto error;
932 }
933 break;
934 }
935 default:
936 ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
937 goto end;
938 }
939
940end:
41a2b7ae 941 return ret;
7cdc2bab 942error:
90157d89 943 if (bt_private_connection_private_notification_iterator_set_user_data(it, NULL)
7cdc2bab 944 != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
087bc060 945 BT_LOGE("Error setting private data to NULL");
7cdc2bab
MD
946 }
947 goto end;
948}
949
950static
90157d89 951struct bt_component_class_query_method_return lttng_live_query_list_sessions(
c7eee084
PP
952 struct bt_component_class *comp_class,
953 struct bt_query_executor *query_exec,
7cdc2bab
MD
954 struct bt_value *params)
955{
90157d89 956 struct bt_component_class_query_method_return query_ret = {
c7eee084
PP
957 .result = NULL,
958 .status = BT_QUERY_STATUS_OK,
959 };
960
7cdc2bab 961 struct bt_value *url_value = NULL;
7cdc2bab
MD
962 const char *url;
963 struct bt_live_viewer_connection *viewer_connection = NULL;
7cdc2bab
MD
964
965 url_value = bt_value_map_get(params, "url");
966 if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) {
087bc060 967 BT_LOGW("Mandatory \"url\" parameter missing");
c7eee084 968 query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
7cdc2bab
MD
969 goto error;
970 }
971
3cdf4234 972 if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) {
087bc060 973 BT_LOGW("\"url\" parameter is required to be a string value");
c7eee084 974 query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS;
7cdc2bab
MD
975 goto error;
976 }
977
4c66436f 978 viewer_connection = bt_live_viewer_connection_create(url, NULL);
7cdc2bab 979 if (!viewer_connection) {
7cdc2bab
MD
980 goto error;
981 }
982
c7eee084
PP
983 query_ret.result =
984 bt_live_viewer_connection_list_sessions(viewer_connection);
985 if (!query_ret.result) {
986 goto error;
987 }
988
7cdc2bab 989 goto end;
c7eee084 990
7cdc2bab 991error:
c7eee084
PP
992 BT_PUT(query_ret.result);
993
994 if (query_ret.status >= 0) {
995 query_ret.status = BT_QUERY_STATUS_ERROR;
996 }
997
7cdc2bab
MD
998end:
999 if (viewer_connection) {
1000 bt_live_viewer_connection_destroy(viewer_connection);
1001 }
1002 BT_PUT(url_value);
c7eee084 1003 return query_ret;
7cdc2bab
MD
1004}
1005
1006BT_HIDDEN
90157d89 1007struct bt_component_class_query_method_return lttng_live_query(
c7eee084
PP
1008 struct bt_component_class *comp_class,
1009 struct bt_query_executor *query_exec,
7cdc2bab
MD
1010 const char *object, struct bt_value *params)
1011{
90157d89 1012 struct bt_component_class_query_method_return ret = {
c7eee084
PP
1013 .result = NULL,
1014 .status = BT_QUERY_STATUS_OK,
1015 };
1016
7cdc2bab
MD
1017 if (strcmp(object, "sessions") == 0) {
1018 return lttng_live_query_list_sessions(comp_class,
c7eee084 1019 query_exec, params);
7cdc2bab 1020 }
087bc060 1021 BT_LOGW("Unknown query object `%s`", object);
c7eee084
PP
1022 ret.status = BT_QUERY_STATUS_INVALID_OBJECT;
1023 return ret;
7cdc2bab
MD
1024}
1025
1026static
1027void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live)
1028{
1029 int ret;
1030 struct lttng_live_session *session, *s;
1031
1032 bt_list_for_each_entry_safe(session, s, &lttng_live->sessions, node) {
1033 lttng_live_destroy_session(session);
1034 }
1035 BT_PUT(lttng_live->viewer_connection);
1036 if (lttng_live->url) {
1037 g_string_free(lttng_live->url, TRUE);
1038 }
1039 if (lttng_live->no_stream_port) {
6f79a7cf 1040 bt_get(lttng_live->no_stream_port);
7cdc2bab 1041 ret = bt_private_port_remove_from_component(lttng_live->no_stream_port);
6f79a7cf 1042 bt_put(lttng_live->no_stream_port);
f6ccaed9 1043 BT_ASSERT(!ret);
7cdc2bab
MD
1044 }
1045 if (lttng_live->no_stream_iter) {
1046 g_free(lttng_live->no_stream_iter);
1047 }
1048 g_free(lttng_live);
1049}
1050
1051BT_HIDDEN
1052void lttng_live_component_finalize(struct bt_private_component *component)
1053{
1054 void *data = bt_private_component_get_user_data(component);
1055
1056 if (!data) {
1057 return;
1058 }
1059 lttng_live_component_destroy_data(data);
1060}
1061
1062static
1063struct lttng_live_component *lttng_live_component_create(struct bt_value *params,
6f79a7cf 1064 struct bt_private_component *private_component)
7cdc2bab
MD
1065{
1066 struct lttng_live_component *lttng_live;
1067 struct bt_value *value = NULL;
1068 const char *url;
1069 enum bt_value_status ret;
1070
1071 lttng_live = g_new0(struct lttng_live_component, 1);
1072 if (!lttng_live) {
1073 goto end;
1074 }
7cdc2bab
MD
1075 /* TODO: make this an overridable parameter. */
1076 lttng_live->max_query_size = MAX_QUERY_SIZE;
1077 BT_INIT_LIST_HEAD(&lttng_live->sessions);
1078 value = bt_value_map_get(params, "url");
1079 if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
087bc060 1080 BT_LOGW("Mandatory \"url\" parameter missing");
7cdc2bab
MD
1081 goto error;
1082 }
1083 ret = bt_value_string_get(value, &url);
1084 if (ret != BT_VALUE_STATUS_OK) {
087bc060 1085 BT_LOGW("\"url\" parameter is required to be a string value");
7cdc2bab
MD
1086 goto error;
1087 }
1088 lttng_live->url = g_string_new(url);
1089 if (!lttng_live->url) {
1090 goto error;
1091 }
6f79a7cf 1092 BT_PUT(value);
7cdc2bab 1093 lttng_live->viewer_connection =
4c66436f 1094 bt_live_viewer_connection_create(lttng_live->url->str, lttng_live);
7cdc2bab 1095 if (!lttng_live->viewer_connection) {
7cdc2bab
MD
1096 goto error;
1097 }
1098 if (lttng_live_create_viewer_session(lttng_live)) {
7cdc2bab
MD
1099 goto error;
1100 }
1101 lttng_live->private_component = private_component;
4c66436f 1102
7cdc2bab
MD
1103 goto end;
1104
1105error:
1106 lttng_live_component_destroy_data(lttng_live);
1107 lttng_live = NULL;
1108end:
1109 return lttng_live;
d3e4dcd8
PP
1110}
1111
f3bc2010 1112BT_HIDDEN
3cdf4234
MD
1113enum bt_component_status lttng_live_component_init(
1114 struct bt_private_component *private_component,
7cdc2bab 1115 struct bt_value *params, void *init_method_data)
f3bc2010 1116{
7cdc2bab
MD
1117 struct lttng_live_component *lttng_live;
1118 enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
1119
7cdc2bab 1120 /* Passes ownership of iter ref to lttng_live_component_create. */
6f79a7cf 1121 lttng_live = lttng_live_component_create(params, private_component);
7cdc2bab 1122 if (!lttng_live) {
6f79a7cf
MD
1123 //TODO : we need access to the application cancel state
1124 //because we are not part of a graph yet.
1125 ret = BT_COMPONENT_STATUS_NOMEM;
7cdc2bab
MD
1126 goto end;
1127 }
1128
1129 lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1);
1130 lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM;
1131 lttng_live->no_stream_iter->lttng_live = lttng_live;
4bf0e537
MD
1132 if (lttng_live_is_canceled(lttng_live)) {
1133 goto end;
1134 }
147337a3 1135 ret = bt_private_component_source_add_output_private_port(
7cdc2bab 1136 lttng_live->private_component, "no-stream",
147337a3
PP
1137 lttng_live->no_stream_iter,
1138 &lttng_live->no_stream_port);
1139 if (ret != BT_COMPONENT_STATUS_OK) {
1140 goto end;
1141 }
6f79a7cf 1142 bt_put(lttng_live->no_stream_port); /* weak */
7cdc2bab
MD
1143 lttng_live->no_stream_iter->port = lttng_live->no_stream_port;
1144
3cdf4234 1145 ret = bt_private_component_set_user_data(private_component, lttng_live);
7cdc2bab
MD
1146 if (ret != BT_COMPONENT_STATUS_OK) {
1147 goto error;
1148 }
1149
1150end:
1151 return ret;
1152error:
3cdf4234 1153 (void) bt_private_component_set_user_data(private_component, NULL);
7cdc2bab
MD
1154 lttng_live_component_destroy_data(lttng_live);
1155 return ret;
f3bc2010 1156}
087bc060 1157
d85ef162
MD
1158BT_HIDDEN
1159enum bt_component_status lttng_live_accept_port_connection(
1160 struct bt_private_component *private_component,
1161 struct bt_private_port *self_private_port,
1162 struct bt_port *other_port)
1163{
1164 struct lttng_live_component *lttng_live =
1165 bt_private_component_get_user_data(private_component);
1166 struct bt_component *other_component;
1167 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
6d137876 1168 struct bt_port *self_port = bt_port_from_private(self_private_port);
d85ef162
MD
1169
1170 other_component = bt_port_get_component(other_port);
1171 bt_put(other_component); /* weak */
1172
1173 if (!lttng_live->downstream_component) {
1174 lttng_live->downstream_component = other_component;
1175 goto end;
1176 }
1177
1178 /*
1179 * Compare prior component to ensure we are connected to the
1180 * same downstream component as prior ports.
1181 */
1182 if (lttng_live->downstream_component != other_component) {
1183 BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".",
1184 bt_port_get_name(self_port),
1185 bt_component_get_name(other_component),
1186 bt_component_get_name(lttng_live->downstream_component));
1187 status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION;
1188 goto end;
1189 }
1190end:
1191 bt_put(self_port);
1192 return status;
1193}
This page took 0.094489 seconds and 4 git commands to generate.