X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=ff5360aae755aa082ec6517cb6c4f10df99c9d9d;hp=344c6627dbbe507c01ba483c7ba46c8f2577b57c;hb=ffe600149a7608221985751e1bf293234bf2545c;hpb=ccf7af6c78ba7a206baa9d0b9578468a1af734e1 diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 344c6627d..ff5360aae 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -60,13 +60,55 @@ int consumer_recv_status_reply(struct consumer_socket *sock) ret = 0; } else { ret = -reply.ret_code; - DBG("Consumer ret code %d", reply.ret_code); + DBG("Consumer ret code %d", ret); } end: return ret; } +/* + * Once the ASK_CHANNEL command is sent to the consumer, the channel + * information are sent back. This call receives that data and populates key + * and stream_count. + * + * On success return 0 and both key and stream_count are set. On error, a + * negative value is sent back and both parameters are untouched. + */ +int consumer_recv_status_channel(struct consumer_socket *sock, + unsigned long *key, unsigned int *stream_count) +{ + int ret; + struct lttcomm_consumer_status_channel reply; + + assert(sock); + assert(stream_count); + assert(key); + + ret = lttcomm_recv_unix_sock(sock->fd, &reply, sizeof(reply)); + if (ret <= 0) { + if (ret == 0) { + /* Orderly shutdown. Don't return 0 which means success. */ + ret = -1; + } + /* The above call will print a PERROR on error. */ + DBG("Fail to receive status reply on sock %d", sock->fd); + goto end; + } + + /* An error is possible so don't touch the key and stream_count. */ + if (reply.ret_code != LTTNG_OK) { + ret = -1; + goto end; + } + + *key = reply.key; + *stream_count = reply.stream_count; + +end: + return ret; +} + /* * Send destroy relayd command to consumer. * @@ -81,7 +123,7 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock, assert(consumer); assert(sock); - DBG2("Sending destroy relayd command to consumer..."); + DBG2("Sending destroy relayd command to consumer sock %d", sock->fd); /* Bail out if consumer is disabled */ if (!consumer->enabled) { @@ -511,6 +553,32 @@ error: return ret; } +/* + * Consumer send communication message structure to consumer. + */ +int consumer_send_msg(struct consumer_socket *sock, + struct lttcomm_consumer_msg *msg) +{ + int ret; + + assert(msg); + assert(sock); + assert(sock->fd >= 0); + + ret = lttcomm_send_unix_sock(sock->fd, msg, + sizeof(struct lttcomm_consumer_msg)); + if (ret < 0) { + /* The above call will print a PERROR on error. */ + DBG("Error when sending consumer channel on sock %d", sock->fd); + goto error; + } + + ret = consumer_recv_status_reply(sock); + +error: + return ret; +} + /* * Consumer send channel communication message structure to consumer. */ @@ -537,30 +605,94 @@ error: return ret; } +/* + * Populate the given consumer msg structure with the ask_channel command + * information. + */ +void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, + uint64_t subbuf_size, + uint64_t num_subbuf, + int overwrite, + unsigned int switch_timer_interval, + unsigned int read_timer_interval, + int output, + int type, + uint64_t session_id, + const char *pathname, + const char *name, + uid_t uid, + gid_t gid, + int relayd_id, + unsigned long key, + unsigned char *uuid) +{ + assert(msg); + + /* Zeroed structure */ + memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); + + msg->cmd_type = LTTNG_CONSUMER_ASK_CHANNEL_CREATION; + msg->u.ask_channel.subbuf_size = subbuf_size; + msg->u.ask_channel.num_subbuf = num_subbuf ; + msg->u.ask_channel.overwrite = overwrite; + msg->u.ask_channel.switch_timer_interval = switch_timer_interval; + msg->u.ask_channel.read_timer_interval = read_timer_interval; + msg->u.ask_channel.output = output; + msg->u.ask_channel.type = type; + msg->u.ask_channel.session_id = session_id; + msg->u.ask_channel.uid = uid; + msg->u.ask_channel.gid = gid; + msg->u.ask_channel.relayd_id = relayd_id; + msg->u.ask_channel.key = key; + + memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid)); + + strncpy(msg->u.ask_channel.pathname, pathname, + sizeof(msg->u.ask_channel.pathname)); + msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname)-1] = '\0'; + + strncpy(msg->u.ask_channel.name, name, sizeof(msg->u.ask_channel.name)); + msg->u.ask_channel.name[sizeof(msg->u.ask_channel.name) - 1] = '\0'; +} + /* * Init channel communication message structure. */ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, int channel_key, - uint64_t max_sb_size, - uint64_t mmap_len, + uint64_t session_id, + const char *pathname, + uid_t uid, + gid_t gid, + int relayd_id, const char *name, - unsigned int nb_init_streams) + unsigned int nb_init_streams, + enum lttng_event_output output, + int type) { assert(msg); - /* TODO: Args validation */ - /* Zeroed structure */ memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); /* Send channel */ msg->cmd_type = cmd; msg->u.channel.channel_key = channel_key; - msg->u.channel.max_sb_size = max_sb_size; - msg->u.channel.mmap_len = mmap_len; + msg->u.channel.session_id = session_id; + msg->u.channel.uid = uid; + msg->u.channel.gid = gid; + msg->u.channel.relayd_id = relayd_id; msg->u.channel.nb_init_streams = nb_init_streams; + msg->u.channel.output = output; + msg->u.channel.type = type; + + strncpy(msg->u.channel.pathname, pathname, + sizeof(msg->u.channel.pathname)); + msg->u.channel.pathname[sizeof(msg->u.channel.pathname) - 1] = '\0'; + + strncpy(msg->u.channel.name, name, sizeof(msg->u.channel.name)); + msg->u.channel.name[sizeof(msg->u.channel.name) - 1] = '\0'; } /* @@ -570,39 +702,16 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, int channel_key, int stream_key, - uint32_t state, - enum lttng_event_output output, - uint64_t mmap_len, - uid_t uid, - gid_t gid, - int net_index, - unsigned int metadata_flag, - const char *name, - const char *pathname, - unsigned int session_id) + int cpu) { assert(msg); memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); - /* TODO: Args validation */ - msg->cmd_type = cmd; msg->u.stream.channel_key = channel_key; msg->u.stream.stream_key = stream_key; - msg->u.stream.state = state; - msg->u.stream.output = output; - msg->u.stream.mmap_len = mmap_len; - msg->u.stream.uid = uid; - msg->u.stream.gid = gid; - msg->u.stream.net_index = net_index; - msg->u.stream.metadata_flag = metadata_flag; - msg->u.stream.session_id = (uint64_t) session_id; - strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name)); - msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0'; - strncpy(msg->u.stream.path_name, pathname, - sizeof(msg->u.stream.path_name)); - msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0'; + msg->u.stream.cpu = cpu; } /* @@ -617,29 +726,7 @@ int consumer_send_stream(struct consumer_socket *sock, assert(msg); assert(dst); assert(sock); - - switch (dst->type) { - case CONSUMER_DST_NET: - /* Consumer should send the stream on the network. */ - msg->u.stream.net_index = dst->net_seq_index; - break; - case CONSUMER_DST_LOCAL: - /* Add stream file name to stream path */ - strncat(msg->u.stream.path_name, "/", - sizeof(msg->u.stream.path_name) - - strlen(msg->u.stream.path_name) - 1); - strncat(msg->u.stream.path_name, msg->u.stream.name, - sizeof(msg->u.stream.path_name) - - strlen(msg->u.stream.path_name) - 1); - msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0'; - /* Indicate that the stream is NOT network */ - msg->u.stream.net_index = -1; - break; - default: - ERR("Consumer unknown output type (%d)", dst->type); - ret = -1; - goto error; - } + assert(fds); /* Send on socket */ ret = lttcomm_send_unix_sock(sock->fd, msg,