return ret;
}
+int consumer_send_relayd_channel_bulk(struct lttng_consumer_channel *channel)
+{
+ int ret = 0;
+ struct consumer_relayd_sock_pair *relayd;
+ struct lttng_consumer_stream *stream;
+
+ assert(channel);
+
+ rcu_read_lock();
+ relayd = consumer_find_relayd(channel->relayd_id);
+
+ if (relayd == NULL) {
+ ERR("relayd ID %" PRIu64 " unknown. Can't send streams.",
+ channel->relayd_id);
+ ret = -1;
+ goto end_rcu_unlock;
+ }
+
+ /*
+ * Perform the send part of the relayd_add_stream for all stream.
+ *
+ * This ensure that we do not wait for response in between each command
+ * before sending the next one.
+ *
+ * This result in a waterfall of send command and a waterfall of recv.
+ *
+ * This leverage the TCP order guarantee for send and receive.
+ */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ health_code_update();
+ ret = relayd_add_stream_send(&relayd->control_sock, stream->name,
+ stream->chan->pathname,
+ stream->chan->tracefile_size, stream->chan->tracefile_count);
+ if (ret < 0) {
+ ERR("Relayd add stream send failed. Cleaning up relayd %" PRIu64".", relayd->id);
+ lttng_consumer_cleanup_relayd(relayd);
+ goto end_socket_mutex;
+ }
+ }
+
+ /* Perform individual recv part of relayd_add_stream */
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ health_code_update();
+
+ ret = relayd_add_stream_rcv(&relayd->control_sock, &stream->relayd_stream_id);
+ if (ret < 0) {
+ ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->id);
+ lttng_consumer_cleanup_relayd(relayd);
+ goto end_socket_mutex;
+ }
+
+ uatomic_inc(&relayd->refcount);
+ stream->sent_to_relayd = 1;
+
+ DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
+ stream->name, stream->key, stream->relayd_id);
+ }
+
+end_socket_mutex:
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+end_rcu_unlock:
+ rcu_read_unlock();
+ return ret;
+}
+
/*
* Find a relayd and send the streams sent message
*
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
int consumer_create_index_file(struct lttng_consumer_stream *stream);
void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd);
+int consumer_send_relayd_channel_bulk(struct lttng_consumer_channel *channel);
#endif /* LIB_CONSUMER_H */
return ret;
}
+/*
+ * Add stream on the relayd. Send part.
+ *
+ * On success return 0 else return ret_code negative value.
+ */
+int relayd_add_stream_send(struct lttcomm_relayd_sock *rsock, const char *channel_name,
+ const char *pathname, uint64_t tracefile_size, uint64_t tracefile_count)
+{
+ int ret;
+ struct lttcomm_relayd_add_stream msg;
+ struct lttcomm_relayd_add_stream_2_2 msg_2_2;
+
+ /* Code flow error. Safety net. */
+ assert(rsock);
+ assert(channel_name);
+ assert(pathname);
+
+ DBG("Relayd adding stream for channel name %s. Part send", channel_name);
+
+ /* Compat with relayd 2.1 */
+ if (rsock->minor == 1) {
+ memset(&msg, 0, sizeof(msg));
+ if (lttng_strncpy(msg.channel_name, channel_name,
+ sizeof(msg.channel_name))) {
+ ret = -1;
+ goto error;
+ }
+ if (lttng_strncpy(msg.pathname, pathname,
+ sizeof(msg.pathname))) {
+ ret = -1;
+ goto error;
+ }
+
+ /* Send command */
+ ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
+ if (ret < 0) {
+ goto error;
+ }
+ } else {
+ memset(&msg_2_2, 0, sizeof(msg_2_2));
+ /* Compat with relayd 2.2+ */
+ if (lttng_strncpy(msg_2_2.channel_name, channel_name,
+ sizeof(msg_2_2.channel_name))) {
+ ret = -1;
+ goto error;
+ }
+ if (lttng_strncpy(msg_2_2.pathname, pathname,
+ sizeof(msg_2_2.pathname))) {
+ ret = -1;
+ goto error;
+ }
+ msg_2_2.tracefile_size = htobe64(tracefile_size);
+ msg_2_2.tracefile_count = htobe64(tracefile_count);
+
+ /* Send command */
+ ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg_2_2, sizeof(msg_2_2), 0);
+ if (ret < 0) {
+ goto error;
+ }
+ }
+
+ DBG("Relayd add stream sent for channel name %s.", channel_name);
+ ret = 0;
+
+error:
+ return ret;
+}
+
+int relayd_add_stream_rcv(struct lttcomm_relayd_sock *rsock, uint64_t *_stream_id)
+{
+ int ret;
+ struct lttcomm_relayd_status_stream reply;
+
+ /* Code flow error. Safety net. */
+ assert(rsock);
+
+ /* Waiting for reply */
+ ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Back to host bytes order. */
+ reply.handle = be64toh(reply.handle);
+ reply.ret_code = be32toh(reply.ret_code);
+
+ /* Return session id or negative ret code. */
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -1;
+ ERR("Relayd add stream replied error %d", reply.ret_code);
+ } else {
+ /* Success */
+ ret = 0;
+ *_stream_id = reply.handle;
+ }
+
+ DBG("Relayd stream added successfully with handle %" PRIu64,
+ reply.handle);
+
+error:
+ return ret;
+}
+
/*
* Inform the relay that all the streams for the current channel has been sent.
*
int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
uint64_t stream_id, uint64_t version);
+int relayd_add_stream_send(struct lttcomm_relayd_sock *sock, const char *channel_name,
+ const char *pathname, uint64_t tracefile_size, uint64_t tracefile_count);
+int relayd_add_stream_rcv(struct lttcomm_relayd_sock *rsock, uint64_t *_stream_id);
+
#endif /* _RELAYD_H */
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct lttng_consumer_stream *stream;
- uint64_t relayd_id = -1ULL;
assert(channel);
assert(ctx);
DBG("UST consumer sending channel %s to sessiond", channel->name);
if (channel->relayd_id != (uint64_t) -1ULL) {
- cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-
- health_code_update();
-
- /* Try to send the stream to the relayd if one is available. */
- ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
- if (ret < 0) {
- /*
- * Flag that the relayd was the problem here probably due to a
- * communicaton error on the socket.
- */
- if (relayd_error) {
- *relayd_error = 1;
- }
- ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
- }
- if (relayd_id == -1ULL) {
- relayd_id = stream->relayd_id;
+ ret = consumer_send_relayd_channel_bulk(channel);
+ if (ret < 0) {
+ /*
+ * Flag that the relayd was the problem here probably due to a
+ * communicaton error on the socket.
+ */
+ if (relayd_error) {
+ *relayd_error = 1;
}
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
}
}