Fix: do not repurpose iterator while it is being used
[lttng-tools.git] / src/bin/lttng-sessiond/manage-consumer.c
/*
 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *               2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License, version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include <signal.h>

#include <common/pipe.h>
#include <common/utils.h>

#include "manage-consumer.h"
#include "testpoint.h"
#include "health-sessiond.h"
#include "utils.h"
#include "thread.h"
#include "ust-consumer.h"

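/*
 * State shared between the launching thread and the consumer management
 * thread: the quit pipe used to request shutdown, the consumer_data being
 * managed, and a semaphore/result pair used to report the initialization
 * outcome back to the launcher.
 */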
struct thread_notifiers {
	struct lttng_pipe *quit_pipe;
	struct consumer_data *consumer_data;
	sem_t ready;
	int initialization_result;
};

static void mark_thread_as_ready(struct thread_notifiers *notifiers)
{
	DBG("Marking consumer management thread as ready");
	notifiers->initialization_result = 0;
	sem_post(&notifiers->ready);
}

static void mark_thread_intialization_as_failed(
		struct thread_notifiers *notifiers)
{
	ERR("Consumer management thread entering error state");
	notifiers->initialization_result = -1;
	sem_post(&notifiers->ready);
}

static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
{
	DBG("Waiting for consumer management thread to be ready");
	sem_wait(&notifiers->ready);
	DBG("Consumer management thread is ready");
}

/*
 * This thread manages the consumer errors sent back to the session daemon.
 */
void *thread_consumer_management(void *data)
{
	int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
	uint32_t revents, nb_fd;
	enum lttcomm_return_code code;
	struct lttng_poll_event events;
	struct thread_notifiers *notifiers = data;
	struct consumer_data *consumer_data = notifiers->consumer_data;
	const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
	struct consumer_socket *cmd_socket_wrapper = NULL;

	DBG("[thread] Manage consumer started");

	rcu_register_thread();
	rcu_thread_online();

	health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);

	health_code_update();

	/*
	 * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
	 * metadata_sock. Nothing more will be added to this poll set.
	 */
	ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error_poll;
	}

	ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/*
	 * The error socket here is already in a listening state which was done
	 * just before spawning this thread to avoid a race between the consumer
	 * daemon exec trying to connect and the listen() call.
	 */
	ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	health_code_update();

	/* Infinite blocking call, waiting for transmission */
	health_poll_entry();

	if (testpoint(sessiond_thread_manage_consumer)) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	ret = lttng_poll_wait(&events, -1);
	health_poll_exit();
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	nb_fd = ret;

	for (i = 0; i < nb_fd; i++) {
		/* Fetch the poll data once. */
		revents = LTTNG_POLL_GETEV(&events, i);
		pollfd = LTTNG_POLL_GETFD(&events, i);

		health_code_update();

		if (!revents) {
			/* No activity for this FD (poll implementation). */
			continue;
		}

		/* Thread quit pipe has been closed. Killing thread. */
		if (pollfd == quit_pipe_read_fd) {
			err = 0;
			mark_thread_intialization_as_failed(notifiers);
			goto exit;
		} else if (pollfd == consumer_data->err_sock) {
			/* Event on the registration socket. */
			if (revents & LPOLLIN) {
				continue;
			} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
				ERR("consumer err socket poll error");
				mark_thread_intialization_as_failed(notifiers);
				goto error;
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				mark_thread_intialization_as_failed(notifiers);
				goto error;
			}
		}
	}

	sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
	if (sock < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/*
	 * Set the CLOEXEC flag. Return code is useless because either way, the
	 * show must go on.
	 */
	(void) utils_set_fd_cloexec(sock);

	health_code_update();

	DBG2("Receiving code from consumer err_sock");

	/* Getting status code from kconsumerd */
	ret = lttcomm_recv_unix_sock(sock, &code,
			sizeof(enum lttcomm_return_code));
	if (ret <= 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	health_code_update();
	if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
		ERR("consumer error when waiting for SOCK_READY : %s",
				lttcomm_get_readable_code(-code));
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Connect both command and metadata sockets. */
	consumer_data->cmd_sock =
			lttcomm_connect_unix_sock(
				consumer_data->cmd_unix_sock_path);
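	/*
	 * The metadata socket is a second connection to the same consumerd
	 * command socket path; it is dedicated to metadata requests.
	 */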
	consumer_data->metadata_fd =
			lttcomm_connect_unix_sock(
				consumer_data->cmd_unix_sock_path);
	if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
		PERROR("consumer connect cmd socket");
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

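	/*
	 * Expose the metadata fd through the consumer_socket wrapper so that
	 * other code paths manipulate it through a single, stable pointer.
	 */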
	consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;

	/* Create metadata socket lock. */
	consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
	if (consumer_data->metadata_sock.lock == NULL) {
		PERROR("zmalloc pthread mutex");
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}
	pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);

	DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
	DBG("Consumer metadata socket ready (fd: %d)",
			consumer_data->metadata_fd);

	/*
	 * Remove the consumerd error sock since we've established a connection.
	 */
	ret = lttng_poll_del(&events, consumer_data->err_sock);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Add new accepted error socket. */
	ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Add metadata socket that is successfully connected. */
	ret = lttng_poll_add(&events, consumer_data->metadata_fd,
			LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	health_code_update();

	/*
	 * Transfer the write-end of the channel monitoring and rotate pipe
	 * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE command.
	 */
	cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
	if (!cmd_socket_wrapper) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}
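	/*
	 * The wrapper borrows the consumer_data lock so that commands sent
	 * through it are serialized with the rest of the consumer control
	 * traffic.
	 */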
	cmd_socket_wrapper->lock = &consumer_data->lock;

	ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
			consumer_data->channel_monitor_pipe);
	if (ret) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Discard the socket wrapper as it is no longer needed. */
	consumer_destroy_socket(cmd_socket_wrapper);
	cmd_socket_wrapper = NULL;

	/* The thread is completely initialized, signal that it is ready. */
	mark_thread_as_ready(notifiers);

	/* Infinite blocking call, waiting for transmission */
	while (1) {
		health_code_update();

		/* Exit the thread because the thread quit pipe has been triggered. */
		if (should_quit) {
			/* Not a health error. */
			err = 0;
			goto exit;
		}

		health_poll_entry();
		ret = lttng_poll_wait(&events, -1);
		health_poll_exit();
		if (ret < 0) {
			goto error;
		}

		nb_fd = ret;

		for (i = 0; i < nb_fd; i++) {
			/* Fetch the poll data once. */
			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			health_code_update();

			if (!revents) {
				/* No activity for this FD (poll implementation). */
				continue;
			}

			/*
			 * Thread quit pipe has been triggered, flag that we should stop
			 * but continue the current loop to handle potential data from
			 * the consumer.
			 */
			if (pollfd == quit_pipe_read_fd) {
				should_quit = 1;
			} else if (pollfd == sock) {
				/* Event on the consumerd socket. */
				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
						&& !(revents & LPOLLIN)) {
					ERR("consumer err socket second poll error");
					goto error;
				}
				health_code_update();
				/* Wait for any kconsumerd error */
				ret = lttcomm_recv_unix_sock(sock, &code,
						sizeof(enum lttcomm_return_code));
				if (ret <= 0) {
					ERR("consumer closed the command socket");
					goto error;
				}

				ERR("consumer return code : %s",
						lttcomm_get_readable_code(-code));

				goto exit;
			} else if (pollfd == consumer_data->metadata_fd) {
				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
						&& !(revents & LPOLLIN)) {
					ERR("consumer err metadata socket second poll error");
					goto error;
				}
				/* UST metadata requests */
				ret = ust_consumer_metadata_request(
						&consumer_data->metadata_sock);
				if (ret < 0) {
					ERR("Handling metadata request");
					goto error;
				}
			}
			/* No need for an else branch; all FDs are tested above. */
		}
		health_code_update();
	}

exit:
error:
	/*
	 * We lock here because we are about to close the sockets and some other
	 * thread might be using them. Taking exclusive access aborts all other
	 * consumer commands issued by other threads.
	 */
	pthread_mutex_lock(&consumer_data->lock);

	/* Immediately set the consumerd state to error. */
	if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
		uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR);
	} else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
			consumer_data->type == LTTNG_CONSUMER32_UST) {
		uatomic_set(&ust_consumerd_state, CONSUMER_ERROR);
	} else {
		/* Code flow error... */
		assert(0);
	}

	if (consumer_data->err_sock >= 0) {
		ret = close(consumer_data->err_sock);
		if (ret) {
			PERROR("close");
		}
		consumer_data->err_sock = -1;
	}
	if (consumer_data->cmd_sock >= 0) {
		ret = close(consumer_data->cmd_sock);
		if (ret) {
			PERROR("close");
		}
		consumer_data->cmd_sock = -1;
	}
	if (consumer_data->metadata_sock.fd_ptr &&
			*consumer_data->metadata_sock.fd_ptr >= 0) {
		ret = close(*consumer_data->metadata_sock.fd_ptr);
		if (ret) {
			PERROR("close");
		}
	}
	if (sock >= 0) {
		ret = close(sock);
		if (ret) {
			PERROR("close");
		}
	}

	unlink(consumer_data->err_unix_sock_path);
	unlink(consumer_data->cmd_unix_sock_path);
	pthread_mutex_unlock(&consumer_data->lock);

	/* Cleanup metadata socket mutex. */
	if (consumer_data->metadata_sock.lock) {
		pthread_mutex_destroy(consumer_data->metadata_sock.lock);
		free(consumer_data->metadata_sock.lock);
	}
	lttng_poll_clean(&events);

	if (cmd_socket_wrapper) {
		consumer_destroy_socket(cmd_socket_wrapper);
	}
error_poll:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_sessiond);
	DBG("consumer thread cleanup completed");

	rcu_thread_offline();
	rcu_unregister_thread();

	return NULL;
}

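/*
 * Shutdown callback invoked by the thread framework: wake the management
 * thread by writing to its quit pipe so that the poll loop exits.
 */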
static bool shutdown_consumer_management_thread(void *data)
{
	struct thread_notifiers *notifiers = data;
	const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);

	return notify_thread_pipe(write_fd) == 1;
}

static void cleanup_consumer_management_thread(void *data)
{
	struct thread_notifiers *notifiers = data;

	lttng_pipe_destroy(notifiers->quit_pipe);
	free(notifiers);
}

bool launch_consumer_management_thread(struct consumer_data *consumer_data)
{
	struct lttng_pipe *quit_pipe;
	struct thread_notifiers *notifiers = NULL;
	struct lttng_thread *thread;

	quit_pipe = lttng_pipe_open(FD_CLOEXEC);
	if (!quit_pipe) {
		goto error;
	}

	notifiers = zmalloc(sizeof(*notifiers));
	if (!notifiers) {
		goto error;
	}
	notifiers->quit_pipe = quit_pipe;
	notifiers->consumer_data = consumer_data;
	sem_init(&notifiers->ready, 0, 0);

	thread = lttng_thread_create("Consumer management",
			thread_consumer_management,
			shutdown_consumer_management_thread,
			cleanup_consumer_management_thread,
			notifiers);
	if (!thread) {
		goto error;
	}
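	/*
	 * Block until the thread posts its ready semaphore, then check the
	 * initialization result it reported before declaring success.
	 */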
	wait_until_thread_is_ready(notifiers);
	lttng_thread_put(thread);
	if (notifiers->initialization_result) {
		goto error;
	}
	return true;
error:
	cleanup_consumer_management_thread(notifiers);
	return false;
}