{
int ret;
- fprintf(stderr, "Notif send\n");
do {
ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
} while (ret == -1 && errno == EINTR);
DBG("Sent channel rotation notification for channel key %"
PRIu64, key);
}
- fprintf(stderr, "Notif done\n");
return ret;
}
abort();
}
- fprintf(stderr, "nr_pending: %lu\n", stream->chan->nr_stream_rotate_pending);
if (--stream->chan->nr_stream_rotate_pending == 0) {
ret = rotate_notify_sessiond(ctx, stream->chan->key);
}
}
pthread_mutex_lock(&stream->lock);
- fprintf(stderr, "Rotate wakeup pipe, stream %lu\n", stream->key);
ret = lttng_consumer_rotate_stream(ctx, stream);
pthread_mutex_unlock(&stream->lock);
if (ret < 0) {
}
} else if (revents & (LPOLLERR | LPOLLHUP)) {
DBG("Metadata rotate pipe hung up");
- fprintf(stderr, "Metadata rotate pipe hung up");
/*
* Remove the pipe from the poll set and continue the loop
* since their might be data to consume.
/* Handle consumer_data_rotate_pipe. */
if (pollfd[nb_fd + 2].revents & (POLLIN | POLLPRI)) {
- fprintf(stderr, "data wakeup pipe\n");
ret = handle_rotate_wakeup_pipe(ctx,
ctx->consumer_data_rotate_pipe);
if (ret < 0) {
pthread_cond_broadcast(&stream->metadata_rdv);
pthread_mutex_unlock(&stream->metadata_rdv_lock);
}
- fprintf(stderr, "rotated: %d\n", stream->rotated);
pthread_mutex_unlock(&stream->lock);
rotate_ret = consumer_post_rotation(stream, ctx);
ERR("Produced snapshot position");
goto end_unlock;
}
- fprintf(stderr, "Stream %lu should rotate after %lu to %s\n",
- stream->key, stream->rotate_position,
- channel->pathname);
lttng_consumer_get_consumed_snapshot(stream,
&consumed_pos);
- fprintf(stderr, "consumed %lu\n", consumed_pos);
if (consumed_pos == stream->rotate_position) {
stream->rotate_ready = 1;
- fprintf(stderr, "Stream %lu ready to rotate to %s\n",
- stream->key, channel->pathname);
}
- fprintf(stderr, "before increasinc nr_pending: %lu\n", channel->nr_stream_rotate_pending);
channel->nr_stream_rotate_pending++;
ret = consumer_flush_buffer(stream, 1);
}
if (stream->rotate_ready) {
- fprintf(stderr, "Rotate position reached for stream %lu\n",
- stream->key);
ret = 1;
goto end;
}
goto end;
}
- fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos);
/* Rotate position not reached yet. */
if ((long) (consumed_pos - stream->rotate_position) < 0) {
ret = 0;
goto end;
}
- fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n",
- consumed_pos, stream->rotate_position,
- stream->key);
ret = 1;
end:
goto error;
}
- fprintf(stderr, "Rotating stream %lu to %s/%s\n", stream->key,
- stream->channel_ro_pathname, stream->name);
ret = utils_create_stream_file(stream->channel_ro_pathname, stream->name,
stream->channel_ro_tracefile_size, stream->tracefile_count_current,
stream->uid, stream->gid, NULL);
}
if (channel->metadata_stream) {
- fprintf(stderr, "M\n");
stream_pipe = ctx->consumer_metadata_rotate_pipe;
} else {
- fprintf(stderr, "D\n");
stream_pipe = ctx->consumer_data_rotate_pipe;
}
if (stream->rotate_ready == 0) {
continue;
}
- fprintf(stderr, "send stream %lu on wakeup pipe\n", stream->key);
ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
if (ret < 0) {
ERR("Failed to wakeup consumer rotate pipe");
goto end;
}
- fprintf(stderr, "done sending stream %lu on wakeup pipe\n", stream->key);
}
ret = 0;