int sock, struct pollfd *consumer_sockpoll)
{
ssize_t ret;
+ enum lttng_error_code ret_code = LTTNG_OK;
struct lttcomm_consumer_msg msg;
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
return ret;
}
if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
+ /*
+ * Notify the session daemon that the command is completed.
+ *
+ * On transport layer error, the function call will print an error
+ * message so handling the returned code is a bit useless since we
+ * return an error code anyway.
+ */
+ (void) consumer_send_status_msg(sock, ret_code);
return -ENOENT;
}
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
+ /* Session daemon status message are handled in the following call. */
ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
- &msg.u.relayd_sock.sock);
+ &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
goto end_nosignal;
}
case LTTNG_CONSUMER_ADD_CHANNEL:
DBG("UST Consumer adding channel");
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
rcu_read_unlock();
return ret;
}
+ /*
+ * Send status code to session daemon only if the recv works. If the
+ * above recv() failed, the session daemon is notified through the
+ * error socket and the teardown is eventually done.
+ */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
DBG("consumer_add_channel %d", msg.u.channel.channel_key);
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
DBG("UST Consumer adding stream");
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
rcu_read_unlock();
return ret;
}
+ /*
+ * Send status code to session daemon only if the recv works. If the
+ * above recv() failed, the session daemon is notified through the
+ * error socket and the teardown is eventually done.
+ */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
DBG("Consumer command ADD_STREAM chan %d stream %d",
msg.u.stream.channel_key, msg.u.stream.stream_key);
relayd = consumer_find_relayd(index);
if (relayd == NULL) {
ERR("Unable to find relayd %" PRIu64, index);
- goto end_nosignal;
+ ret_code = LTTNG_ERR_NO_CONSUMER;
}
/*
*
* The destroy can happen either here or when a stream fd hangs up.
*/
- consumer_flag_relayd_for_destroy(relayd);
+ if (relayd) {
+ consumer_flag_relayd_for_destroy(relayd);
+ }
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
goto end_nosignal;
}
rcu_read_unlock();
return -ENOSYS;
}
- case LTTNG_CONSUMER_DATA_AVAILABLE:
+ case LTTNG_CONSUMER_DATA_PENDING:
{
int32_t ret;
- uint64_t id = msg.u.data_available.session_id;
+ uint64_t id = msg.u.data_pending.session_id;
- DBG("UST consumer data available command for id %" PRIu64, id);
+ DBG("UST consumer data pending command for id %" PRIu64, id);
- ret = consumer_data_available(id);
+ ret = consumer_data_pending(id);
/* Send back returned value to session daemon */
ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
if (ret < 0) {
- PERROR("send data available ret code");
+ PERROR("send data pending ret code");
}
+
+ /*
+ * No need to send back a status message since the data pending
+ * returned value is the response.
+ */
break;
}
default:
struct lttng_ust_shm_handle *handle;
struct lttng_ust_lib_ring_buffer *buf;
char dummy;
- ssize_t readlen;
DBG("In read_subbuffer (wait_fd: %d, stream key: %d)",
stream->wait_fd, stream->key);
/* We can consume the 1 byte written into the wait_fd by UST */
if (!stream->hangup_flush_done) {
+ ssize_t readlen;
+
do {
readlen = read(stream->wait_fd, &dummy, 1);
} while (readlen == -1 && errno == EINTR);
/*
* Check if data is still being extracted from the buffers for a specific
- * stream. Consumer data lock MUST be acquired before calling this function.
+ * stream. Consumer data lock MUST be acquired before calling this function
+ * and the stream lock.
*
- * Return 0 if the traced data are still getting read else 1 meaning that the
+ * Return 1 if the traced data are still getting read else 0 meaning that the
* data is available for trace viewer reading.
*/
-int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream)
+int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
{
int ret;
assert(stream);
- DBG("UST consumer checking data availability");
-
- /*
- * Try to lock the stream mutex. On failure, we know that the stream is
- * being used else where hence there is data still being extracted.
- */
- ret = pthread_mutex_trylock(&stream->lock);
- if (ret == EBUSY) {
- /* Data not available */
- ret = 0;
- goto end;
- }
- /* The stream is now locked so we can do our ustctl calls */
+ DBG("UST consumer checking data pending");
ret = ustctl_get_next_subbuf(stream->chan->handle, stream->buf);
if (ret == 0) {
/* There is still data so let's put back this subbuffer. */
ret = ustctl_put_subbuf(stream->chan->handle, stream->buf);
assert(ret == 0);
- goto end_unlock;
+ ret = 1; /* Data is pending */
+ goto end;
}
- /* Data is available to be read for this stream. */
- ret = 1;
+ /* Data is NOT pending so ready to be read. */
+ ret = 0;
-end_unlock:
- pthread_mutex_unlock(&stream->lock);
end:
return ret;
}