Network streaming support
[lttng-tools.git] / src / bin / lttng-sessiond / ust-consumer.c
1 /*
2 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2 only,
6 * as published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <errno.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <unistd.h>
24
25 #include <common/common.h>
26 #include <common/consumer.h>
27 #include <common/defaults.h>
28
29 #include "consumer.h"
30 #include "ust-consumer.h"
31
32 /*
33 * Send all stream fds of UST channel to the consumer.
34 */
35 static int send_channel_streams(int sock,
36 struct ust_app_channel *uchan, const char *path,
37 uid_t uid, gid_t gid, struct consumer_output *consumer)
38 {
39 int ret, fd;
40 char tmp_path[PATH_MAX];
41 const char *pathname;
42 struct lttcomm_consumer_msg lum;
43 struct ltt_ust_stream *stream, *tmp;
44
45 DBG("Sending streams of channel %s to UST consumer", uchan->name);
46
47 consumer_init_channel_comm_msg(&lum,
48 LTTNG_CONSUMER_ADD_CHANNEL,
49 uchan->obj->shm_fd,
50 uchan->attr.subbuf_size,
51 uchan->obj->memory_map_size,
52 uchan->name);
53
54 ret = consumer_send_channel(sock, &lum);
55 if (ret < 0) {
56 goto error;
57 }
58
59 fd = uchan->obj->shm_fd;
60 ret = consumer_send_fds(sock, &fd, 1);
61 if (ret < 0) {
62 goto error;
63 }
64
65 /* Get the right path name destination */
66 if (consumer->type == CONSUMER_DST_LOCAL) {
67 /* Set application path to the destination path */
68 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
69 consumer->dst.trace_path, path);
70 if (ret < 0) {
71 PERROR("snprintf stream path");
72 goto error;
73 }
74 pathname = tmp_path;
75 DBG3("UST local consumer tracefile path: %s", pathname);
76 } else {
77 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
78 consumer->subdir, path);
79 if (ret < 0) {
80 PERROR("snprintf stream path");
81 goto error;
82 }
83 pathname = tmp_path;
84 DBG3("UST network consumer subdir path: %s", pathname);
85 }
86
87 cds_list_for_each_entry_safe(stream, tmp, &uchan->streams.head, list) {
88 int fds[2];
89
90 if (!stream->obj->shm_fd) {
91 continue;
92 }
93
94 consumer_init_stream_comm_msg(&lum,
95 LTTNG_CONSUMER_ADD_STREAM,
96 uchan->obj->shm_fd,
97 stream->obj->shm_fd,
98 LTTNG_CONSUMER_ACTIVE_STREAM,
99 DEFAULT_UST_CHANNEL_OUTPUT,
100 stream->obj->memory_map_size,
101 uid,
102 gid,
103 consumer->net_seq_index,
104 0, /* Metadata flag unset */
105 stream->name,
106 pathname);
107
108 /* Send stream and file descriptor */
109 fds[0] = stream->obj->shm_fd;
110 fds[1] = stream->obj->wait_fd;
111 ret = consumer_send_stream(sock, consumer, &lum, fds, 2);
112 if (ret < 0) {
113 goto error;
114 }
115 }
116
117 DBG("UST consumer channel streams sent");
118
119 return 0;
120
121 error:
122 return ret;
123 }
124
125 /*
126 * Send all stream fds of the UST session to the consumer.
127 */
128 int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess,
129 struct consumer_output *consumer)
130 {
131 int ret = 0;
132 int sock = consumer_fd;
133 char tmp_path[PATH_MAX];
134 const char *pathname;
135 struct lttng_ht_iter iter;
136 struct lttcomm_consumer_msg lum;
137 struct ust_app_channel *ua_chan;
138
139 DBG("Sending metadata stream fd");
140
141 if (consumer_fd < 0) {
142 ERR("Consumer has negative file descriptor");
143 return -EINVAL;
144 }
145
146 if (usess->metadata->obj->shm_fd != 0) {
147 int fd;
148 int fds[2];
149
150 consumer_init_channel_comm_msg(&lum,
151 LTTNG_CONSUMER_ADD_CHANNEL,
152 usess->metadata->obj->shm_fd,
153 usess->metadata->attr.subbuf_size,
154 usess->metadata->obj->memory_map_size,
155 "metadata");
156
157 ret = consumer_send_channel(sock, &lum);
158 if (ret < 0) {
159 goto error;
160 }
161
162 fd = usess->metadata->obj->shm_fd;
163 ret = consumer_send_fds(sock, &fd, 1);
164 if (ret < 0) {
165 goto error;
166 }
167
168 /* Get correct path name destination */
169 if (consumer->type == CONSUMER_DST_LOCAL) {
170 /* Set application path to the destination path */
171 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
172 consumer->dst.trace_path, usess->path);
173 if (ret < 0) {
174 PERROR("snprintf stream path");
175 goto error;
176 }
177 pathname = tmp_path;
178
179 /* Create directory */
180 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
181 usess->uid, usess->gid);
182 if (ret < 0) {
183 if (ret != -EEXIST) {
184 ERR("Trace directory creation error");
185 goto error;
186 }
187 }
188 } else {
189 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
190 consumer->subdir, usess->path);
191 if (ret < 0) {
192 PERROR("snprintf metadata path");
193 goto error;
194 }
195 pathname = tmp_path;
196 }
197
198 consumer_init_stream_comm_msg(&lum,
199 LTTNG_CONSUMER_ADD_STREAM,
200 usess->metadata->obj->shm_fd,
201 usess->metadata->stream_obj->shm_fd,
202 LTTNG_CONSUMER_ACTIVE_STREAM,
203 DEFAULT_UST_CHANNEL_OUTPUT,
204 usess->metadata->stream_obj->memory_map_size,
205 usess->uid,
206 usess->gid,
207 consumer->net_seq_index,
208 1, /* Flag metadata set */
209 "metadata",
210 pathname);
211
212 /* Send stream and file descriptor */
213 fds[0] = usess->metadata->stream_obj->shm_fd;
214 fds[1] = usess->metadata->stream_obj->wait_fd;
215 ret = consumer_send_stream(sock, consumer, &lum, fds, 2);
216 if (ret < 0) {
217 goto error;
218 }
219 }
220
221 /* Send each channel fd streams of session */
222 rcu_read_lock();
223 cds_lfht_for_each_entry(usess->channels->ht, &iter.iter, ua_chan,
224 node.node) {
225 /*
226 * Indicate that the channel was not created on the tracer side so skip
227 * sending unexisting streams.
228 */
229 if (ua_chan->obj == NULL) {
230 continue;
231 }
232
233 ret = send_channel_streams(sock, ua_chan, usess->path, usess->uid,
234 usess->gid, consumer);
235 if (ret < 0) {
236 rcu_read_unlock();
237 goto error;
238 }
239 }
240 rcu_read_unlock();
241
242 DBG("consumer fds (metadata and channel streams) sent");
243
244 return 0;
245
246 error:
247 return ret;
248 }
249
250 /*
251 * Send relayd socket to consumer associated with a session name.
252 *
253 * On success return positive value. On error, negative value.
254 */
255 int ust_consumer_send_relayd_socket(int consumer_sock,
256 struct lttcomm_sock *sock, struct consumer_output *consumer,
257 enum lttng_stream_type type)
258 {
259 int ret;
260 struct lttcomm_consumer_msg msg;
261
262 /* Code flow error. Safety net. */
263 assert(sock);
264
265 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
266 msg.u.relayd_sock.net_index = consumer->net_seq_index;
267 msg.u.relayd_sock.type = type;
268 memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
269
270 DBG2("Sending relayd sock info to consumer");
271 ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
272 if (ret < 0) {
273 PERROR("send consumer relayd socket info");
274 goto error;
275 }
276
277 DBG2("Sending relayd socket file descriptor to consumer");
278 ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
279 if (ret < 0) {
280 goto error;
281 }
282
283 DBG("UST consumer relayd socket sent");
284
285 error:
286 return ret;
287 }
This page took 0.035574 seconds and 5 git commands to generate.