stream->next_net_seq_num - 1);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- DBG("Unable to close stream on the relayd. Continuing");
- /*
- * Continue here. There is nothing we can do for the relayd.
- * Chances are that the relayd has closed the socket so we just
- * continue cleaning up.
- */
+ ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
}
/* Both conditions are met, we destroy the relayd. */
stream->index_file = NULL;
}
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
+
/* Check and cleanup relayd if needed. */
rcu_read_lock();
relayd = consumer_find_relayd(stream->net_seq_idx);
}
/* Free stream within a RCU call. */
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
consumer_stream_free(stream);
}
struct ctf_packet_index *element)
{
int ret;
- struct consumer_relayd_sock_pair *relayd;
assert(stream);
assert(element);
rcu_read_lock();
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd) {
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_send_index(&relayd->control_sock, element,
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
+ struct consumer_relayd_sock_pair *relayd;
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_send_index(&relayd->control_sock, element,
stream->relayd_stream_id, stream->next_net_seq_num - 1);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ /*
+ * Communication error with lttng-relayd,
+ * perform cleanup now
+ */
+ ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ ret = -1;
+ }
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ } else {
+ ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.",
+ stream->key, stream->net_seq_idx);
+ ret = -1;
+ }
} else {
if (lttng_index_file_write(stream->index_file, element)) {
ret = -1;
rcu_read_unlock();
return ret;
}
+
+int consumer_stream_create_output_files(struct lttng_consumer_stream *stream,
+ bool create_index)
+{
+ int ret;
+ enum lttng_trace_chunk_status chunk_status;
+ const int flags = O_WRONLY | O_CREAT | O_TRUNC;
+ const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+ char stream_path[LTTNG_PATH_MAX];
+
+ ASSERT_LOCKED(stream->lock);
+ assert(stream->trace_chunk);
+
+ ret = utils_stream_file_path(stream->chan->pathname, stream->name,
+ stream->chan->tracefile_size,
+ stream->chan->tracefile_count, NULL,
+ stream_path, sizeof(stream_path));
+ if (ret < 0) {
+ goto end;
+ }
+
+ if (stream->out_fd >= 0) {
+ ret = close(stream->out_fd);
+ if (ret < 0) {
+ PERROR("Failed to close stream file \"%s\"",
+ stream->name);
+ goto end;
+ }
+ stream->out_fd = -1;
+ }
+
+ DBG("Opening stream output file \"%s\"", stream_path);
+ chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path,
+ flags, mode, &stream->out_fd);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to open stream file \"%s\"", stream->name);
+ ret = -1;
+ goto end;
+ }
+
+ if (!stream->metadata_flag && (create_index || stream->index_file)) {
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ }
+ stream->index_file = lttng_index_file_create_from_trace_chunk(
+ stream->trace_chunk,
+ stream->chan->pathname,
+ stream->name,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current,
+ CTF_INDEX_MAJOR, CTF_INDEX_MINOR,
+ false);
+ if (!stream->index_file) {
+ ret = -1;
+ goto end;
+ }
+ }
+
+ /* Reset current size because we just perform a rotation. */
+ stream->tracefile_size_current = 0;
+ stream->out_fd_offset = 0;
+end:
+ return ret;
+}
+
+int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ stream->tracefile_count_current++;
+ if (stream->chan->tracefile_count > 0) {
+ stream->tracefile_count_current %=
+ stream->chan->tracefile_count;
+ }
+
+ DBG("Rotating output files of stream \"%s\"", stream->name);
+ ret = consumer_stream_create_output_files(stream, true);
+ if (ret) {
+ goto end;
+ }
+
+end:
+ return ret;
+}