Commit | Line | Data |
---|---|---|
f3bc2010 JG |
1 | /* |
2 | * lttng-live.c | |
3 | * | |
4 | * Babeltrace CTF LTTng-live Client Component | |
5 | * | |
6 | * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com> | |
7cdc2bab | 7 | * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com> |
f3bc2010 JG |
8 | * |
9 | * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com> | |
10 | * | |
11 | * Permission is hereby granted, free of charge, to any person obtaining a copy | |
12 | * of this software and associated documentation files (the "Software"), to deal | |
13 | * in the Software without restriction, including without limitation the rights | |
14 | * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
15 | * copies of the Software, and to permit persons to whom the Software is | |
16 | * furnished to do so, subject to the following conditions: | |
17 | * | |
18 | * The above copyright notice and this permission notice shall be included in | |
19 | * all copies or substantial portions of the Software. | |
20 | * | |
21 | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
22 | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
23 | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
24 | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
25 | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
26 | * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
27 | * SOFTWARE. | |
28 | */ | |
29 | ||
020bc26f PP |
30 | #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC" |
31 | #include "logging.h" | |
32 | ||
9d408fca | 33 | #include <babeltrace/babeltrace.h> |
7cdc2bab | 34 | #include <babeltrace/compiler-internal.h> |
6f79a7cf | 35 | #include <babeltrace/types.h> |
7cdc2bab MD |
36 | #include <inttypes.h> |
37 | #include <glib.h> | |
f6ccaed9 | 38 | #include <babeltrace/assert-internal.h> |
7cdc2bab | 39 | #include <unistd.h> |
7d61fa8e | 40 | #include <plugins-common.h> |
f3bc2010 | 41 | |
7cdc2bab MD |
42 | #include "data-stream.h" |
43 | #include "metadata.h" | |
0f5e83e5 | 44 | #include "lttng-live-internal.h" |
7cdc2bab | 45 | |
7cdc2bab | 46 | #define MAX_QUERY_SIZE (256*1024) |
7cdc2bab | 47 | |
087bc060 | 48 | #define print_dbg(fmt, ...) BT_LOGD(fmt, ## __VA_ARGS__) |
7cdc2bab MD |
49 | |
50 | static const char *print_state(struct lttng_live_stream_iterator *s) | |
51 | { | |
52 | switch (s->state) { | |
53 | case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA: | |
54 | return "ACTIVE_NO_DATA"; | |
55 | case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA: | |
56 | return "QUIESCENT_NO_DATA"; | |
57 | case LTTNG_LIVE_STREAM_QUIESCENT: | |
58 | return "QUIESCENT"; | |
59 | case LTTNG_LIVE_STREAM_ACTIVE_DATA: | |
60 | return "ACTIVE_DATA"; | |
61 | case LTTNG_LIVE_STREAM_EOF: | |
62 | return "EOF"; | |
63 | default: | |
64 | return "ERROR"; | |
65 | } | |
66 | } | |
7cdc2bab | 67 | |
6f79a7cf MD |
68 | static |
69 | void print_stream_state(struct lttng_live_stream_iterator *stream) | |
70 | { | |
71 | struct bt_port *port; | |
72 | ||
6d137876 | 73 | port = bt_port_from_private(stream->port); |
6f79a7cf MD |
74 | print_dbg("stream %s state %s last_inact_ts %" PRId64 " cur_inact_ts %" PRId64, |
75 | bt_port_get_name(port), | |
76 | print_state(stream), | |
77 | stream->last_returned_inactivity_timestamp, | |
78 | stream->current_inactivity_timestamp); | |
79 | bt_put(port); | |
80 | } | |
81 | ||
82 | BT_HIDDEN | |
83 | bt_bool lttng_live_is_canceled(struct lttng_live_component *lttng_live) | |
84 | { | |
85 | struct bt_component *component; | |
86 | struct bt_graph *graph; | |
87 | bt_bool ret; | |
88 | ||
89 | if (!lttng_live) { | |
90 | return BT_FALSE; | |
91 | } | |
92 | ||
6d137876 | 93 | component = bt_component_from_private(lttng_live->private_component); |
6f79a7cf MD |
94 | graph = bt_component_get_graph(component); |
95 | ret = bt_graph_is_canceled(graph); | |
96 | bt_put(graph); | |
97 | bt_put(component); | |
98 | return ret; | |
99 | } | |
7cdc2bab | 100 | |
7cdc2bab MD |
101 | BT_HIDDEN |
102 | int lttng_live_add_port(struct lttng_live_component *lttng_live, | |
103 | struct lttng_live_stream_iterator *stream_iter) | |
104 | { | |
105 | int ret; | |
106 | struct bt_private_port *private_port; | |
107 | char name[STREAM_NAME_MAX_LEN]; | |
6f79a7cf | 108 | enum bt_component_status status; |
7cdc2bab MD |
109 | |
110 | ret = sprintf(name, STREAM_NAME_PREFIX "%" PRIu64, stream_iter->viewer_stream_id); | |
f6ccaed9 | 111 | BT_ASSERT(ret > 0); |
7cdc2bab | 112 | strcpy(stream_iter->name, name); |
4bf0e537 MD |
113 | if (lttng_live_is_canceled(lttng_live)) { |
114 | return 0; | |
115 | } | |
6f79a7cf | 116 | status = bt_private_component_source_add_output_private_port( |
147337a3 PP |
117 | lttng_live->private_component, name, stream_iter, |
118 | &private_port); | |
6f79a7cf MD |
119 | switch (status) { |
120 | case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED: | |
121 | return 0; | |
122 | case BT_COMPONENT_STATUS_OK: | |
123 | break; | |
124 | default: | |
7cdc2bab MD |
125 | return -1; |
126 | } | |
6f79a7cf | 127 | bt_put(private_port); /* weak */ |
087bc060 | 128 | BT_LOGI("Added port %s", name); |
7cdc2bab MD |
129 | |
130 | if (lttng_live->no_stream_port) { | |
6f79a7cf | 131 | bt_get(lttng_live->no_stream_port); |
7cdc2bab | 132 | ret = bt_private_port_remove_from_component(lttng_live->no_stream_port); |
6f79a7cf | 133 | bt_put(lttng_live->no_stream_port); |
7cdc2bab MD |
134 | if (ret) { |
135 | return -1; | |
136 | } | |
6f79a7cf | 137 | lttng_live->no_stream_port = NULL; |
7cdc2bab MD |
138 | lttng_live->no_stream_iter->port = NULL; |
139 | } | |
140 | stream_iter->port = private_port; | |
141 | return 0; | |
142 | } | |
143 | ||
144 | BT_HIDDEN | |
145 | int lttng_live_remove_port(struct lttng_live_component *lttng_live, | |
146 | struct bt_private_port *port) | |
147 | { | |
148 | struct bt_component *component; | |
149 | int64_t nr_ports; | |
150 | int ret; | |
151 | ||
6d137876 | 152 | component = bt_component_from_private(lttng_live->private_component); |
7cdc2bab MD |
153 | nr_ports = bt_component_source_get_output_port_count(component); |
154 | if (nr_ports < 0) { | |
155 | return -1; | |
156 | } | |
157 | BT_PUT(component); | |
158 | if (nr_ports == 1) { | |
6f79a7cf MD |
159 | enum bt_component_status status; |
160 | ||
f6ccaed9 | 161 | BT_ASSERT(!lttng_live->no_stream_port); |
4bf0e537 MD |
162 | |
163 | if (lttng_live_is_canceled(lttng_live)) { | |
164 | return 0; | |
165 | } | |
6f79a7cf | 166 | status = bt_private_component_source_add_output_private_port(lttng_live->private_component, |
147337a3 PP |
167 | "no-stream", lttng_live->no_stream_iter, |
168 | <tng_live->no_stream_port); | |
6f79a7cf MD |
169 | switch (status) { |
170 | case BT_COMPONENT_STATUS_GRAPH_IS_CANCELED: | |
171 | return 0; | |
172 | case BT_COMPONENT_STATUS_OK: | |
173 | break; | |
174 | default: | |
7cdc2bab MD |
175 | return -1; |
176 | } | |
6f79a7cf | 177 | bt_put(lttng_live->no_stream_port); /* weak */ |
7cdc2bab MD |
178 | lttng_live->no_stream_iter->port = lttng_live->no_stream_port; |
179 | } | |
6f79a7cf | 180 | bt_get(port); |
7cdc2bab | 181 | ret = bt_private_port_remove_from_component(port); |
6f79a7cf | 182 | bt_put(port); |
7cdc2bab MD |
183 | if (ret) { |
184 | return -1; | |
185 | } | |
186 | return 0; | |
187 | } | |
188 | ||
189 | static | |
190 | struct lttng_live_trace *lttng_live_find_trace(struct lttng_live_session *session, | |
191 | uint64_t trace_id) | |
d3e4dcd8 | 192 | { |
7cdc2bab MD |
193 | struct lttng_live_trace *trace; |
194 | ||
195 | bt_list_for_each_entry(trace, &session->traces, node) { | |
196 | if (trace->id == trace_id) { | |
197 | return trace; | |
198 | } | |
199 | } | |
d3eb6e8f PP |
200 | return NULL; |
201 | } | |
202 | ||
7cdc2bab MD |
203 | static |
204 | void lttng_live_destroy_trace(struct bt_object *obj) | |
205 | { | |
206 | struct lttng_live_trace *trace = container_of(obj, struct lttng_live_trace, obj); | |
207 | ||
087bc060 | 208 | BT_LOGI("Destroy trace"); |
f6ccaed9 | 209 | BT_ASSERT(bt_list_empty(&trace->streams)); |
7cdc2bab | 210 | bt_list_del(&trace->node); |
5bd230f4 | 211 | |
4bf0e537 MD |
212 | if (trace->trace) { |
213 | int retval; | |
5bd230f4 | 214 | |
50842bdc | 215 | retval = bt_trace_set_is_static(trace->trace); |
f6ccaed9 | 216 | BT_ASSERT(!retval); |
4bf0e537 MD |
217 | BT_PUT(trace->trace); |
218 | } | |
7cdc2bab MD |
219 | lttng_live_metadata_fini(trace); |
220 | BT_PUT(trace->cc_prio_map); | |
221 | g_free(trace); | |
222 | } | |
223 | ||
224 | static | |
225 | struct lttng_live_trace *lttng_live_create_trace(struct lttng_live_session *session, | |
226 | uint64_t trace_id) | |
227 | { | |
228 | struct lttng_live_trace *trace = NULL; | |
229 | ||
230 | trace = g_new0(struct lttng_live_trace, 1); | |
231 | if (!trace) { | |
232 | goto error; | |
233 | } | |
234 | trace->session = session; | |
235 | trace->id = trace_id; | |
236 | BT_INIT_LIST_HEAD(&trace->streams); | |
237 | trace->new_metadata_needed = true; | |
238 | bt_list_add(&trace->node, &session->traces); | |
239 | bt_object_init(&trace->obj, lttng_live_destroy_trace); | |
087bc060 | 240 | BT_LOGI("Create trace"); |
7cdc2bab MD |
241 | goto end; |
242 | error: | |
243 | g_free(trace); | |
244 | trace = NULL; | |
245 | end: | |
246 | return trace; | |
247 | } | |
248 | ||
249 | BT_HIDDEN | |
250 | struct lttng_live_trace *lttng_live_ref_trace(struct lttng_live_session *session, | |
251 | uint64_t trace_id) | |
252 | { | |
253 | struct lttng_live_trace *trace; | |
254 | ||
255 | trace = lttng_live_find_trace(session, trace_id); | |
256 | if (trace) { | |
257 | bt_get(trace); | |
258 | return trace; | |
259 | } | |
260 | return lttng_live_create_trace(session, trace_id); | |
261 | } | |
262 | ||
263 | BT_HIDDEN | |
264 | void lttng_live_unref_trace(struct lttng_live_trace *trace) | |
265 | { | |
266 | bt_put(trace); | |
267 | } | |
268 | ||
269 | static | |
270 | void lttng_live_close_trace_streams(struct lttng_live_trace *trace) | |
271 | { | |
272 | struct lttng_live_stream_iterator *stream, *s; | |
273 | ||
274 | bt_list_for_each_entry_safe(stream, s, &trace->streams, node) { | |
275 | lttng_live_stream_iterator_destroy(stream); | |
276 | } | |
277 | lttng_live_metadata_fini(trace); | |
278 | } | |
279 | ||
280 | BT_HIDDEN | |
06994c71 MD |
281 | int lttng_live_add_session(struct lttng_live_component *lttng_live, |
282 | uint64_t session_id, const char *hostname, | |
283 | const char *session_name) | |
7cdc2bab MD |
284 | { |
285 | int ret = 0; | |
286 | struct lttng_live_session *s; | |
287 | ||
288 | s = g_new0(struct lttng_live_session, 1); | |
289 | if (!s) { | |
290 | goto error; | |
291 | } | |
292 | ||
293 | s->id = session_id; | |
294 | BT_INIT_LIST_HEAD(&s->traces); | |
295 | s->lttng_live = lttng_live; | |
296 | s->new_streams_needed = true; | |
06994c71 MD |
297 | s->hostname = g_string_new(hostname); |
298 | s->session_name = g_string_new(session_name); | |
7cdc2bab | 299 | |
06994c71 MD |
300 | BT_LOGI("Reading from session: %" PRIu64 " hostname: %s session_name: %s", |
301 | s->id, hostname, session_name); | |
7cdc2bab MD |
302 | bt_list_add(&s->node, <tng_live->sessions); |
303 | goto end; | |
304 | error: | |
087bc060 | 305 | BT_LOGE("Error adding session"); |
7cdc2bab MD |
306 | g_free(s); |
307 | ret = -1; | |
308 | end: | |
309 | return ret; | |
310 | } | |
311 | ||
312 | static | |
313 | void lttng_live_destroy_session(struct lttng_live_session *session) | |
314 | { | |
315 | struct lttng_live_trace *trace, *t; | |
316 | ||
087bc060 | 317 | BT_LOGI("Destroy session"); |
7cdc2bab MD |
318 | if (session->id != -1ULL) { |
319 | if (lttng_live_detach_session(session)) { | |
6f79a7cf | 320 | if (!lttng_live_is_canceled(session->lttng_live)) { |
4c66436f MD |
321 | /* Old relayd cannot detach sessions. */ |
322 | BT_LOGD("Unable to detach session %" PRIu64, | |
323 | session->id); | |
324 | } | |
7cdc2bab MD |
325 | } |
326 | session->id = -1ULL; | |
327 | } | |
328 | bt_list_for_each_entry_safe(trace, t, &session->traces, node) { | |
329 | lttng_live_close_trace_streams(trace); | |
330 | } | |
331 | bt_list_del(&session->node); | |
06994c71 MD |
332 | if (session->hostname) { |
333 | g_string_free(session->hostname, TRUE); | |
334 | } | |
335 | if (session->session_name) { | |
336 | g_string_free(session->session_name, TRUE); | |
337 | } | |
7cdc2bab MD |
338 | g_free(session); |
339 | } | |
340 | ||
341 | BT_HIDDEN | |
90157d89 | 342 | void lttng_live_iterator_finalize(struct bt_private_connection_private_notification_iterator *it) |
7cdc2bab MD |
343 | { |
344 | struct lttng_live_stream_iterator_generic *s = | |
90157d89 | 345 | bt_private_connection_private_notification_iterator_get_user_data(it); |
7cdc2bab MD |
346 | |
347 | switch (s->type) { | |
348 | case LIVE_STREAM_TYPE_NO_STREAM: | |
349 | { | |
350 | /* Leave no_stream_iter in place when port is removed. */ | |
351 | break; | |
352 | } | |
353 | case LIVE_STREAM_TYPE_STREAM: | |
354 | { | |
355 | struct lttng_live_stream_iterator *stream_iter = | |
356 | container_of(s, struct lttng_live_stream_iterator, p); | |
357 | ||
358 | lttng_live_stream_iterator_destroy(stream_iter); | |
359 | break; | |
360 | } | |
361 | } | |
362 | } | |
363 | ||
364 | static | |
50842bdc | 365 | enum bt_lttng_live_iterator_status lttng_live_iterator_next_check_stream_state( |
7cdc2bab MD |
366 | struct lttng_live_component *lttng_live, |
367 | struct lttng_live_stream_iterator *lttng_live_stream) | |
368 | { | |
369 | switch (lttng_live_stream->state) { | |
370 | case LTTNG_LIVE_STREAM_QUIESCENT: | |
371 | case LTTNG_LIVE_STREAM_ACTIVE_DATA: | |
372 | break; | |
373 | case LTTNG_LIVE_STREAM_ACTIVE_NO_DATA: | |
374 | /* Invalid state. */ | |
087bc060 MD |
375 | BT_LOGF("Unexpected stream state \"ACTIVE_NO_DATA\""); |
376 | abort(); | |
7cdc2bab MD |
377 | case LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA: |
378 | /* Invalid state. */ | |
087bc060 MD |
379 | BT_LOGF("Unexpected stream state \"QUIESCENT_NO_DATA\""); |
380 | abort(); | |
7cdc2bab MD |
381 | case LTTNG_LIVE_STREAM_EOF: |
382 | break; | |
383 | } | |
50842bdc | 384 | return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; |
7cdc2bab MD |
385 | } |
386 | ||
387 | /* | |
388 | * For active no data stream, fetch next data. It can be either: | |
389 | * - quiescent: need to put it in the prio heap at quiescent end | |
390 | * timestamp, | |
391 | * - have data: need to wire up first event into the prio heap, | |
392 | * - have no data on this stream at this point: need to retry (AGAIN) or | |
393 | * return EOF. | |
394 | */ | |
395 | static | |
50842bdc | 396 | enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream( |
7cdc2bab MD |
397 | struct lttng_live_component *lttng_live, |
398 | struct lttng_live_stream_iterator *lttng_live_stream) | |
399 | { | |
50842bdc PP |
400 | enum bt_lttng_live_iterator_status ret = |
401 | BT_LTTNG_LIVE_ITERATOR_STATUS_OK; | |
7cdc2bab MD |
402 | struct packet_index index; |
403 | enum lttng_live_stream_state orig_state = lttng_live_stream->state; | |
404 | ||
405 | if (lttng_live_stream->trace->new_metadata_needed) { | |
50842bdc | 406 | ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; |
7cdc2bab MD |
407 | goto end; |
408 | } | |
409 | if (lttng_live_stream->trace->session->new_streams_needed) { | |
50842bdc | 410 | ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; |
7cdc2bab MD |
411 | goto end; |
412 | } | |
413 | if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA | |
414 | && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) { | |
415 | goto end; | |
416 | } | |
417 | ret = lttng_live_get_next_index(lttng_live, lttng_live_stream, &index); | |
50842bdc | 418 | if (ret != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { |
7cdc2bab MD |
419 | goto end; |
420 | } | |
f6ccaed9 | 421 | BT_ASSERT(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF); |
7cdc2bab MD |
422 | if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) { |
423 | if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA | |
424 | && lttng_live_stream->last_returned_inactivity_timestamp == | |
425 | lttng_live_stream->current_inactivity_timestamp) { | |
50842bdc | 426 | ret = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; |
7cdc2bab MD |
427 | print_stream_state(lttng_live_stream); |
428 | } else { | |
50842bdc | 429 | ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; |
7cdc2bab MD |
430 | } |
431 | goto end; | |
432 | } | |
433 | lttng_live_stream->base_offset = index.offset; | |
434 | lttng_live_stream->offset = index.offset; | |
435 | lttng_live_stream->len = index.packet_size / CHAR_BIT; | |
436 | end: | |
50842bdc | 437 | if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { |
7cdc2bab MD |
438 | ret = lttng_live_iterator_next_check_stream_state( |
439 | lttng_live, lttng_live_stream); | |
440 | } | |
441 | return ret; | |
442 | } | |
443 | ||
444 | /* | |
445 | * Creation of the notification requires the ctf trace to be created | |
446 | * beforehand, but the live protocol gives us all streams (including | |
447 | * metadata) at once. So we split it in three steps: getting streams, | |
448 | * getting metadata (which creates the ctf trace), and then creating the | |
449 | * per-stream notifications. | |
450 | */ | |
451 | static | |
50842bdc | 452 | enum bt_lttng_live_iterator_status lttng_live_get_session( |
7cdc2bab MD |
453 | struct lttng_live_component *lttng_live, |
454 | struct lttng_live_session *session) | |
455 | { | |
50842bdc | 456 | enum bt_lttng_live_iterator_status status; |
7cdc2bab MD |
457 | struct lttng_live_trace *trace, *t; |
458 | ||
459 | if (lttng_live_attach_session(session)) { | |
6f79a7cf | 460 | if (lttng_live_is_canceled(lttng_live)) { |
50842bdc | 461 | return BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; |
4c66436f | 462 | } else { |
50842bdc | 463 | return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; |
4c66436f | 464 | } |
7cdc2bab MD |
465 | } |
466 | status = lttng_live_get_new_streams(session); | |
50842bdc PP |
467 | if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK && |
468 | status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) { | |
7cdc2bab MD |
469 | return status; |
470 | } | |
471 | bt_list_for_each_entry_safe(trace, t, &session->traces, node) { | |
472 | status = lttng_live_metadata_update(trace); | |
50842bdc PP |
473 | if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK && |
474 | status != BT_LTTNG_LIVE_ITERATOR_STATUS_END) { | |
7cdc2bab MD |
475 | return status; |
476 | } | |
477 | } | |
478 | return lttng_live_lazy_notif_init(session); | |
479 | } | |
480 | ||
481 | BT_HIDDEN | |
482 | void lttng_live_need_new_streams(struct lttng_live_component *lttng_live) | |
483 | { | |
484 | struct lttng_live_session *session; | |
485 | ||
486 | bt_list_for_each_entry(session, <tng_live->sessions, node) { | |
487 | session->new_streams_needed = true; | |
488 | } | |
489 | } | |
490 | ||
491 | static | |
492 | void lttng_live_force_new_streams_and_metadata(struct lttng_live_component *lttng_live) | |
493 | { | |
494 | struct lttng_live_session *session; | |
495 | ||
496 | bt_list_for_each_entry(session, <tng_live->sessions, node) { | |
497 | struct lttng_live_trace *trace; | |
498 | ||
499 | session->new_streams_needed = true; | |
500 | bt_list_for_each_entry(trace, &session->traces, node) { | |
501 | trace->new_metadata_needed = true; | |
502 | } | |
503 | } | |
504 | } | |
505 | ||
506 | static | |
50842bdc | 507 | enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_new_streams_and_metadata( |
7cdc2bab MD |
508 | struct lttng_live_component *lttng_live) |
509 | { | |
50842bdc PP |
510 | enum bt_lttng_live_iterator_status ret = |
511 | BT_LTTNG_LIVE_ITERATOR_STATUS_OK; | |
7cdc2bab MD |
512 | unsigned int nr_sessions_opened = 0; |
513 | struct lttng_live_session *session, *s; | |
514 | ||
515 | bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) { | |
516 | if (session->closed && bt_list_empty(&session->traces)) { | |
517 | lttng_live_destroy_session(session); | |
518 | } | |
519 | } | |
520 | /* | |
521 | * Currently, when there are no sessions, we quit immediately. | |
522 | * We may want to add a component parameter to keep trying until | |
523 | * we get data in the future. | |
524 | * Also, in a remotely distant future, we could add a "new | |
525 | * session" flag to the protocol, which would tell us that we | |
526 | * need to query for new sessions even though we have sessions | |
527 | * currently ongoing. | |
528 | */ | |
529 | if (bt_list_empty(<tng_live->sessions)) { | |
50842bdc | 530 | ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END; |
7cdc2bab MD |
531 | goto end; |
532 | } | |
533 | bt_list_for_each_entry(session, <tng_live->sessions, node) { | |
534 | ret = lttng_live_get_session(lttng_live, session); | |
535 | switch (ret) { | |
50842bdc | 536 | case BT_LTTNG_LIVE_ITERATOR_STATUS_OK: |
7cdc2bab | 537 | break; |
50842bdc PP |
538 | case BT_LTTNG_LIVE_ITERATOR_STATUS_END: |
539 | ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK; | |
7cdc2bab MD |
540 | break; |
541 | default: | |
542 | goto end; | |
543 | } | |
544 | if (!session->closed) { | |
545 | nr_sessions_opened++; | |
546 | } | |
547 | } | |
548 | end: | |
50842bdc PP |
549 | if (ret == BT_LTTNG_LIVE_ITERATOR_STATUS_OK && !nr_sessions_opened) { |
550 | ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END; | |
7cdc2bab MD |
551 | } |
552 | return ret; | |
553 | } | |
554 | ||
555 | static | |
50842bdc | 556 | enum bt_lttng_live_iterator_status emit_inactivity_notification( |
7cdc2bab MD |
557 | struct lttng_live_component *lttng_live, |
558 | struct lttng_live_stream_iterator *lttng_live_stream, | |
559 | struct bt_notification **notification, | |
560 | uint64_t timestamp) | |
561 | { | |
50842bdc PP |
562 | enum bt_lttng_live_iterator_status ret = |
563 | BT_LTTNG_LIVE_ITERATOR_STATUS_OK; | |
7cdc2bab | 564 | struct lttng_live_trace *trace; |
50842bdc PP |
565 | struct bt_clock_class *clock_class = NULL; |
566 | struct bt_clock_value *clock_value = NULL; | |
7cdc2bab MD |
567 | struct bt_notification *notif = NULL; |
568 | int retval; | |
569 | ||
570 | trace = lttng_live_stream->trace; | |
571 | if (!trace) { | |
572 | goto error; | |
573 | } | |
574 | clock_class = bt_clock_class_priority_map_get_clock_class_by_index(trace->cc_prio_map, 0); | |
575 | if (!clock_class) { | |
576 | goto error; | |
577 | } | |
50842bdc | 578 | clock_value = bt_clock_value_create(clock_class, timestamp); |
7cdc2bab MD |
579 | if (!clock_value) { |
580 | goto error; | |
581 | } | |
582 | notif = bt_notification_inactivity_create(trace->cc_prio_map); | |
583 | if (!notif) { | |
584 | goto error; | |
585 | } | |
586 | retval = bt_notification_inactivity_set_clock_value(notif, clock_value); | |
587 | if (retval) { | |
588 | goto error; | |
589 | } | |
590 | *notification = notif; | |
591 | end: | |
592 | bt_put(clock_value); | |
593 | bt_put(clock_class); | |
594 | return ret; | |
595 | ||
596 | error: | |
50842bdc | 597 | ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; |
7cdc2bab MD |
598 | bt_put(notif); |
599 | goto end; | |
600 | } | |
601 | ||
602 | static | |
50842bdc | 603 | enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_stream( |
7cdc2bab MD |
604 | struct lttng_live_component *lttng_live, |
605 | struct lttng_live_stream_iterator *lttng_live_stream, | |
606 | struct bt_notification **notification) | |
607 | { | |
50842bdc PP |
608 | enum bt_lttng_live_iterator_status ret = |
609 | BT_LTTNG_LIVE_ITERATOR_STATUS_OK; | |
610 | struct bt_clock_class *clock_class = NULL; | |
611 | struct bt_clock_value *clock_value = NULL; | |
7cdc2bab MD |
612 | |
613 | if (lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT) { | |
50842bdc | 614 | return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; |
7cdc2bab MD |
615 | } |
616 | ||
617 | if (lttng_live_stream->current_inactivity_timestamp == | |
618 | lttng_live_stream->last_returned_inactivity_timestamp) { | |
619 | lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA; | |
50842bdc | 620 | ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; |
7cdc2bab MD |
621 | goto end; |
622 | } | |
623 | ||
624 | ret = emit_inactivity_notification(lttng_live, lttng_live_stream, notification, | |
625 | (uint64_t) lttng_live_stream->current_inactivity_timestamp); | |
626 | ||
627 | lttng_live_stream->last_returned_inactivity_timestamp = | |
628 | lttng_live_stream->current_inactivity_timestamp; | |
629 | end: | |
630 | bt_put(clock_value); | |
631 | bt_put(clock_class); | |
632 | return ret; | |
633 | } | |
634 | ||
635 | static | |
50842bdc | 636 | enum bt_lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_stream( |
7cdc2bab MD |
637 | struct lttng_live_component *lttng_live, |
638 | struct lttng_live_stream_iterator *lttng_live_stream, | |
639 | struct bt_notification **notification) | |
640 | { | |
50842bdc PP |
641 | enum bt_lttng_live_iterator_status ret = |
642 | BT_LTTNG_LIVE_ITERATOR_STATUS_OK; | |
643 | enum bt_notif_iter_status status; | |
7cdc2bab MD |
644 | struct lttng_live_session *session; |
645 | ||
646 | bt_list_for_each_entry(session, <tng_live->sessions, node) { | |
647 | struct lttng_live_trace *trace; | |
648 | ||
649 | if (session->new_streams_needed) { | |
50842bdc | 650 | return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; |
7cdc2bab MD |
651 | } |
652 | bt_list_for_each_entry(trace, &session->traces, node) { | |
653 | if (trace->new_metadata_needed) { | |
50842bdc | 654 | return BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; |
7cdc2bab MD |
655 | } |
656 | } | |
657 | } | |
658 | ||
659 | if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_DATA) { | |
50842bdc | 660 | return BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; |
7cdc2bab MD |
661 | } |
662 | if (lttng_live_stream->packet_end_notif_queue) { | |
663 | *notification = lttng_live_stream->packet_end_notif_queue; | |
664 | lttng_live_stream->packet_end_notif_queue = NULL; | |
50842bdc | 665 | status = BT_NOTIF_ITER_STATUS_OK; |
7cdc2bab | 666 | } else { |
50842bdc | 667 | status = bt_notif_iter_get_next_notification( |
7cdc2bab MD |
668 | lttng_live_stream->notif_iter, |
669 | lttng_live_stream->trace->cc_prio_map, | |
670 | notification); | |
50842bdc | 671 | if (status == BT_NOTIF_ITER_STATUS_OK) { |
7cdc2bab MD |
672 | /* |
673 | * Consider empty packets as inactivity. | |
674 | */ | |
675 | if (bt_notification_get_type(*notification) == BT_NOTIFICATION_TYPE_PACKET_END) { | |
676 | lttng_live_stream->packet_end_notif_queue = *notification; | |
677 | *notification = NULL; | |
678 | return emit_inactivity_notification(lttng_live, | |
679 | lttng_live_stream, notification, | |
680 | lttng_live_stream->current_packet_end_timestamp); | |
681 | } | |
682 | } | |
683 | } | |
684 | switch (status) { | |
50842bdc PP |
685 | case BT_NOTIF_ITER_STATUS_EOF: |
686 | ret = BT_LTTNG_LIVE_ITERATOR_STATUS_END; | |
7cdc2bab | 687 | break; |
50842bdc PP |
688 | case BT_NOTIF_ITER_STATUS_OK: |
689 | ret = BT_LTTNG_LIVE_ITERATOR_STATUS_OK; | |
7cdc2bab | 690 | break; |
50842bdc | 691 | case BT_NOTIF_ITER_STATUS_AGAIN: |
7cdc2bab MD |
692 | /* |
693 | * Continue immediately (end of packet). The next | |
694 | * get_index may return AGAIN to delay the following | |
695 | * attempt. | |
696 | */ | |
50842bdc | 697 | ret = BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; |
7cdc2bab | 698 | break; |
50842bdc | 699 | case BT_NOTIF_ITER_STATUS_INVAL: |
7cdc2bab | 700 | /* No argument provided by the user, so don't return INVAL. */ |
50842bdc | 701 | case BT_NOTIF_ITER_STATUS_ERROR: |
7cdc2bab | 702 | default: |
50842bdc | 703 | ret = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; |
7cdc2bab MD |
704 | break; |
705 | } | |
706 | return ret; | |
707 | } | |
708 | ||
709 | /* | |
710 | * helper function: | |
711 | * handle_no_data_streams() | |
712 | * retry: | |
713 | * - for each ACTIVE_NO_DATA stream: | |
714 | * - query relayd for stream data, or quiescence info. | |
715 | * - if need metadata, get metadata, goto retry. | |
716 | * - if new stream, get new stream as ACTIVE_NO_DATA, goto retry | |
717 | * - if quiescent, move to QUIESCENT streams | |
718 | * - if fetched data, move to ACTIVE_DATA streams | |
719 | * (at this point each stream either has data, or is quiescent) | |
720 | * | |
721 | * | |
722 | * iterator_next: | |
723 | * handle_new_streams_and_metadata() | |
724 | * - query relayd for known streams, add them as ACTIVE_NO_DATA | |
725 | * - query relayd for metadata | |
726 | * | |
727 | * call handle_active_no_data_streams() | |
728 | * | |
729 | * handle_quiescent_streams() | |
730 | * - if at least one stream is ACTIVE_DATA: | |
731 | * - peek stream event with lowest timestamp -> next_ts | |
732 | * - for each quiescent stream | |
733 | * - if next_ts >= quiescent end | |
734 | * - set state to ACTIVE_NO_DATA | |
735 | * - else | |
736 | * - for each quiescent stream | |
737 | * - set state to ACTIVE_NO_DATA | |
738 | * | |
739 | * call handle_active_no_data_streams() | |
740 | * | |
741 | * handle_active_data_streams() | |
742 | * - if at least one stream is ACTIVE_DATA: | |
743 | * - get stream event with lowest timestamp from heap | |
744 | * - make that stream event the current notification. | |
745 | * - move this stream heap position to its next event | |
746 | * - if we need to fetch data from relayd, move | |
747 | * stream to ACTIVE_NO_DATA. | |
748 | * - return OK | |
749 | * - return AGAIN | |
750 | * | |
751 | * end criterion: ctrl-c on client. If relayd exits or the session | |
752 | * closes on the relay daemon side, we keep on waiting for streams. | |
753 | * Eventually handle --end timestamp (also an end criterion). | |
754 | * | |
755 | * When disconnected from relayd: try to re-connect endlessly. | |
756 | */ | |
757 | static | |
90157d89 PP |
758 | struct bt_notification_iterator_next_method_return lttng_live_iterator_next_stream( |
759 | struct bt_private_connection_private_notification_iterator *iterator, | |
7cdc2bab MD |
760 | struct lttng_live_stream_iterator *stream_iter) |
761 | { | |
50842bdc | 762 | enum bt_lttng_live_iterator_status status; |
90157d89 | 763 | struct bt_notification_iterator_next_method_return next_return; |
7cdc2bab MD |
764 | struct lttng_live_component *lttng_live; |
765 | ||
766 | lttng_live = stream_iter->trace->session->lttng_live; | |
767 | retry: | |
768 | print_stream_state(stream_iter); | |
769 | next_return.notification = NULL; | |
770 | status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live); | |
50842bdc | 771 | if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { |
7cdc2bab MD |
772 | goto end; |
773 | } | |
774 | status = lttng_live_iterator_next_handle_one_no_data_stream( | |
775 | lttng_live, stream_iter); | |
50842bdc | 776 | if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { |
7cdc2bab MD |
777 | goto end; |
778 | } | |
779 | status = lttng_live_iterator_next_handle_one_quiescent_stream( | |
780 | lttng_live, stream_iter, &next_return.notification); | |
50842bdc | 781 | if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { |
f6ccaed9 | 782 | BT_ASSERT(next_return.notification == NULL); |
7cdc2bab MD |
783 | goto end; |
784 | } | |
785 | if (next_return.notification) { | |
786 | goto end; | |
787 | } | |
788 | status = lttng_live_iterator_next_handle_one_active_data_stream(lttng_live, | |
789 | stream_iter, &next_return.notification); | |
50842bdc | 790 | if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { |
f6ccaed9 | 791 | BT_ASSERT(next_return.notification == NULL); |
7cdc2bab MD |
792 | } |
793 | ||
794 | end: | |
795 | switch (status) { | |
50842bdc | 796 | case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: |
7cdc2bab MD |
797 | print_dbg("continue"); |
798 | goto retry; | |
50842bdc | 799 | case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN: |
7cdc2bab MD |
800 | next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; |
801 | print_dbg("again"); | |
802 | break; | |
50842bdc | 803 | case BT_LTTNG_LIVE_ITERATOR_STATUS_END: |
7cdc2bab MD |
804 | next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END; |
805 | print_dbg("end"); | |
806 | break; | |
50842bdc | 807 | case BT_LTTNG_LIVE_ITERATOR_STATUS_OK: |
7cdc2bab MD |
808 | next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_OK; |
809 | print_dbg("ok"); | |
810 | break; | |
50842bdc | 811 | case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL: |
7cdc2bab MD |
812 | next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID; |
813 | break; | |
50842bdc | 814 | case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM: |
7cdc2bab MD |
815 | next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; |
816 | break; | |
50842bdc | 817 | case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: |
7cdc2bab MD |
818 | next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED; |
819 | break; | |
50842bdc | 820 | case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR: |
7cdc2bab MD |
821 | default: /* fall-through */ |
822 | next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; | |
823 | break; | |
824 | } | |
825 | return next_return; | |
826 | } | |
827 | ||
828 | static | |
90157d89 PP |
829 | struct bt_notification_iterator_next_method_return lttng_live_iterator_next_no_stream( |
830 | struct bt_private_connection_private_notification_iterator *iterator, | |
7cdc2bab MD |
831 | struct lttng_live_no_stream_iterator *no_stream_iter) |
832 | { | |
50842bdc | 833 | enum bt_lttng_live_iterator_status status; |
90157d89 | 834 | struct bt_notification_iterator_next_method_return next_return; |
7cdc2bab MD |
835 | struct lttng_live_component *lttng_live; |
836 | ||
837 | lttng_live = no_stream_iter->lttng_live; | |
838 | retry: | |
839 | lttng_live_force_new_streams_and_metadata(lttng_live); | |
4513a12e | 840 | next_return.notification = NULL; |
7cdc2bab | 841 | status = lttng_live_iterator_next_handle_new_streams_and_metadata(lttng_live); |
50842bdc | 842 | if (status != BT_LTTNG_LIVE_ITERATOR_STATUS_OK) { |
7cdc2bab MD |
843 | goto end; |
844 | } | |
845 | if (no_stream_iter->port) { | |
50842bdc | 846 | status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; |
7cdc2bab | 847 | } else { |
50842bdc | 848 | status = BT_LTTNG_LIVE_ITERATOR_STATUS_END; |
7cdc2bab MD |
849 | } |
850 | end: | |
851 | switch (status) { | |
50842bdc | 852 | case BT_LTTNG_LIVE_ITERATOR_STATUS_CONTINUE: |
7cdc2bab | 853 | goto retry; |
50842bdc | 854 | case BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN: |
7cdc2bab MD |
855 | next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; |
856 | break; | |
50842bdc | 857 | case BT_LTTNG_LIVE_ITERATOR_STATUS_END: |
7cdc2bab MD |
858 | next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_END; |
859 | break; | |
50842bdc | 860 | case BT_LTTNG_LIVE_ITERATOR_STATUS_INVAL: |
7cdc2bab MD |
861 | next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID; |
862 | break; | |
50842bdc | 863 | case BT_LTTNG_LIVE_ITERATOR_STATUS_NOMEM: |
7cdc2bab MD |
864 | next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; |
865 | break; | |
50842bdc | 866 | case BT_LTTNG_LIVE_ITERATOR_STATUS_UNSUPPORTED: |
7cdc2bab MD |
867 | next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED; |
868 | break; | |
50842bdc | 869 | case BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR: |
7cdc2bab MD |
870 | default: /* fall-through */ |
871 | next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; | |
872 | break; | |
873 | } | |
874 | return next_return; | |
875 | } | |
876 | ||
d3eb6e8f | 877 | BT_HIDDEN |
90157d89 PP |
878 | struct bt_notification_iterator_next_method_return lttng_live_iterator_next( |
879 | struct bt_private_connection_private_notification_iterator *iterator) | |
d3eb6e8f | 880 | { |
7cdc2bab | 881 | struct lttng_live_stream_iterator_generic *s = |
90157d89 PP |
882 | bt_private_connection_private_notification_iterator_get_user_data(iterator); |
883 | struct bt_notification_iterator_next_method_return next_return; | |
7cdc2bab MD |
884 | |
885 | switch (s->type) { | |
886 | case LIVE_STREAM_TYPE_NO_STREAM: | |
887 | next_return = lttng_live_iterator_next_no_stream(iterator, | |
888 | container_of(s, struct lttng_live_no_stream_iterator, p)); | |
889 | break; | |
890 | case LIVE_STREAM_TYPE_STREAM: | |
891 | next_return = lttng_live_iterator_next_stream(iterator, | |
892 | container_of(s, struct lttng_live_stream_iterator, p)); | |
893 | break; | |
894 | default: | |
895 | next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; | |
896 | break; | |
897 | } | |
898 | return next_return; | |
899 | } | |
41a2b7ae | 900 | |
7cdc2bab MD |
901 | BT_HIDDEN |
902 | enum bt_notification_iterator_status lttng_live_iterator_init( | |
90157d89 | 903 | struct bt_private_connection_private_notification_iterator *it, |
7cdc2bab MD |
904 | struct bt_private_port *port) |
905 | { | |
906 | enum bt_notification_iterator_status ret = | |
907 | BT_NOTIFICATION_ITERATOR_STATUS_OK; | |
908 | struct lttng_live_stream_iterator_generic *s; | |
7cdc2bab | 909 | |
f6ccaed9 | 910 | BT_ASSERT(it); |
7cdc2bab MD |
911 | |
912 | s = bt_private_port_get_user_data(port); | |
f6ccaed9 | 913 | BT_ASSERT(s); |
7cdc2bab MD |
914 | switch (s->type) { |
915 | case LIVE_STREAM_TYPE_NO_STREAM: | |
916 | { | |
917 | struct lttng_live_no_stream_iterator *no_stream_iter = | |
918 | container_of(s, struct lttng_live_no_stream_iterator, p); | |
90157d89 | 919 | ret = bt_private_connection_private_notification_iterator_set_user_data(it, no_stream_iter); |
7cdc2bab MD |
920 | if (ret) { |
921 | goto error; | |
922 | } | |
923 | break; | |
924 | } | |
925 | case LIVE_STREAM_TYPE_STREAM: | |
926 | { | |
927 | struct lttng_live_stream_iterator *stream_iter = | |
928 | container_of(s, struct lttng_live_stream_iterator, p); | |
90157d89 | 929 | ret = bt_private_connection_private_notification_iterator_set_user_data(it, stream_iter); |
7cdc2bab MD |
930 | if (ret) { |
931 | goto error; | |
932 | } | |
933 | break; | |
934 | } | |
935 | default: | |
936 | ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; | |
937 | goto end; | |
938 | } | |
939 | ||
940 | end: | |
41a2b7ae | 941 | return ret; |
7cdc2bab | 942 | error: |
90157d89 | 943 | if (bt_private_connection_private_notification_iterator_set_user_data(it, NULL) |
7cdc2bab | 944 | != BT_NOTIFICATION_ITERATOR_STATUS_OK) { |
087bc060 | 945 | BT_LOGE("Error setting private data to NULL"); |
7cdc2bab MD |
946 | } |
947 | goto end; | |
948 | } | |
949 | ||
950 | static | |
90157d89 | 951 | struct bt_component_class_query_method_return lttng_live_query_list_sessions( |
c7eee084 PP |
952 | struct bt_component_class *comp_class, |
953 | struct bt_query_executor *query_exec, | |
7cdc2bab MD |
954 | struct bt_value *params) |
955 | { | |
90157d89 | 956 | struct bt_component_class_query_method_return query_ret = { |
c7eee084 PP |
957 | .result = NULL, |
958 | .status = BT_QUERY_STATUS_OK, | |
959 | }; | |
960 | ||
7cdc2bab | 961 | struct bt_value *url_value = NULL; |
7cdc2bab MD |
962 | const char *url; |
963 | struct bt_live_viewer_connection *viewer_connection = NULL; | |
7cdc2bab MD |
964 | |
965 | url_value = bt_value_map_get(params, "url"); | |
966 | if (!url_value || bt_value_is_null(url_value) || !bt_value_is_string(url_value)) { | |
087bc060 | 967 | BT_LOGW("Mandatory \"url\" parameter missing"); |
c7eee084 | 968 | query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS; |
7cdc2bab MD |
969 | goto error; |
970 | } | |
971 | ||
3cdf4234 | 972 | if (bt_value_string_get(url_value, &url) != BT_VALUE_STATUS_OK) { |
087bc060 | 973 | BT_LOGW("\"url\" parameter is required to be a string value"); |
c7eee084 | 974 | query_ret.status = BT_QUERY_STATUS_INVALID_PARAMS; |
7cdc2bab MD |
975 | goto error; |
976 | } | |
977 | ||
4c66436f | 978 | viewer_connection = bt_live_viewer_connection_create(url, NULL); |
7cdc2bab | 979 | if (!viewer_connection) { |
7cdc2bab MD |
980 | goto error; |
981 | } | |
982 | ||
c7eee084 PP |
983 | query_ret.result = |
984 | bt_live_viewer_connection_list_sessions(viewer_connection); | |
985 | if (!query_ret.result) { | |
986 | goto error; | |
987 | } | |
988 | ||
7cdc2bab | 989 | goto end; |
c7eee084 | 990 | |
7cdc2bab | 991 | error: |
c7eee084 PP |
992 | BT_PUT(query_ret.result); |
993 | ||
994 | if (query_ret.status >= 0) { | |
995 | query_ret.status = BT_QUERY_STATUS_ERROR; | |
996 | } | |
997 | ||
7cdc2bab MD |
998 | end: |
999 | if (viewer_connection) { | |
1000 | bt_live_viewer_connection_destroy(viewer_connection); | |
1001 | } | |
1002 | BT_PUT(url_value); | |
c7eee084 | 1003 | return query_ret; |
7cdc2bab MD |
1004 | } |
1005 | ||
1006 | BT_HIDDEN | |
90157d89 | 1007 | struct bt_component_class_query_method_return lttng_live_query( |
c7eee084 PP |
1008 | struct bt_component_class *comp_class, |
1009 | struct bt_query_executor *query_exec, | |
7cdc2bab MD |
1010 | const char *object, struct bt_value *params) |
1011 | { | |
90157d89 | 1012 | struct bt_component_class_query_method_return ret = { |
c7eee084 PP |
1013 | .result = NULL, |
1014 | .status = BT_QUERY_STATUS_OK, | |
1015 | }; | |
1016 | ||
7cdc2bab MD |
1017 | if (strcmp(object, "sessions") == 0) { |
1018 | return lttng_live_query_list_sessions(comp_class, | |
c7eee084 | 1019 | query_exec, params); |
7cdc2bab | 1020 | } |
087bc060 | 1021 | BT_LOGW("Unknown query object `%s`", object); |
c7eee084 PP |
1022 | ret.status = BT_QUERY_STATUS_INVALID_OBJECT; |
1023 | return ret; | |
7cdc2bab MD |
1024 | } |
1025 | ||
1026 | static | |
1027 | void lttng_live_component_destroy_data(struct lttng_live_component *lttng_live) | |
1028 | { | |
1029 | int ret; | |
1030 | struct lttng_live_session *session, *s; | |
1031 | ||
1032 | bt_list_for_each_entry_safe(session, s, <tng_live->sessions, node) { | |
1033 | lttng_live_destroy_session(session); | |
1034 | } | |
1035 | BT_PUT(lttng_live->viewer_connection); | |
1036 | if (lttng_live->url) { | |
1037 | g_string_free(lttng_live->url, TRUE); | |
1038 | } | |
1039 | if (lttng_live->no_stream_port) { | |
6f79a7cf | 1040 | bt_get(lttng_live->no_stream_port); |
7cdc2bab | 1041 | ret = bt_private_port_remove_from_component(lttng_live->no_stream_port); |
6f79a7cf | 1042 | bt_put(lttng_live->no_stream_port); |
f6ccaed9 | 1043 | BT_ASSERT(!ret); |
7cdc2bab MD |
1044 | } |
1045 | if (lttng_live->no_stream_iter) { | |
1046 | g_free(lttng_live->no_stream_iter); | |
1047 | } | |
1048 | g_free(lttng_live); | |
1049 | } | |
1050 | ||
1051 | BT_HIDDEN | |
1052 | void lttng_live_component_finalize(struct bt_private_component *component) | |
1053 | { | |
1054 | void *data = bt_private_component_get_user_data(component); | |
1055 | ||
1056 | if (!data) { | |
1057 | return; | |
1058 | } | |
1059 | lttng_live_component_destroy_data(data); | |
1060 | } | |
1061 | ||
1062 | static | |
1063 | struct lttng_live_component *lttng_live_component_create(struct bt_value *params, | |
6f79a7cf | 1064 | struct bt_private_component *private_component) |
7cdc2bab MD |
1065 | { |
1066 | struct lttng_live_component *lttng_live; | |
1067 | struct bt_value *value = NULL; | |
1068 | const char *url; | |
1069 | enum bt_value_status ret; | |
1070 | ||
1071 | lttng_live = g_new0(struct lttng_live_component, 1); | |
1072 | if (!lttng_live) { | |
1073 | goto end; | |
1074 | } | |
7cdc2bab MD |
1075 | /* TODO: make this an overridable parameter. */ |
1076 | lttng_live->max_query_size = MAX_QUERY_SIZE; | |
1077 | BT_INIT_LIST_HEAD(<tng_live->sessions); | |
1078 | value = bt_value_map_get(params, "url"); | |
1079 | if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) { | |
087bc060 | 1080 | BT_LOGW("Mandatory \"url\" parameter missing"); |
7cdc2bab MD |
1081 | goto error; |
1082 | } | |
1083 | ret = bt_value_string_get(value, &url); | |
1084 | if (ret != BT_VALUE_STATUS_OK) { | |
087bc060 | 1085 | BT_LOGW("\"url\" parameter is required to be a string value"); |
7cdc2bab MD |
1086 | goto error; |
1087 | } | |
1088 | lttng_live->url = g_string_new(url); | |
1089 | if (!lttng_live->url) { | |
1090 | goto error; | |
1091 | } | |
6f79a7cf | 1092 | BT_PUT(value); |
7cdc2bab | 1093 | lttng_live->viewer_connection = |
4c66436f | 1094 | bt_live_viewer_connection_create(lttng_live->url->str, lttng_live); |
7cdc2bab | 1095 | if (!lttng_live->viewer_connection) { |
7cdc2bab MD |
1096 | goto error; |
1097 | } | |
1098 | if (lttng_live_create_viewer_session(lttng_live)) { | |
7cdc2bab MD |
1099 | goto error; |
1100 | } | |
1101 | lttng_live->private_component = private_component; | |
4c66436f | 1102 | |
7cdc2bab MD |
1103 | goto end; |
1104 | ||
1105 | error: | |
1106 | lttng_live_component_destroy_data(lttng_live); | |
1107 | lttng_live = NULL; | |
1108 | end: | |
1109 | return lttng_live; | |
d3e4dcd8 PP |
1110 | } |
1111 | ||
f3bc2010 | 1112 | BT_HIDDEN |
3cdf4234 MD |
1113 | enum bt_component_status lttng_live_component_init( |
1114 | struct bt_private_component *private_component, | |
7cdc2bab | 1115 | struct bt_value *params, void *init_method_data) |
f3bc2010 | 1116 | { |
7cdc2bab MD |
1117 | struct lttng_live_component *lttng_live; |
1118 | enum bt_component_status ret = BT_COMPONENT_STATUS_OK; | |
1119 | ||
7cdc2bab | 1120 | /* Passes ownership of iter ref to lttng_live_component_create. */ |
6f79a7cf | 1121 | lttng_live = lttng_live_component_create(params, private_component); |
7cdc2bab | 1122 | if (!lttng_live) { |
6f79a7cf MD |
1123 | //TODO : we need access to the application cancel state |
1124 | //because we are not part of a graph yet. | |
1125 | ret = BT_COMPONENT_STATUS_NOMEM; | |
7cdc2bab MD |
1126 | goto end; |
1127 | } | |
1128 | ||
1129 | lttng_live->no_stream_iter = g_new0(struct lttng_live_no_stream_iterator, 1); | |
1130 | lttng_live->no_stream_iter->p.type = LIVE_STREAM_TYPE_NO_STREAM; | |
1131 | lttng_live->no_stream_iter->lttng_live = lttng_live; | |
4bf0e537 MD |
1132 | if (lttng_live_is_canceled(lttng_live)) { |
1133 | goto end; | |
1134 | } | |
147337a3 | 1135 | ret = bt_private_component_source_add_output_private_port( |
7cdc2bab | 1136 | lttng_live->private_component, "no-stream", |
147337a3 PP |
1137 | lttng_live->no_stream_iter, |
1138 | <tng_live->no_stream_port); | |
1139 | if (ret != BT_COMPONENT_STATUS_OK) { | |
1140 | goto end; | |
1141 | } | |
6f79a7cf | 1142 | bt_put(lttng_live->no_stream_port); /* weak */ |
7cdc2bab MD |
1143 | lttng_live->no_stream_iter->port = lttng_live->no_stream_port; |
1144 | ||
3cdf4234 | 1145 | ret = bt_private_component_set_user_data(private_component, lttng_live); |
7cdc2bab MD |
1146 | if (ret != BT_COMPONENT_STATUS_OK) { |
1147 | goto error; | |
1148 | } | |
1149 | ||
1150 | end: | |
1151 | return ret; | |
1152 | error: | |
3cdf4234 | 1153 | (void) bt_private_component_set_user_data(private_component, NULL); |
7cdc2bab MD |
1154 | lttng_live_component_destroy_data(lttng_live); |
1155 | return ret; | |
f3bc2010 | 1156 | } |
087bc060 | 1157 | |
d85ef162 MD |
1158 | BT_HIDDEN |
1159 | enum bt_component_status lttng_live_accept_port_connection( | |
1160 | struct bt_private_component *private_component, | |
1161 | struct bt_private_port *self_private_port, | |
1162 | struct bt_port *other_port) | |
1163 | { | |
1164 | struct lttng_live_component *lttng_live = | |
1165 | bt_private_component_get_user_data(private_component); | |
1166 | struct bt_component *other_component; | |
1167 | enum bt_component_status status = BT_COMPONENT_STATUS_OK; | |
6d137876 | 1168 | struct bt_port *self_port = bt_port_from_private(self_private_port); |
d85ef162 MD |
1169 | |
1170 | other_component = bt_port_get_component(other_port); | |
1171 | bt_put(other_component); /* weak */ | |
1172 | ||
1173 | if (!lttng_live->downstream_component) { | |
1174 | lttng_live->downstream_component = other_component; | |
1175 | goto end; | |
1176 | } | |
1177 | ||
1178 | /* | |
1179 | * Compare prior component to ensure we are connected to the | |
1180 | * same downstream component as prior ports. | |
1181 | */ | |
1182 | if (lttng_live->downstream_component != other_component) { | |
1183 | BT_LOGW("Cannot connect ctf.lttng-live component port \"%s\" to component \"%s\": already connected to component \"%s\".", | |
1184 | bt_port_get_name(self_port), | |
1185 | bt_component_get_name(other_component), | |
1186 | bt_component_get_name(lttng_live->downstream_component)); | |
1187 | status = BT_COMPONENT_STATUS_REFUSE_PORT_CONNECTION; | |
1188 | goto end; | |
1189 | } | |
1190 | end: | |
1191 | bt_put(self_port); | |
1192 | return status; | |
1193 | } |