(*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
(*pollfd)[i + 1].events = POLLIN | POLLPRI;
+
+ (*pollfd)[i + 2].fd = lttng_pipe_get_readfd(ctx->consumer_data_rotate_pipe);
+ (*pollfd)[i + 2].events = POLLIN | POLLPRI;
return i;
}
goto error_wakeup_pipe;
}
+ ctx->consumer_data_rotate_pipe = lttng_pipe_open(0);
+ if (!ctx->consumer_data_rotate_pipe) {
+ goto error_data_rotate_pipe;
+ }
+
+ ctx->consumer_metadata_rotate_pipe = lttng_pipe_open(0);
+ if (!ctx->consumer_metadata_rotate_pipe) {
+ goto error_metadata_rotate_pipe;
+ }
+
ret = pipe(ctx->consumer_should_quit);
if (ret < 0) {
PERROR("Error creating recv pipe");
error_channel_pipe:
utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
+ lttng_pipe_destroy(ctx->consumer_metadata_rotate_pipe);
+error_metadata_rotate_pipe:
+ lttng_pipe_destroy(ctx->consumer_data_rotate_pipe);
+error_data_rotate_pipe:
lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
error_wakeup_pipe:
lttng_pipe_destroy(ctx->consumer_data_pipe);
lttng_pipe_destroy(ctx->consumer_data_pipe);
lttng_pipe_destroy(ctx->consumer_metadata_pipe);
lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
+ lttng_pipe_destroy(ctx->consumer_data_rotate_pipe);
+ lttng_pipe_destroy(ctx->consumer_metadata_rotate_pipe);
utils_close_pipe(ctx->consumer_should_quit);
unlink(ctx->consumer_command_sock_path);
rcu_read_unlock();
}
+static
+int handle_rotate_wakeup_pipe(struct lttng_consumer_local_data *ctx,
+ struct lttng_pipe *stream_pipe)
+{
+ int ret;
+ ssize_t pipe_len;
+ struct lttng_consumer_stream *stream;
+
+ pipe_len = lttng_pipe_read(stream_pipe, &stream, sizeof(stream));
+ if (pipe_len < sizeof(stream)) {
+ if (pipe_len < 0) {
+ PERROR("read metadata stream");
+ }
+ ERR("Failed to read stream on metadata rotate pipe");
+ ret = -1;
+ goto end;
+ }
+
+ fprintf(stderr, "Rotate wakeup pipe, stream %lu\n", stream->key);
+ ret = lttng_consumer_rotate_stream(ctx, stream);
+ if (ret < 0) {
+ ERR("Failed to rotate metadata stream");
+ goto end;
+ }
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
/*
* Thread polls on metadata file descriptor and write them on disk or on the
* network.
DBG("Thread metadata poll started");
/* Size is set to 1 for the consumer_metadata pipe */
- ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
+ ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
if (ret < 0) {
ERR("Poll set creation failed");
goto end_poll;
goto end;
}
+ ret = lttng_poll_add(&events,
+ lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe), LPOLLIN);
+ if (ret < 0) {
+ goto end;
+ }
+
/* Main loop */
DBG("Metadata main loop started");
/* Handle other stream */
continue;
+ } else if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe)) {
+ if (revents & LPOLLIN) {
+ ret = handle_rotate_wakeup_pipe(ctx,
+ ctx->consumer_metadata_rotate_pipe);
+ if (ret < 0) {
+ ERR("Failed to rotate metadata stream");
+ lttng_poll_del(&events,
+ lttng_pipe_get_readfd(
+ ctx->consumer_metadata_rotate_pipe));
+ lttng_pipe_read_close(
+ ctx->consumer_metadata_rotate_pipe);
+ goto end;
+ }
+ } else if (revents & (LPOLLERR | LPOLLHUP)) {
+ DBG("Metadata rotate pipe hung up");
+ fprintf(stderr, "Metadata rotate pipe hung up");
+ /*
+ * Remove the pipe from the poll set and continue the loop
+ * since their might be data to consume.
+ */
+ lttng_poll_del(&events,
+ lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe));
+ lttng_pipe_read_close(ctx->consumer_metadata_rotate_pipe);
+ continue;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto end;
+ }
+ continue;
}
rcu_read_lock();
local_stream = NULL;
/*
- * Allocate for all fds +1 for the consumer_data_pipe and +1 for
- * wake up pipe.
+ * Allocate for all fds and:
+ * +1 for the consumer_data_pipe
+ * +1 for wake up pipe
+ * +1 for consumer_data_rotate_pipe.
*/
- pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
+ pollfd = zmalloc((consumer_data.stream_count + 3) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- local_stream = zmalloc((consumer_data.stream_count + 2) *
+ local_stream = zmalloc((consumer_data.stream_count + 3) *
sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
ctx->has_wakeup = 0;
}
+ /* Handle consumer_data_rotate_pipe. */
+ if (pollfd[nb_fd + 2].revents & (POLLIN | POLLPRI)) {
+ fprintf(stderr, "data wakeup pipe\n");
+ ret = handle_rotate_wakeup_pipe(ctx,
+ ctx->consumer_data_rotate_pipe);
+ if (ret < 0) {
+ ERR("Failed to rotate metadata stream");
+ goto end;
+ }
+ }
+
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
health_code_update();
* the read side.
*/
(void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
+ (void) lttng_pipe_write_close(ctx->consumer_metadata_rotate_pipe);
error_testpoint:
if (err) {
{
int ret;
+ fprintf(stderr, "Notif send\n");
do {
ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
} while (ret == -1 && errno == EINTR);
DBG("Sent channel rotation notification for channel key %"
PRIu64, key);
}
+ fprintf(stderr, "Notif done\n");
return ret;
}
abort();
}
+ fprintf(stderr, "nr_pending: %lu\n", stream->chan->nr_stream_rotate_pending);
if (--stream->chan->nr_stream_rotate_pending == 0) {
ret = rotate_notify_sessiond(ctx, stream->chan->key);
}
pthread_cond_broadcast(&stream->metadata_rdv);
pthread_mutex_unlock(&stream->metadata_rdv_lock);
}
+ fprintf(stderr, "rotated: %d\n", stream->rotated);
pthread_mutex_unlock(&stream->lock);
rotate_ret = consumer_post_rotation(stream, ctx);
struct lttng_ht_iter iter;
struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
- DBG("Kernel consumer sample rotate position for channel %" PRIu64, key);
+ DBG("Consumer sample rotate position for channel %" PRIu64, key);
rcu_read_lock();
memcpy(stream->channel_ro_pathname, channel->pathname, PATH_MAX);
ret = lttng_consumer_sample_snapshot_positions(stream);
if (ret < 0) {
- ERR("Taking kernel snapshot positions");
+ ERR("Taking snapshot positions");
goto end_unlock;
}
ret = lttng_consumer_get_produced_snapshot(stream,
&stream->rotate_position);
if (ret < 0) {
- ERR("Produced kernel snapshot position");
+ ERR("Produced snapshot position");
goto end_unlock;
}
fprintf(stderr, "Stream %lu should rotate after %lu to %s\n",
fprintf(stderr, "Stream %lu ready to rotate to %s\n",
stream->key, channel->pathname);
}
+ fprintf(stderr, "before increasinc nr_pending: %lu\n", channel->nr_stream_rotate_pending);
channel->nr_stream_rotate_pending++;
ret = consumer_flush_buffer(stream, 1);
int ret;
struct lttng_consumer_channel *channel;
struct lttng_consumer_stream *stream;
+ struct lttng_pipe *stream_pipe;
struct lttng_ht_iter iter;
struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
ret = -1;
goto end;
}
+
+ if (channel->metadata_stream) {
+ fprintf(stderr, "M\n");
+ stream_pipe = ctx->consumer_metadata_rotate_pipe;
+ } else {
+ fprintf(stderr, "D\n");
+ stream_pipe = ctx->consumer_data_rotate_pipe;
+ }
+
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct, &channel->key, &iter.iter,
stream, node_channel_id.node) {
health_code_update();
- /*
- * Lock stream because we are about to change its state.
- */
- pthread_mutex_lock(&stream->lock);
if (stream->rotate_ready == 0) {
- pthread_mutex_unlock(&stream->lock);
continue;
}
- ret = lttng_consumer_rotate_stream(ctx, stream);
- if (ret < 0) {
- pthread_mutex_unlock(&stream->lock);
- ERR("Stream rotation error");
- goto end;
- }
-
- pthread_mutex_unlock(&stream->lock);
- ret = consumer_post_rotation(stream, ctx);
+ fprintf(stderr, "send stream %lu on wakeup pipe\n", stream->key);
+ ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
if (ret < 0) {
- ERR("Failed after a rotation");
+ ERR("Failed to wakeup consumer rotate pipe");
goto end;
}
+ fprintf(stderr, "done sending stream %lu on wakeup pipe\n", stream->key);
}
ret = 0;