extern struct lttng_consumer_global_data consumer_data;
extern int consumer_poll_timeout;
-extern volatile int consumer_quit;
/*
* Take a snapshot for a specific fd
}
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
- /* Are we at a position _before_ the first available packet ? */
- bool before_first_packet = true;
unsigned long consumed_pos, produced_pos;
health_code_update();
DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")",
path, stream->name, stream->key);
}
- if (relayd_id != -1ULL) {
- ret = consumer_send_relayd_streams_sent(relayd_id);
+
+ ret = kernctl_buffer_flush_empty(stream->wait_fd);
+ if (ret < 0) {
+ /*
+ * Doing a buffer flush which does not take into
+ * account empty packets. This is not perfect
+ * for stream intersection, but required as a
+ * fall-back when "flush_empty" is not
+ * implemented by lttng-modules.
+ */
+ ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
- ERR("sending streams sent to relayd");
+ ERR("Failed to flush kernel stream");
goto end_unlock;
}
- }
-
- ret = kernctl_buffer_flush(stream->wait_fd);
- if (ret < 0) {
- ERR("Failed to flush kernel stream");
goto end_unlock;
}
while (consumed_pos < produced_pos) {
ssize_t read_len;
unsigned long len, padded_len;
- int lost_packet = 0;
health_code_update();
}
DBG("Kernel consumer get subbuf failed. Skipping it.");
consumed_pos += stream->max_sb_size;
-
- /*
- * Start accounting lost packets only when we
- * already have extracted packets (to match the
- * content of the final snapshot).
- */
- if (!before_first_packet) {
- lost_packet = 1;
- }
+ stream->chan->lost_packets++;
continue;
}
goto end_unlock;
}
consumed_pos += stream->max_sb_size;
-
- /*
- * Only account lost packets located between
- * succesfully extracted packets (do not account before
- * and after since they are not visible in the
- * resulting snapshot).
- */
- stream->chan->lost_packets += lost_packet;
- lost_packet = 0;
- before_first_packet = false;
}
if (relayd_id == (uint64_t) -1ULL) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
/* Session daemon status message are handled in the following call. */
- ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
+ consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
&msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
- msg.u.relayd_sock.relayd_session_id);
+ msg.u.relayd_sock.relayd_session_id);
goto end_nosignal;
}
case LTTNG_CONSUMER_ADD_CHANNEL:
consumer_stream_free(new_stream);
goto end_nosignal;
}
+
+ /*
+ * If adding an extra stream to an already
+ * existing channel (e.g. cpu hotplug), we need
+ * to send the "streams_sent" command to relayd.
+ */
+ if (channel->streams_sent_to_relayd) {
+ ret = consumer_send_relayd_streams_sent(
+ new_stream->net_seq_idx);
+ if (ret < 0) {
+ goto end_nosignal;
+ }
+ }
}
/* Get the right pipe where the stream will be sent. */
if (ret < 0) {
goto end_nosignal;
}
+ channel->streams_sent_to_relayd = true;
}
break;
}
}
case LTTNG_CONSUMER_DISCARDED_EVENTS:
{
- uint64_t ret;
+ ssize_t ret;
+ uint64_t count;
struct lttng_consumer_channel *channel;
uint64_t id = msg.u.discarded_events.session_id;
uint64_t key = msg.u.discarded_events.channel_key;
if (!channel) {
ERR("Kernel consumer discarded events channel %"
PRIu64 " not found", key);
- ret = 0;
+ count = 0;
} else {
- ret = channel->discarded_events;
+ count = channel->discarded_events;
}
health_code_update();
/* Send back returned value to session daemon */
- ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
if (ret < 0) {
PERROR("send discarded events");
goto error_fatal;
}
case LTTNG_CONSUMER_LOST_PACKETS:
{
- uint64_t ret;
+ ssize_t ret;
+ uint64_t count;
struct lttng_consumer_channel *channel;
uint64_t id = msg.u.lost_packets.session_id;
uint64_t key = msg.u.lost_packets.channel_key;
if (!channel) {
ERR("Kernel consumer lost packets channel %"
PRIu64 " not found", key);
- ret = 0;
+ count = 0;
} else {
- ret = channel->lost_packets;
+ count = channel->lost_packets;
}
health_code_update();
/* Send back returned value to session daemon */
- ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
if (ret < 0) {
PERROR("send lost packets");
goto error_fatal;
if (ret == -ENOTTY) {
/* Command not implemented by lttng-modules. */
index->stream_instance_id = -1ULL;
- ret = 0;
} else {
PERROR("kernctl_get_instance_id");
goto error;
if (ret == -ENOTTY) {
/* Command not implemented by lttng-modules. */
seq = -1ULL;
- ret = 0;
} else {
PERROR("kernctl_get_sequence_number");
goto end;