relayd: replace uses of block FDs by the fs_handle interface
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 * 2019 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License, version 2 only, as
9 * published by the Free Software Foundation.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * this program; if not, write to the Free Software Foundation, Inc., 51
18 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 */
20
21 #define _LGPL_SOURCE
22 #include <common/common.h>
23 #include <common/defaults.h>
24 #include <common/fs-handle.h>
25 #include <common/sessiond-comm/relayd.h>
26 #include <common/utils.h>
27 #include <sys/stat.h>
28 #include <urcu/rculist.h>
29
30 #include "lttng-relayd.h"
31 #include "index.h"
32 #include "stream.h"
33 #include "viewer-stream.h"
34
35 #include <sys/types.h>
36 #include <fcntl.h>
37
38 #define FILE_IO_STACK_BUFFER_SIZE 65536
39
40 /* Should be called with RCU read-side lock held. */
41 bool stream_get(struct relay_stream *stream)
42 {
43 return urcu_ref_get_unless_zero(&stream->ref);
44 }
45
46 /*
47 * Get stream from stream id from the streams hash table. Return stream
48 * if found else NULL. A stream reference is taken when a stream is
49 * returned. stream_put() must be called on that stream.
50 */
51 struct relay_stream *stream_get_by_id(uint64_t stream_id)
52 {
53 struct lttng_ht_node_u64 *node;
54 struct lttng_ht_iter iter;
55 struct relay_stream *stream = NULL;
56
57 rcu_read_lock();
58 lttng_ht_lookup(relay_streams_ht, &stream_id, &iter);
59 node = lttng_ht_iter_get_node_u64(&iter);
60 if (!node) {
61 DBG("Relay stream %" PRIu64 " not found", stream_id);
62 goto end;
63 }
64 stream = caa_container_of(node, struct relay_stream, node);
65 if (!stream_get(stream)) {
66 stream = NULL;
67 }
68 end:
69 rcu_read_unlock();
70 return stream;
71 }
72
73 static void stream_complete_rotation(struct relay_stream *stream)
74 {
75 DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
76 if (stream->ongoing_rotation.value.next_trace_chunk) {
77 tracefile_array_reset(stream->tfa);
78 tracefile_array_commit_seq(stream->tfa,
79 stream->index_received_seqcount);
80 }
81 lttng_trace_chunk_put(stream->trace_chunk);
82 stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
83 stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
84 }
85
86 static int stream_create_data_output_file_from_trace_chunk(
87 struct relay_stream *stream,
88 struct lttng_trace_chunk *trace_chunk,
89 bool force_unlink,
90 struct fs_handle **out_file)
91 {
92 int ret;
93 char stream_path[LTTNG_PATH_MAX];
94 enum lttng_trace_chunk_status status;
95 const int flags = O_RDWR | O_CREAT | O_TRUNC;
96 const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
97
98 ASSERT_LOCKED(stream->lock);
99
100 ret = utils_stream_file_path(stream->path_name, stream->channel_name,
101 stream->tracefile_size, stream->tracefile_current_index,
102 NULL, stream_path, sizeof(stream_path));
103 if (ret < 0) {
104 goto end;
105 }
106
107 if (stream->tracefile_wrapped_around || force_unlink) {
108 /*
109 * The on-disk ring-buffer has wrapped around.
110 * Newly created stream files will replace existing files. Since
111 * live clients may be consuming existing files, the file about
112 * to be replaced is unlinked in order to not overwrite its
113 * content.
114 */
115 status = lttng_trace_chunk_unlink_file(trace_chunk,
116 stream_path);
117 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
118 PERROR("Failed to unlink stream file \"%s\" during trace file rotation",
119 stream_path);
120 /*
121 * Don't abort if the file doesn't exist, it is
122 * unexpected, but should not be a fatal error.
123 */
124 if (errno != ENOENT) {
125 ret = -1;
126 goto end;
127 }
128 }
129 }
130
131 status = lttng_trace_chunk_open_fs_handle(trace_chunk, stream_path,
132 flags, mode, out_file, false);
133 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
134 ERR("Failed to open stream file \"%s\"", stream->channel_name);
135 ret = -1;
136 goto end;
137 }
138 end:
139 return ret;
140 }
141
142 static int stream_rotate_data_file(struct relay_stream *stream)
143 {
144 int ret = 0;
145
146 DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64,
147 stream->stream_handle, stream->tracefile_size_current);
148
149 if (stream->file) {
150 fs_handle_close(stream->file);
151 stream->file = NULL;
152 }
153
154 stream->tracefile_wrapped_around = false;
155 stream->tracefile_current_index = 0;
156
157 if (stream->ongoing_rotation.value.next_trace_chunk) {
158 enum lttng_trace_chunk_status chunk_status;
159
160 chunk_status = lttng_trace_chunk_create_subdirectory(
161 stream->ongoing_rotation.value.next_trace_chunk,
162 stream->path_name);
163 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
164 ret = -1;
165 goto end;
166 }
167
168 /* Rotate the data file. */
169 ret = stream_create_data_output_file_from_trace_chunk(stream,
170 stream->ongoing_rotation.value.next_trace_chunk,
171 false, &stream->file);
172 if (ret < 0) {
173 ERR("Failed to rotate stream data file");
174 goto end;
175 }
176 }
177 DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
178 __func__, stream->stream_handle, stream->tracefile_size_current);
179 stream->tracefile_size_current = 0;
180 stream->pos_after_last_complete_data_index = 0;
181 stream->ongoing_rotation.value.data_rotated = true;
182
183 if (stream->ongoing_rotation.value.index_rotated) {
184 /* Rotation completed; reset its state. */
185 stream_complete_rotation(stream);
186 }
187 end:
188 return ret;
189 }
190
191 /*
192 * If too much data has been written in a tracefile before we received the
193 * rotation command, we have to move the excess data to the new tracefile and
194 * perform the rotation. This can happen because the control and data
195 * connections are separate, the indexes as well as the commands arrive from
196 * the control connection and we have no control over the order so we could be
197 * in a situation where too much data has been received on the data connection
198 * before the rotation command on the control connection arrives.
199 */
200 static int rotate_truncate_stream(struct relay_stream *stream)
201 {
202 int ret;
203 off_t lseek_ret, previous_stream_copy_origin;
204 uint64_t copy_bytes_left, misplaced_data_size;
205 bool acquired_reference;
206 struct fs_handle *previous_stream_file = NULL;
207 struct lttng_trace_chunk *previous_chunk = NULL;
208
209 if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
210 ERR("Protocol error encoutered in %s(): stream rotation "
211 "sequence number is before the current sequence number "
212 "and the next trace chunk is unset. Honoring this "
213 "rotation command would result in data loss",
214 __FUNCTION__);
215 ret = -1;
216 goto end;
217 }
218
219 ASSERT_LOCKED(stream->lock);
220 /*
221 * Acquire a reference to the current trace chunk to ensure
222 * it is not reclaimed when `stream_rotate_data_file` is called.
223 * Failing to do so would violate the contract of the trace
224 * chunk API as an active file descriptor would outlive the
225 * trace chunk.
226 */
227 acquired_reference = lttng_trace_chunk_get(stream->trace_chunk);
228 assert(acquired_reference);
229 previous_chunk = stream->trace_chunk;
230
231 /*
232 * Steal the stream's reference to its stream_fd. A new
233 * stream_fd will be created when the rotation completes and
234 * the orinal stream_fd will be used to copy the "extra" data
235 * to the new file.
236 */
237 assert(stream->file);
238 previous_stream_file = stream->file;
239 stream->file = NULL;
240
241 assert(!stream->is_metadata);
242 assert(stream->tracefile_size_current >
243 stream->pos_after_last_complete_data_index);
244 misplaced_data_size = stream->tracefile_size_current -
245 stream->pos_after_last_complete_data_index;
246 copy_bytes_left = misplaced_data_size;
247 previous_stream_copy_origin = stream->pos_after_last_complete_data_index;
248
249 ret = stream_rotate_data_file(stream);
250 if (ret) {
251 goto end;
252 }
253
254 assert(stream->file);
255 /*
256 * Seek the current tracefile to the position at which the rotation
257 * should have occurred.
258 */
259 lseek_ret = fs_handle_seek(previous_stream_file, previous_stream_copy_origin, SEEK_SET);
260 if (lseek_ret < 0) {
261 PERROR("Failed to seek to offset %" PRIu64
262 " while copying extra data received before a stream rotation",
263 (uint64_t) previous_stream_copy_origin);
264 ret = -1;
265 goto end;
266 }
267
268 /* Move data from the old file to the new file. */
269 while (copy_bytes_left) {
270 ssize_t io_ret;
271 char copy_buffer[FILE_IO_STACK_BUFFER_SIZE];
272 const off_t copy_size_this_pass = min_t(
273 off_t, copy_bytes_left, sizeof(copy_buffer));
274
275 io_ret = fs_handle_read(previous_stream_file, copy_buffer,
276 copy_size_this_pass);
277 if (io_ret < (ssize_t) copy_size_this_pass) {
278 if (io_ret == -1) {
279 PERROR("Failed to read %" PRIu64
280 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
281 copy_size_this_pass,
282 __FUNCTION__, io_ret,
283 stream->stream_handle);
284 } else {
285 ERR("Failed to read %" PRIu64
286 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
287 copy_size_this_pass,
288 __FUNCTION__, io_ret,
289 stream->stream_handle);
290 }
291 ret = -1;
292 goto end;
293 }
294
295 io_ret = fs_handle_write(
296 stream->file, copy_buffer, copy_size_this_pass);
297 if (io_ret < (ssize_t) copy_size_this_pass) {
298 if (io_ret == -1) {
299 PERROR("Failed to write %" PRIu64
300 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
301 copy_size_this_pass,
302 __FUNCTION__, io_ret,
303 stream->stream_handle);
304 } else {
305 ERR("Failed to write %" PRIu64
306 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
307 copy_size_this_pass,
308 __FUNCTION__, io_ret,
309 stream->stream_handle);
310 }
311 ret = -1;
312 goto end;
313 }
314 copy_bytes_left -= copy_size_this_pass;
315 }
316
317 /* Truncate the file to get rid of the excess data. */
318 ret = fs_handle_truncate(
319 previous_stream_file, previous_stream_copy_origin);
320 if (ret) {
321 PERROR("Failed to truncate current stream file to offset %" PRIu64,
322 previous_stream_copy_origin);
323 goto end;
324 }
325
326 /*
327 * Update the offset and FD of all the eventual indexes created by the
328 * data connection before the rotation command arrived.
329 */
330 ret = relay_index_switch_all_files(stream);
331 if (ret < 0) {
332 ERR("Failed to rotate index file");
333 goto end;
334 }
335
336 stream->tracefile_size_current = misplaced_data_size;
337 /* Index and data contents are back in sync. */
338 stream->pos_after_last_complete_data_index = 0;
339 ret = 0;
340 end:
341 lttng_trace_chunk_put(previous_chunk);
342 return ret;
343 }
344
345 /*
346 * Check if a stream's data file (as opposed to index) should be rotated
347 * (for session rotation).
348 * Must be called with the stream lock held.
349 *
350 * Return 0 on success, a negative value on error.
351 */
352 static int try_rotate_stream_data(struct relay_stream *stream)
353 {
354 int ret = 0;
355
356 if (caa_likely(!stream->ongoing_rotation.is_set)) {
357 /* No rotation expected. */
358 goto end;
359 }
360
361 if (stream->ongoing_rotation.value.data_rotated) {
362 /* Rotation of the data file has already occurred. */
363 goto end;
364 }
365
366 DBG("%s: Stream %" PRIu64
367 " (rotate_at_index_packet_seq_num = %" PRIu64
368 ", rotate_at_prev_data_net_seq = %" PRIu64
369 ", prev_data_seq = %" PRIu64 ")",
370 __func__, stream->stream_handle,
371 stream->ongoing_rotation.value.packet_seq_num,
372 stream->ongoing_rotation.value.prev_data_net_seq,
373 stream->prev_data_seq);
374
375 if (stream->prev_data_seq == -1ULL ||
376 stream->ongoing_rotation.value.prev_data_net_seq == -1ULL ||
377 stream->prev_data_seq <
378 stream->ongoing_rotation.value.prev_data_net_seq) {
379 /*
380 * The next packet that will be written is not part of the next
381 * chunk yet.
382 */
383 DBG("Stream %" PRIu64 " data not yet ready for rotation "
384 "(rotate_at_index_packet_seq_num = %" PRIu64
385 ", rotate_at_prev_data_net_seq = %" PRIu64
386 ", prev_data_seq = %" PRIu64 ")",
387 stream->stream_handle,
388 stream->ongoing_rotation.value.packet_seq_num,
389 stream->ongoing_rotation.value.prev_data_net_seq,
390 stream->prev_data_seq);
391 goto end;
392 } else if (stream->prev_data_seq > stream->ongoing_rotation.value.prev_data_net_seq) {
393 /*
394 * prev_data_seq is checked here since indexes and rotation
395 * commands are serialized with respect to each other.
396 */
397 DBG("Rotation after too much data has been written in tracefile "
398 "for stream %" PRIu64 ", need to truncate before "
399 "rotating", stream->stream_handle);
400 ret = rotate_truncate_stream(stream);
401 if (ret) {
402 ERR("Failed to truncate stream");
403 goto end;
404 }
405 } else {
406 ret = stream_rotate_data_file(stream);
407 }
408
409 end:
410 return ret;
411 }
412
413 /*
414 * Close the current index file if it is open, and create a new one.
415 *
416 * Return 0 on success, -1 on error.
417 */
418 static int create_index_file(struct relay_stream *stream,
419 struct lttng_trace_chunk *chunk)
420 {
421 int ret;
422 uint32_t major, minor;
423 char *index_subpath = NULL;
424 enum lttng_trace_chunk_status status;
425
426 ASSERT_LOCKED(stream->lock);
427
428 /* Put ref on previous index_file. */
429 if (stream->index_file) {
430 lttng_index_file_put(stream->index_file);
431 stream->index_file = NULL;
432 }
433 major = stream->trace->session->major;
434 minor = stream->trace->session->minor;
435
436 if (!chunk) {
437 ret = 0;
438 goto end;
439 }
440 ret = asprintf(&index_subpath, "%s/%s", stream->path_name,
441 DEFAULT_INDEX_DIR);
442 if (ret < 0) {
443 goto end;
444 }
445
446 status = lttng_trace_chunk_create_subdirectory(chunk,
447 index_subpath);
448 free(index_subpath);
449 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
450 ret = -1;
451 goto end;
452 }
453 status = lttng_index_file_create_from_trace_chunk(
454 chunk, stream->path_name,
455 stream->channel_name, stream->tracefile_size,
456 stream->tracefile_current_index,
457 lttng_to_index_major(major, minor),
458 lttng_to_index_minor(major, minor), true,
459 &stream->index_file);
460 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
461 ret = -1;
462 goto end;
463 }
464
465 ret = 0;
466
467 end:
468 return ret;
469 }
470
471 /*
472 * Check if a stream's index file should be rotated (for session rotation).
473 * Must be called with the stream lock held.
474 *
475 * Return 0 on success, a negative value on error.
476 */
477 static int try_rotate_stream_index(struct relay_stream *stream)
478 {
479 int ret = 0;
480
481 if (!stream->ongoing_rotation.is_set) {
482 /* No rotation expected. */
483 goto end;
484 }
485
486 if (stream->ongoing_rotation.value.index_rotated) {
487 /* Rotation of the index has already occurred. */
488 goto end;
489 }
490
491 DBG("%s: Stream %" PRIu64
492 " (rotate_at_packet_seq_num = %" PRIu64
493 ", received_packet_seq_num = "
494 "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
495 __func__, stream->stream_handle,
496 stream->ongoing_rotation.value.packet_seq_num,
497 stream->received_packet_seq_num.value,
498 stream->received_packet_seq_num.is_set);
499
500 if (!stream->received_packet_seq_num.is_set ||
501 LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 <
502 stream->ongoing_rotation.value.packet_seq_num) {
503 DBG("Stream %" PRIu64 " index not yet ready for rotation "
504 "(rotate_at_packet_seq_num = %" PRIu64
505 ", received_packet_seq_num = "
506 "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
507 stream->stream_handle,
508 stream->ongoing_rotation.value.packet_seq_num,
509 stream->received_packet_seq_num.value,
510 stream->received_packet_seq_num.is_set);
511 goto end;
512 } else {
513 /*
514 * The next index belongs to the new trace chunk; rotate.
515 * In overwrite mode, the packet seq num may jump over the
516 * rotation position.
517 */
518 assert(LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 >=
519 stream->ongoing_rotation.value.packet_seq_num);
520 DBG("Rotating stream %" PRIu64 " index file",
521 stream->stream_handle);
522 if (stream->index_file) {
523 lttng_index_file_put(stream->index_file);
524 stream->index_file = NULL;
525 }
526 stream->ongoing_rotation.value.index_rotated = true;
527
528 /*
529 * Set the rotation pivot position for the data, now that we have the
530 * net_seq_num matching the packet_seq_num index pivot position.
531 */
532 stream->ongoing_rotation.value.prev_data_net_seq =
533 stream->prev_index_seq;
534 if (stream->ongoing_rotation.value.data_rotated &&
535 stream->ongoing_rotation.value.index_rotated) {
536 /* Rotation completed; reset its state. */
537 DBG("Rotation completed for stream %" PRIu64,
538 stream->stream_handle);
539 stream_complete_rotation(stream);
540 }
541 }
542
543 end:
544 return ret;
545 }
546
547 static int stream_set_trace_chunk(struct relay_stream *stream,
548 struct lttng_trace_chunk *chunk)
549 {
550 int ret = 0;
551 enum lttng_trace_chunk_status status;
552 bool acquired_reference;
553
554 status = lttng_trace_chunk_create_subdirectory(chunk,
555 stream->path_name);
556 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
557 ret = -1;
558 goto end;
559 }
560
561 lttng_trace_chunk_put(stream->trace_chunk);
562 acquired_reference = lttng_trace_chunk_get(chunk);
563 assert(acquired_reference);
564 stream->trace_chunk = chunk;
565
566 if (stream->file) {
567 fs_handle_close(stream->file);
568 stream->file = NULL;
569 }
570 ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
571 false, &stream->file);
572 end:
573 return ret;
574 }
575
576 /*
577 * We keep ownership of path_name and channel_name.
578 */
579 struct relay_stream *stream_create(struct ctf_trace *trace,
580 uint64_t stream_handle, char *path_name,
581 char *channel_name, uint64_t tracefile_size,
582 uint64_t tracefile_count)
583 {
584 int ret;
585 struct relay_stream *stream = NULL;
586 struct relay_session *session = trace->session;
587 bool acquired_reference = false;
588 struct lttng_trace_chunk *current_trace_chunk;
589
590 stream = zmalloc(sizeof(struct relay_stream));
591 if (stream == NULL) {
592 PERROR("relay stream zmalloc");
593 goto error_no_alloc;
594 }
595
596 stream->stream_handle = stream_handle;
597 stream->prev_data_seq = -1ULL;
598 stream->prev_index_seq = -1ULL;
599 stream->last_net_seq_num = -1ULL;
600 stream->ctf_stream_id = -1ULL;
601 stream->tracefile_size = tracefile_size;
602 stream->tracefile_count = tracefile_count;
603 stream->path_name = path_name;
604 stream->channel_name = channel_name;
605 stream->beacon_ts_end = -1ULL;
606 lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
607 pthread_mutex_init(&stream->lock, NULL);
608 urcu_ref_init(&stream->ref);
609 ctf_trace_get(trace);
610 stream->trace = trace;
611
612 pthread_mutex_lock(&trace->session->lock);
613 current_trace_chunk = trace->session->current_trace_chunk;
614 if (current_trace_chunk) {
615 acquired_reference = lttng_trace_chunk_get(current_trace_chunk);
616 }
617 pthread_mutex_unlock(&trace->session->lock);
618 if (!acquired_reference) {
619 ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired",
620 channel_name);
621 ret = -1;
622 goto end;
623 }
624
625 stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
626 if (!stream->indexes_ht) {
627 ERR("Cannot created indexes_ht");
628 ret = -1;
629 goto end;
630 }
631
632 pthread_mutex_lock(&stream->lock);
633 ret = stream_set_trace_chunk(stream, current_trace_chunk);
634 pthread_mutex_unlock(&stream->lock);
635 if (ret) {
636 ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
637 trace->session->session_name,
638 stream->channel_name);
639 ret = -1;
640 goto end;
641 }
642 stream->tfa = tracefile_array_create(stream->tracefile_count);
643 if (!stream->tfa) {
644 ret = -1;
645 goto end;
646 }
647
648 stream->is_metadata = !strcmp(stream->channel_name,
649 DEFAULT_METADATA_NAME);
650 stream->in_recv_list = true;
651
652 /*
653 * Add the stream in the recv list of the session. Once the end stream
654 * message is received, all session streams are published.
655 */
656 pthread_mutex_lock(&session->recv_list_lock);
657 cds_list_add_rcu(&stream->recv_node, &session->recv_list);
658 session->stream_count++;
659 pthread_mutex_unlock(&session->recv_list_lock);
660
661 /*
662 * Both in the ctf_trace object and the global stream ht since the data
663 * side of the relayd does not have the concept of session.
664 */
665 lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
666 stream->in_stream_ht = true;
667
668 DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
669 stream->stream_handle);
670 ret = 0;
671
672 end:
673 if (ret) {
674 if (stream->file) {
675 fs_handle_close(stream->file);
676 stream->file = NULL;
677 }
678 stream_put(stream);
679 stream = NULL;
680 }
681 if (acquired_reference) {
682 lttng_trace_chunk_put(current_trace_chunk);
683 }
684 return stream;
685
686 error_no_alloc:
687 /*
688 * path_name and channel_name need to be freed explicitly here
689 * because we cannot rely on stream_put().
690 */
691 free(path_name);
692 free(channel_name);
693 return NULL;
694 }
695
696 /*
697 * Called with the session lock held.
698 */
699 void stream_publish(struct relay_stream *stream)
700 {
701 struct relay_session *session;
702
703 pthread_mutex_lock(&stream->lock);
704 if (stream->published) {
705 goto unlock;
706 }
707
708 session = stream->trace->session;
709
710 pthread_mutex_lock(&session->recv_list_lock);
711 if (stream->in_recv_list) {
712 cds_list_del_rcu(&stream->recv_node);
713 stream->in_recv_list = false;
714 }
715 pthread_mutex_unlock(&session->recv_list_lock);
716
717 pthread_mutex_lock(&stream->trace->stream_list_lock);
718 cds_list_add_rcu(&stream->stream_node, &stream->trace->stream_list);
719 pthread_mutex_unlock(&stream->trace->stream_list_lock);
720
721 stream->published = true;
722 unlock:
723 pthread_mutex_unlock(&stream->lock);
724 }
725
726 /*
727 * Stream must be protected by holding the stream lock or by virtue of being
728 * called from stream_destroy.
729 */
730 static void stream_unpublish(struct relay_stream *stream)
731 {
732 if (stream->in_stream_ht) {
733 struct lttng_ht_iter iter;
734 int ret;
735
736 iter.iter.node = &stream->node.node;
737 ret = lttng_ht_del(relay_streams_ht, &iter);
738 assert(!ret);
739 stream->in_stream_ht = false;
740 }
741 if (stream->published) {
742 pthread_mutex_lock(&stream->trace->stream_list_lock);
743 cds_list_del_rcu(&stream->stream_node);
744 pthread_mutex_unlock(&stream->trace->stream_list_lock);
745 stream->published = false;
746 }
747 }
748
749 static void stream_destroy(struct relay_stream *stream)
750 {
751 if (stream->indexes_ht) {
752 /*
753 * Calling lttng_ht_destroy in call_rcu worker thread so
754 * we don't hold the RCU read-side lock while calling
755 * it.
756 */
757 lttng_ht_destroy(stream->indexes_ht);
758 }
759 if (stream->tfa) {
760 tracefile_array_destroy(stream->tfa);
761 }
762 free(stream->path_name);
763 free(stream->channel_name);
764 free(stream);
765 }
766
767 static void stream_destroy_rcu(struct rcu_head *rcu_head)
768 {
769 struct relay_stream *stream =
770 caa_container_of(rcu_head, struct relay_stream, rcu_node);
771
772 stream_destroy(stream);
773 }
774
775 /*
776 * No need to take stream->lock since this is only called on the final
777 * stream_put which ensures that a single thread may act on the stream.
778 */
779 static void stream_release(struct urcu_ref *ref)
780 {
781 struct relay_stream *stream =
782 caa_container_of(ref, struct relay_stream, ref);
783 struct relay_session *session;
784
785 session = stream->trace->session;
786
787 DBG("Releasing stream id %" PRIu64, stream->stream_handle);
788
789 pthread_mutex_lock(&session->recv_list_lock);
790 session->stream_count--;
791 if (stream->in_recv_list) {
792 cds_list_del_rcu(&stream->recv_node);
793 stream->in_recv_list = false;
794 }
795 pthread_mutex_unlock(&session->recv_list_lock);
796
797 stream_unpublish(stream);
798
799 if (stream->file) {
800 fs_handle_close(stream->file);
801 stream->file = NULL;
802 }
803 if (stream->index_file) {
804 lttng_index_file_put(stream->index_file);
805 stream->index_file = NULL;
806 }
807 if (stream->trace) {
808 ctf_trace_put(stream->trace);
809 stream->trace = NULL;
810 }
811 stream_complete_rotation(stream);
812 lttng_trace_chunk_put(stream->trace_chunk);
813 stream->trace_chunk = NULL;
814
815 call_rcu(&stream->rcu_node, stream_destroy_rcu);
816 }
817
818 void stream_put(struct relay_stream *stream)
819 {
820 rcu_read_lock();
821 assert(stream->ref.refcount != 0);
822 /*
823 * Wait until we have processed all the stream packets before
824 * actually putting our last stream reference.
825 */
826 urcu_ref_put(&stream->ref, stream_release);
827 rcu_read_unlock();
828 }
829
830 int stream_set_pending_rotation(struct relay_stream *stream,
831 struct lttng_trace_chunk *next_trace_chunk,
832 uint64_t rotation_sequence_number)
833 {
834 int ret = 0;
835 const struct relay_stream_rotation rotation = {
836 .data_rotated = false,
837 .index_rotated = false,
838 .packet_seq_num = rotation_sequence_number,
839 .prev_data_net_seq = -1ULL,
840 .next_trace_chunk = next_trace_chunk,
841 };
842
843 if (stream->ongoing_rotation.is_set) {
844 ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)");
845 ret = -1;
846 goto end;
847 }
848
849 if (next_trace_chunk) {
850 const bool reference_acquired =
851 lttng_trace_chunk_get(next_trace_chunk);
852
853 assert(reference_acquired);
854 }
855 LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
856
857 DBG("Setting pending rotation: stream_id = %" PRIu64
858 ", rotate_at_packet_seq_num = %" PRIu64,
859 stream->stream_handle, rotation_sequence_number);
860 if (stream->is_metadata) {
861 /*
862 * A metadata stream has no index; consider it already rotated.
863 */
864 stream->ongoing_rotation.value.index_rotated = true;
865 if (next_trace_chunk) {
866 /*
867 * The metadata will be received again in the new chunk.
868 */
869 stream->metadata_received = 0;
870 }
871 ret = stream_rotate_data_file(stream);
872 } else {
873 ret = try_rotate_stream_index(stream);
874 if (ret < 0) {
875 goto end;
876 }
877
878 ret = try_rotate_stream_data(stream);
879 if (ret < 0) {
880 goto end;
881 }
882 }
883 end:
884 return ret;
885 }
886
887 void try_stream_close(struct relay_stream *stream)
888 {
889 bool session_aborted;
890 struct relay_session *session = stream->trace->session;
891
892 DBG("Trying to close stream %" PRIu64, stream->stream_handle);
893
894 pthread_mutex_lock(&session->lock);
895 session_aborted = session->aborted;
896 pthread_mutex_unlock(&session->lock);
897
898 pthread_mutex_lock(&stream->lock);
899 /*
900 * Can be called concurently by connection close and reception of last
901 * pending data.
902 */
903 if (stream->closed) {
904 pthread_mutex_unlock(&stream->lock);
905 DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle);
906 return;
907 }
908
909 stream->close_requested = true;
910
911 if (stream->last_net_seq_num == -1ULL) {
912 /*
913 * Handle connection close without explicit stream close
914 * command.
915 *
916 * We can be clever about indexes partially received in
917 * cases where we received the data socket part, but not
918 * the control socket part: since we're currently closing
919 * the stream on behalf of the control socket, we *know*
920 * there won't be any more control information for this
921 * socket. Therefore, we can destroy all indexes for
922 * which we have received only the file descriptor (from
923 * data socket). This takes care of consumerd crashes
924 * between sending the data and control information for
925 * a packet. Since those are sent in that order, we take
926 * care of consumerd crashes.
927 */
928 DBG("relay_index_close_partial_fd");
929 relay_index_close_partial_fd(stream);
930 /*
931 * Use the highest net_seq_num we currently have pending
932 * As end of stream indicator. Leave last_net_seq_num
933 * at -1ULL if we cannot find any index.
934 */
935 stream->last_net_seq_num = relay_index_find_last(stream);
936 DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num);
937 /* Fall-through into the next check. */
938 }
939
940 if (stream->last_net_seq_num != -1ULL &&
941 ((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) < 0
942 && !session_aborted) {
943 /*
944 * Don't close since we still have data pending. This
945 * handles cases where an explicit close command has
946 * been received for this stream, and cases where the
947 * connection has been closed, and we are awaiting for
948 * index information from the data socket. It is
949 * therefore expected that all the index fd information
950 * we need has already been received on the control
951 * socket. Matching index information from data socket
952 * should be Expected Soon(TM).
953 *
954 * TODO: We should implement a timer to garbage collect
955 * streams after a timeout to be resilient against a
956 * consumerd implementation that would not match this
957 * expected behavior.
958 */
959 pthread_mutex_unlock(&stream->lock);
960 DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
961 return;
962 }
963 /*
964 * We received all the indexes we can expect.
965 */
966 stream_unpublish(stream);
967 stream->closed = true;
968 /* Relay indexes are only used by the "consumer/sessiond" end. */
969 relay_index_close_all(stream);
970
971 /*
972 * If we are closed by an application exiting (per-pid buffers),
973 * we need to put our reference on the stream trace chunk right
974 * away, because otherwise still holding the reference on the
975 * trace chunk could allow a viewer stream (which holds a reference
976 * to the stream) to postpone destroy waiting for the chunk to cease
977 * to exist endlessly until the viewer is detached.
978 */
979
980 /* Put stream fd before put chunk. */
981 if (stream->file) {
982 fs_handle_close(stream->file);
983 stream->file = NULL;
984 }
985 if (stream->index_file) {
986 lttng_index_file_put(stream->index_file);
987 stream->index_file = NULL;
988 }
989 lttng_trace_chunk_put(stream->trace_chunk);
990 stream->trace_chunk = NULL;
991 pthread_mutex_unlock(&stream->lock);
992 DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
993 stream_put(stream);
994 }
995
996 int stream_init_packet(struct relay_stream *stream, size_t packet_size,
997 bool *file_rotated)
998 {
999 int ret = 0;
1000
1001 ASSERT_LOCKED(stream->lock);
1002
1003 if (!stream->file || !stream->trace_chunk) {
1004 ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
1005 stream->stream_handle, stream->channel_name);
1006 ret = -1;
1007 goto end;
1008 }
1009
1010 if (caa_likely(stream->tracefile_size == 0)) {
1011 /* No size limit set; nothing to check. */
1012 goto end;
1013 }
1014
1015 /*
1016 * Check if writing the new packet would exceed the maximal file size.
1017 */
1018 if (caa_unlikely((stream->tracefile_size_current + packet_size) >
1019 stream->tracefile_size)) {
1020 const uint64_t new_file_index =
1021 (stream->tracefile_current_index + 1) %
1022 stream->tracefile_count;
1023
1024 if (new_file_index < stream->tracefile_current_index) {
1025 stream->tracefile_wrapped_around = true;
1026 }
1027 DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
1028 ", current_file_size = %" PRIu64
1029 ", packet_size = %zu, current_file_index = %" PRIu64
1030 " new_file_index = %" PRIu64,
1031 stream->stream_handle,
1032 stream->tracefile_size_current, packet_size,
1033 stream->tracefile_current_index, new_file_index);
1034 tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
1035 stream->tracefile_current_index = new_file_index;
1036
1037 if (stream->file) {
1038 fs_handle_close(stream->file);
1039 stream->file = NULL;
1040 }
1041 ret = stream_create_data_output_file_from_trace_chunk(stream,
1042 stream->trace_chunk, false, &stream->file);
1043 if (ret) {
1044 ERR("Failed to perform trace file rotation of stream %" PRIu64,
1045 stream->stream_handle);
1046 goto end;
1047 }
1048
1049 /*
1050 * Reset current size because we just performed a stream
1051 * rotation.
1052 */
1053 DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
1054 __func__, stream->stream_handle, stream->tracefile_size_current);
1055 stream->tracefile_size_current = 0;
1056 *file_rotated = true;
1057 } else {
1058 *file_rotated = false;
1059 }
1060 end:
1061 return ret;
1062 }
1063
1064 /* Note that the packet is not necessarily complete. */
1065 int stream_write(struct relay_stream *stream,
1066 const struct lttng_buffer_view *packet, size_t padding_len)
1067 {
1068 int ret = 0;
1069 ssize_t write_ret;
1070 size_t padding_to_write = padding_len;
1071 char padding_buffer[FILE_IO_STACK_BUFFER_SIZE];
1072
1073 ASSERT_LOCKED(stream->lock);
1074 memset(padding_buffer, 0,
1075 min(sizeof(padding_buffer), padding_to_write));
1076
1077 if (!stream->file || !stream->trace_chunk) {
1078 ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
1079 stream->stream_handle, stream->channel_name);
1080 ret = -1;
1081 goto end;
1082 }
1083 if (packet) {
1084 write_ret = fs_handle_write(
1085 stream->file, packet->data, packet->size);
1086 if (write_ret != packet->size) {
1087 PERROR("Failed to write to stream file of %sstream %" PRIu64,
1088 stream->is_metadata ? "metadata " : "",
1089 stream->stream_handle);
1090 ret = -1;
1091 goto end;
1092 }
1093 }
1094
1095 while (padding_to_write > 0) {
1096 const size_t padding_to_write_this_pass =
1097 min(padding_to_write, sizeof(padding_buffer));
1098
1099 write_ret = fs_handle_write(stream->file, padding_buffer,
1100 padding_to_write_this_pass);
1101 if (write_ret != padding_to_write_this_pass) {
1102 PERROR("Failed to write padding to file of %sstream %" PRIu64,
1103 stream->is_metadata ? "metadata " : "",
1104 stream->stream_handle);
1105 ret = -1;
1106 goto end;
1107 }
1108 padding_to_write -= padding_to_write_this_pass;
1109 }
1110
1111 if (stream->is_metadata) {
1112 size_t recv_len;
1113
1114 recv_len = packet ? packet->size : 0;
1115 recv_len += padding_len;
1116 stream->metadata_received += recv_len;
1117 if (recv_len) {
1118 stream->no_new_metadata_notified = false;
1119 }
1120 }
1121
1122 DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
1123 stream->is_metadata ? "metadata " : "",
1124 stream->stream_handle,
1125 packet ? packet->size : (size_t) 0, padding_len);
1126 end:
1127 return ret;
1128 }
1129
1130 /*
1131 * Update index after receiving a packet for a data stream.
1132 *
1133 * Called with the stream lock held.
1134 *
1135 * Return 0 on success else a negative value.
1136 */
1137 int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
1138 bool rotate_index, bool *flushed, uint64_t total_size)
1139 {
1140 int ret = 0;
1141 uint64_t data_offset;
1142 struct relay_index *index;
1143
1144 assert(stream->trace_chunk);
1145 ASSERT_LOCKED(stream->lock);
1146 /* Get data offset because we are about to update the index. */
1147 data_offset = htobe64(stream->tracefile_size_current);
1148
1149 DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
1150 stream->stream_handle, net_seq_num, stream->tracefile_size_current);
1151
1152 /*
1153 * Lookup for an existing index for that stream id/sequence
1154 * number. If it exists, the control thread has already received the
1155 * data for it, thus we need to write it to disk.
1156 */
1157 index = relay_index_get_by_id_or_create(stream, net_seq_num);
1158 if (!index) {
1159 ret = -1;
1160 goto end;
1161 }
1162
1163 if (rotate_index || !stream->index_file) {
1164 ret = create_index_file(stream, stream->trace_chunk);
1165 if (ret) {
1166 ERR("Failed to create index file for stream %" PRIu64,
1167 stream->stream_handle);
1168 /* Put self-ref for this index due to error. */
1169 relay_index_put(index);
1170 index = NULL;
1171 goto end;
1172 }
1173 }
1174
1175 if (relay_index_set_file(index, stream->index_file, data_offset)) {
1176 ret = -1;
1177 /* Put self-ref for this index due to error. */
1178 relay_index_put(index);
1179 index = NULL;
1180 goto end;
1181 }
1182
1183 ret = relay_index_try_flush(index);
1184 if (ret == 0) {
1185 tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
1186 tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
1187 stream->index_received_seqcount++;
1188 LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
1189 be64toh(index->index_data.packet_seq_num));
1190 *flushed = true;
1191 } else if (ret > 0) {
1192 index->total_size = total_size;
1193 /* No flush. */
1194 ret = 0;
1195 } else {
1196 /*
1197 * ret < 0
1198 *
1199 * relay_index_try_flush is responsible for the self-reference
1200 * put of the index object on error.
1201 */
1202 ERR("relay_index_try_flush error %d", ret);
1203 ret = -1;
1204 }
1205 end:
1206 return ret;
1207 }
1208
1209 int stream_complete_packet(struct relay_stream *stream, size_t packet_total_size,
1210 uint64_t sequence_number, bool index_flushed)
1211 {
1212 int ret = 0;
1213
1214 ASSERT_LOCKED(stream->lock);
1215
1216 stream->tracefile_size_current += packet_total_size;
1217 if (index_flushed) {
1218 stream->pos_after_last_complete_data_index =
1219 stream->tracefile_size_current;
1220 stream->prev_index_seq = sequence_number;
1221 ret = try_rotate_stream_index(stream);
1222 if (ret < 0) {
1223 goto end;
1224 }
1225 }
1226
1227 stream->prev_data_seq = sequence_number;
1228 ret = try_rotate_stream_data(stream);
1229
1230 end:
1231 return ret;
1232 }
1233
1234 int stream_add_index(struct relay_stream *stream,
1235 const struct lttcomm_relayd_index *index_info)
1236 {
1237 int ret = 0;
1238 struct relay_index *index;
1239
1240 ASSERT_LOCKED(stream->lock);
1241
1242 DBG("stream_add_index for stream %" PRIu64, stream->stream_handle);
1243
1244 /* Live beacon handling */
1245 if (index_info->packet_size == 0) {
1246 DBG("Received live beacon for stream %" PRIu64,
1247 stream->stream_handle);
1248
1249 /*
1250 * Only flag a stream inactive when it has already
1251 * received data and no indexes are in flight.
1252 */
1253 if (stream->index_received_seqcount > 0
1254 && stream->indexes_in_flight == 0) {
1255 stream->beacon_ts_end = index_info->timestamp_end;
1256 }
1257 ret = 0;
1258 goto end;
1259 } else {
1260 stream->beacon_ts_end = -1ULL;
1261 }
1262
1263 if (stream->ctf_stream_id == -1ULL) {
1264 stream->ctf_stream_id = index_info->stream_id;
1265 }
1266
1267 index = relay_index_get_by_id_or_create(stream, index_info->net_seq_num);
1268 if (!index) {
1269 ret = -1;
1270 ERR("Failed to get or create index %" PRIu64,
1271 index_info->net_seq_num);
1272 goto end;
1273 }
1274 if (relay_index_set_control_data(index, index_info,
1275 stream->trace->session->minor)) {
1276 ERR("set_index_control_data error");
1277 relay_index_put(index);
1278 ret = -1;
1279 goto end;
1280 }
1281 ret = relay_index_try_flush(index);
1282 if (ret == 0) {
1283 tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
1284 tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
1285 stream->index_received_seqcount++;
1286 stream->pos_after_last_complete_data_index += index->total_size;
1287 stream->prev_index_seq = index_info->net_seq_num;
1288 LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
1289 index_info->packet_seq_num);
1290
1291 ret = try_rotate_stream_index(stream);
1292 if (ret < 0) {
1293 goto end;
1294 }
1295 ret = try_rotate_stream_data(stream);
1296 if (ret < 0) {
1297 goto end;
1298 }
1299 } else if (ret > 0) {
1300 /* no flush. */
1301 ret = 0;
1302 } else {
1303 /*
1304 * ret < 0
1305 *
1306 * relay_index_try_flush is responsible for the self-reference
1307 * put of the index object on error.
1308 */
1309 ERR("relay_index_try_flush error %d", ret);
1310 ret = -1;
1311 }
1312 end:
1313 return ret;
1314 }
1315
1316 static void print_stream_indexes(struct relay_stream *stream)
1317 {
1318 struct lttng_ht_iter iter;
1319 struct relay_index *index;
1320
1321 rcu_read_lock();
1322 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index,
1323 index_n.node) {
1324 DBG("index %p net_seq_num %" PRIu64 " refcount %ld"
1325 " stream %" PRIu64 " trace %" PRIu64
1326 " session %" PRIu64,
1327 index,
1328 index->index_n.key,
1329 stream->ref.refcount,
1330 index->stream->stream_handle,
1331 index->stream->trace->id,
1332 index->stream->trace->session->id);
1333 }
1334 rcu_read_unlock();
1335 }
1336
1337 int stream_reset_file(struct relay_stream *stream)
1338 {
1339 ASSERT_LOCKED(stream->lock);
1340
1341 if (stream->file) {
1342 int ret;
1343
1344 ret = fs_handle_close(stream->file);
1345 if (ret) {
1346 ERR("Failed to close stream file handle: channel name = \"%s\", id = %" PRIu64,
1347 stream->channel_name,
1348 stream->stream_handle);
1349 }
1350 stream->file = NULL;
1351 }
1352
1353 DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
1354 __func__, stream->stream_handle, stream->tracefile_size_current);
1355 stream->tracefile_size_current = 0;
1356 stream->prev_data_seq = 0;
1357 stream->prev_index_seq = 0;
1358 /* Note that this does not reset the tracefile array. */
1359 stream->tracefile_current_index = 0;
1360 stream->pos_after_last_complete_data_index = 0;
1361
1362 return stream_create_data_output_file_from_trace_chunk(stream,
1363 stream->trace_chunk, true, &stream->file);
1364 }
1365
1366 void print_relay_streams(void)
1367 {
1368 struct lttng_ht_iter iter;
1369 struct relay_stream *stream;
1370
1371 if (!relay_streams_ht) {
1372 return;
1373 }
1374
1375 rcu_read_lock();
1376 cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
1377 node.node) {
1378 if (!stream_get(stream)) {
1379 continue;
1380 }
1381 DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
1382 " session %" PRIu64,
1383 stream,
1384 stream->ref.refcount,
1385 stream->stream_handle,
1386 stream->trace->id,
1387 stream->trace->session->id);
1388 print_stream_indexes(stream);
1389 stream_put(stream);
1390 }
1391 rcu_read_unlock();
1392 }
This page took 0.096358 seconds and 5 git commands to generate.