static struct lttng_ht *metadata_ht;
static struct lttng_ht *data_ht;
+/*
+ * This hash table contains the mapping between the session id of the sessiond
+ * and the relayd session id. Element of the ht are indexed by sessiond session
+ * id.
+ *
+ * Node can be added when a relayd communication is opened in the sessiond
+ * thread.
+ *
+ * Note that a session id of the session daemon is unique to a tracing session
+ * and not to a domain session. However, a domain session has one consumer
+ * which forces the 1-1 mapping between a consumer and a domain session (ex:
+ * UST). This means that we can't have duplicate in this ht.
+ */
+static struct lttng_ht *relayd_session_id_ht;
+
/*
* Notify a thread pipe to poll back again. This usually means that some global
* state has changed so we just send back the thread in a poll wait call.
rcu_read_unlock();
}
+/*
+ * Return a channel object for the given key.
+ *
+ * RCU read side lock MUST be acquired before calling this function and
+ * protects the channel ptr.
+ */
static struct lttng_consumer_channel *consumer_find_channel(int key)
{
struct lttng_ht_iter iter;
return NULL;
}
- rcu_read_lock();
-
lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
channel = caa_container_of(node, struct lttng_consumer_channel, node);
}
- rcu_read_unlock();
-
return channel;
}
{
int ret;
struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
if (relayd == NULL) {
return;
DBG("Consumer destroy and close relayd socket pair");
+ /* Loockup for a relayd node in the session id map hash table. */
+ lttng_ht_lookup(relayd_session_id_ht,
+ (void *)((unsigned long) relayd->sessiond_session_id), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node == NULL) {
+ /* We assume the relayd is being or is destroyed */
+ return;
+ }
+
+ /*
+ * Try to delete it from the relayd session id ht. The return value is of
+ * no importance since either way we are going to try to delete the relayd
+ * from the global relayd_ht.
+ */
+ lttng_ht_del(relayd_session_id_ht, &iter);
+
iter.iter.node = &relayd->node.node;
ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
if (ret != 0) {
call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
}
+/*
+ * Iterate over the relayd hash table and destroy each element. Finally,
+ * destroy the whole hash table.
+ */
+static void cleanup_relayd_ht(void)
+{
+ struct lttng_ht_iter iter;
+ struct consumer_relayd_sock_pair *relayd;
+
+ rcu_read_lock();
+
+ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
+ node.node) {
+ destroy_relayd(relayd);
+ }
+
+ lttng_ht_destroy(consumer_data.relayd_ht);
+ /* The destroy_relayd call makes sure that this ht is empty here. */
+ lttng_ht_destroy(relayd_session_id_ht);
+
+ rcu_read_unlock();
+}
+
/*
* Update the end point status of all streams having the given network sequence
* index (relayd index).
goto free_stream;
}
- pthread_mutex_lock(&stream->lock);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
end:
consumer_data.need_update = 1;
- pthread_mutex_unlock(&consumer_data.lock);
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
if (free_chan) {
consumer_del_channel(free_chan);
goto end;
}
+ rcu_read_lock();
+
/*
* Get stream's channel reference. Needed when adding the stream to the
* global hash table.
stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
(unsigned long long) stream->mmap_len, stream->out_fd,
stream->net_seq_idx, stream->session_id);
+
+ rcu_read_unlock();
return stream;
error:
+ rcu_read_unlock();
free(stream);
end:
return NULL;
DBG3("Adding consumer stream %d", stream->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->lock);
rcu_read_lock();
/* Steal stream identifier to avoid having streams with the same key */
consumer_data.need_update = 1;
rcu_read_unlock();
+ pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
int ret;
struct lttng_ht_iter iter;
+ DBG("Consumer delete channel key %d", channel->key);
+
pthread_mutex_lock(&consumer_data.lock);
switch (consumer_data.type) {
* changed where this function will be called back again.
*/
if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
- stream->endpoint_status) {
+ stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
continue;
}
DBG("Active FD %d", stream->wait_fd);
}
/*
- * Close all the tracefiles and stream fds, should be called when all instances
- * are destroyed.
+ * Close all the tracefiles and stream fds and MUST be called when all
+ * instances are destroyed i.e. when all threads were joined and are ended.
*/
void lttng_consumer_cleanup(void)
{
rcu_read_unlock();
lttng_ht_destroy(consumer_data.channel_ht);
+
+ cleanup_relayd_ht();
+
+ /*
+ * This HT contains streams that are freed by either the metadata thread or
+ * the data thread so we do *nothing* on the hash table and simply destroy
+ * it.
+ */
+ lttng_ht_destroy(consumer_data.stream_list_ht);
}
/*
* core function for writing trace buffers to either the local filesystem or
* the network.
*
+ * It must be called with the stream lock held.
+ *
* Careful review MUST be put if any changes occur!
*
* Returns the number of bytes written
/* RCU lock for the relayd pointer */
rcu_read_lock();
- pthread_mutex_lock(&stream->lock);
-
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- pthread_mutex_unlock(&stream->lock);
rcu_read_unlock();
return written;
/*
* Splice the data from the ring buffer to the tracefile.
*
+ * It must be called with the stream lock held.
+ *
* Returns the number of bytes spliced.
*/
ssize_t lttng_consumer_on_read_subbuffer_splice(
/* RCU lock for the relayd pointer */
rcu_read_lock();
- pthread_mutex_lock(&stream->lock);
-
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- pthread_mutex_unlock(&stream->lock);
rcu_read_unlock();
return written;
*/
static void destroy_data_stream_ht(struct lttng_ht *ht)
{
- int ret;
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
-
- call_rcu(&stream->node.head, consumer_free_stream);
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_stream(stream, ht);
}
rcu_read_unlock();
*/
static void destroy_stream_ht(struct lttng_ht *ht)
{
- int ret;
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
-
- call_rcu(&stream->node.head, consumer_free_stream);
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_metadata_stream(stream, ht);
}
rcu_read_unlock();
goto free_stream;
}
+ pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->lock);
- pthread_mutex_lock(&consumer_data.lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
if (stream->mmap_base != NULL) {
}
end:
- pthread_mutex_unlock(&consumer_data.lock);
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
if (free_chan) {
consumer_del_channel(free_chan);
{
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
assert(stream);
assert(ht);
DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->lock);
/*
* From here, refcounts are updated so be _careful_ when returning an error
*/
rcu_read_lock();
+
+ /*
+ * Lookup the stream just to make sure it does not exist in our internal
+ * state. This should NEVER happen.
+ */
+ lttng_ht_lookup(ht, (void *)((unsigned long) stream->wait_fd), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ assert(!node);
+
/* Find relayd and, if one is found, increment refcount. */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
uatomic_dec(&stream->chan->nb_init_streams);
}
- /* Steal stream identifier to avoid having streams with the same key */
- consumer_steal_stream_key(stream->key, ht);
-
lttng_ht_add_unique_ulong(ht, &stream->node);
/*
rcu_read_unlock();
+ pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
rcu_read_lock();
cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
- if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) {
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
continue;
}
/* Delete it right now */
rcu_read_lock();
cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
- if (!stream->endpoint_status) {
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
continue;
}
/*
rcu_register_thread();
+ metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ if (!metadata_ht) {
+ /* ENOMEM at this point. Better to bail out. */
+ goto error;
+ }
+
DBG("Thread metadata poll started");
/* Size is set to 1 for the consumer_metadata pipe */
DBG("Metadata main loop started");
while (1) {
- lttng_poll_reset(&events);
-
- nb_fd = LTTNG_POLL_GETNB(&events);
-
/* Only the metadata pipe is set */
- if (nb_fd == 0 && consumer_quit == 1) {
+ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
goto end;
}
restart:
- DBG("Metadata poll wait with %d fd(s)", nb_fd);
+ DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
ret = lttng_poll_wait(&events, -1);
DBG("Metadata event catched in thread");
if (ret < 0) {
goto error;
}
+ nb_fd = ret;
+
/* From here, the event is a metadata wait fd */
for (i = 0; i < nb_fd; i++) {
revents = LTTNG_POLL_GETEV(&events, i);
* since their might be data to consume.
*/
lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
- close(ctx->consumer_metadata_pipe[0]);
+ ret = close(ctx->consumer_metadata_pipe[0]);
+ if (ret < 0) {
+ PERROR("close metadata pipe");
+ }
continue;
} else if (revents & LPOLLIN) {
do {
DBG("Metadata poll thread exiting");
lttng_poll_clean(&events);
- if (metadata_ht) {
- destroy_stream_ht(metadata_ht);
- }
+ destroy_stream_ht(metadata_ht);
rcu_unregister_thread();
return NULL;
data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (data_ht == NULL) {
+ /* ENOMEM at this point. Better to bail out. */
goto end;
}
pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
sizeof(new_stream));
} while (pipe_readlen == -1 && errno == EINTR);
+ if (pipe_readlen < 0) {
+ PERROR("read consumer data pipe");
+ /* Continue so we can at least handle the current stream(s). */
+ continue;
+ }
/*
* If the stream is NULL, just ignore it. It's also possible that
* only tracked fd in the poll set. The thread will take care of closing
* the read side.
*/
- close(ctx->consumer_metadata_pipe[1]);
-
- if (data_ht) {
- destroy_data_stream_ht(data_ht);
+ ret = close(ctx->consumer_metadata_pipe[1]);
+ if (ret < 0) {
+ PERROR("close data pipe");
}
+ destroy_data_stream_ht(data_ht);
+
rcu_unregister_thread();
return NULL;
}
*/
void *consumer_thread_sessiond_poll(void *data)
{
- int sock, client_socket, ret;
+ int sock = -1, client_socket, ret;
/*
* structure to poll for incoming data on communication socket avoids
* making blocking sockets.
goto end;
}
+ /* This socket is not useful anymore. */
+ ret = close(client_socket);
+ if (ret < 0) {
+ PERROR("close client_socket");
+ }
+ client_socket = -1;
+
/* update the polling structure to poll on the established socket */
consumer_sockpoll[1].fd = sock;
consumer_sockpoll[1].events = POLLIN | POLLPRI;
*/
notify_thread_pipe(ctx->consumer_data_pipe[1]);
+ /* Cleaning up possibly open sockets. */
+ if (sock >= 0) {
+ ret = close(sock);
+ if (ret < 0) {
+ PERROR("close sock sessiond poll");
+ }
+ }
+ if (client_socket >= 0) {
+ ret = close(sock);
+ if (ret < 0) {
+ PERROR("close client_socket sessiond poll");
+ }
+ }
+
rcu_unregister_thread();
return NULL;
}
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
+ ssize_t ret;
+
+ pthread_mutex_lock(&stream->lock);
+
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- return lttng_kconsumer_read_subbuffer(stream, ctx);
+ ret = lttng_kconsumer_read_subbuffer(stream, ctx);
+ break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- return lttng_ustconsumer_read_subbuffer(stream, ctx);
+ ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
+ break;
default:
ERR("Unknown consumer_data type");
assert(0);
- return -ENOSYS;
+ ret = -ENOSYS;
+ break;
}
+
+ pthread_mutex_unlock(&stream->lock);
+ return ret;
}
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
-
- metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- assert(metadata_ht);
- data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- assert(data_ht);
+ relayd_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
}
/*
*/
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
+ struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
+ unsigned int sessiond_id)
{
- int fd, ret = -1;
+ int fd = -1, ret = -1, relayd_created = 0;
+ enum lttng_error_code ret_code = LTTNG_OK;
struct consumer_relayd_sock_pair *relayd;
+ struct consumer_relayd_session_id *relayd_id_node;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error;
+ }
+
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
goto error;
}
+ relayd->sessiond_session_id = (uint64_t) sessiond_id;
+ relayd_created = 1;
}
/* Poll on consumer socket. */
if (ret != sizeof(fd)) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
ret = -1;
+ fd = -1; /* Just in case it gets set with an invalid value. */
+ goto error;
+ }
+
+ /* We have the fds without error. Send status back. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
goto error;
}
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
ret = lttcomm_create_sock(&relayd->control_sock);
+ /* Immediately try to close the created socket if valid. */
+ if (relayd->control_sock.fd >= 0) {
+ if (close(relayd->control_sock.fd)) {
+ PERROR("close relayd control socket");
+ }
+ }
+ /* Handle create_sock error. */
if (ret < 0) {
goto error;
}
- /* Close the created socket fd which is useless */
- close(relayd->control_sock.fd);
-
/* Assign new file descriptor */
relayd->control_sock.fd = fd;
+
+ /*
+ * Create a session on the relayd and store the returned id. No need to
+ * grab the socket lock since the relayd object is not yet visible.
+ */
+ ret = relayd_create_session(&relayd->control_sock,
+ &relayd->relayd_session_id);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Set up a relayd session id node. */
+ relayd_id_node = zmalloc(sizeof(struct consumer_relayd_session_id));
+ if (!relayd_id_node) {
+ PERROR("zmalloc relayd id node");
+ goto error;
+ }
+
+ relayd_id_node->relayd_id = relayd->relayd_session_id;
+ relayd_id_node->sessiond_id = (uint64_t) sessiond_id;
+
+ /* Indexed by session id of the sessiond. */
+ lttng_ht_node_init_ulong(&relayd_id_node->node,
+ relayd_id_node->sessiond_id);
+ rcu_read_lock();
+ lttng_ht_add_unique_ulong(relayd_session_id_ht, &relayd_id_node->node);
+ rcu_read_unlock();
+
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
ret = lttcomm_create_sock(&relayd->data_sock);
+ /* Immediately try to close the created socket if valid. */
+ if (relayd->data_sock.fd >= 0) {
+ if (close(relayd->data_sock.fd)) {
+ PERROR("close relayd data socket");
+ }
+ }
+ /* Handle create_sock error. */
if (ret < 0) {
goto error;
}
- /* Close the created socket fd which is useless */
- close(relayd->data_sock.fd);
-
/* Assign new file descriptor */
relayd->data_sock.fd = fd;
break;
add_relayd(relayd);
/* All good! */
- ret = 0;
+ return 0;
error:
+ /* Close received socket if valid. */
+ if (fd >= 0) {
+ if (close(fd)) {
+ PERROR("close received socket");
+ }
+ }
+
+ if (relayd_created) {
+ /* We just want to cleanup. Ignore ret value. */
+ (void) relayd_close(&relayd->control_sock);
+ (void) relayd_close(&relayd->data_sock);
+ free(relayd);
+ }
+
return ret;
}
return ret;
}
+/*
+ * Search for a relayd associated to the session id and return the reference.
+ *
+ * A rcu read side lock MUST be acquire before calling this function and locked
+ * until the relayd object is no longer necessary.
+ */
+static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_session_id *session_id_map;
+
+ /* Get the session id map. */
+ lttng_ht_lookup(relayd_session_id_ht, (void *)((unsigned long) id), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node == NULL) {
+ goto end;
+ }
+
+ session_id_map = caa_container_of(node, struct consumer_relayd_session_id,
+ node);
+
+ /* Iterate over all relayd since they are indexed by net_seq_idx. */
+ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
+ node.node) {
+ if (relayd->relayd_session_id == session_id_map->relayd_id) {
+ /* Found the relayd. There can be only one per id. */
+ break;
+ }
+ }
+
+end:
+ return relayd;
+}
+
/*
* Check if for a given session id there is still data needed to be extract
* from the buffers.
struct lttng_ht_iter iter;
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
- struct consumer_relayd_sock_pair *relayd;
+ struct consumer_relayd_sock_pair *relayd = NULL;
int (*data_pending)(struct lttng_consumer_stream *);
DBG("Consumer data pending command on session id %" PRIu64, id);
/* Ease our life a bit */
ht = consumer_data.stream_list_ht;
+ relayd = find_relayd_by_session_id(id);
+ if (relayd) {
+ /* Send init command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_begin_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ /* Communication error thus the relayd so no data pending. */
+ goto data_not_pending;
+ }
+ }
+
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
ht->match_fct, (void *)((unsigned long) id),
/* If this call fails, the stream is being used hence data pending. */
ret = stream_try_lock(stream);
if (!ret) {
- goto data_not_pending;
+ goto data_pending;
}
/*
ret = data_pending(stream);
if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
+ goto data_pending;
}
}
/* Relayd check */
- if (stream->net_seq_idx != -1) {
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (!relayd) {
- /*
- * At this point, if the relayd object is not available for the
- * given stream, it is because the relayd is being cleaned up
- * so every stream associated with it (for a session id value)
- * are or will be marked for deletion hence no data pending.
- */
- pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
- }
-
+ if (relayd) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
if (stream->metadata_flag) {
- ret = relayd_quiescent_control(&relayd->control_sock);
+ ret = relayd_quiescent_control(&relayd->control_sock,
+ stream->relayd_stream_id);
} else {
ret = relayd_data_pending(&relayd->control_sock,
stream->relayd_stream_id, stream->next_net_seq_num);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
+ goto data_pending;
}
}
pthread_mutex_unlock(&stream->lock);
}
+ if (relayd) {
+ unsigned int is_data_inflight = 0;
+
+ /* Send init command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_end_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id, &is_data_inflight);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ goto data_not_pending;
+ }
+ if (is_data_inflight) {
+ goto data_pending;
+ }
+ }
+
/*
- * Finding _no_ node in the hash table means that the stream(s) have been
- * removed thus data is guaranteed to be available for analysis from the
- * trace files. This is *only* true for local consumer and not network
- * streaming.
+ * Finding _no_ node in the hash table and no inflight data means that the
+ * stream(s) have been removed thus data is guaranteed to be available for
+ * analysis from the trace files.
*/
+data_not_pending:
/* Data is available to be read by a viewer. */
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();
return 0;
-data_not_pending:
+data_pending:
/* Data is still being extracted from buffers. */
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();
return 1;
}
+
+/*
+ * Send a ret code status message to the sessiond daemon.
+ *
+ * Return the sendmsg() return value.
+ */
+int consumer_send_status_msg(int sock, int ret_code)
+{
+ struct lttcomm_consumer_status_msg msg;
+
+ msg.ret_code = ret_code;
+
+ return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
+}