| 1 | # -*- coding: utf-8 -*- |
| 2 | # |
| 3 | # Copyright (C) 2014 - David Goulet <dgoulet@efficios.com> |
| 4 | # |
| 5 | # This library is free software; you can redistribute it and/or modify it under |
| 6 | # the terms of the GNU Lesser General Public License as published by the Free |
| 7 | # Software Foundation; version 2.1 of the License. |
| 8 | # |
| 9 | # This library is distributed in the hope that it will be useful, but WITHOUT |
| 10 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 11 | # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
| 12 | # details. |
| 13 | # |
| 14 | # You should have received a copy of the GNU Lesser General Public License |
| 15 | # along with this library; if not, write to the Free Software Foundation, Inc., |
| 16 | # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
| 17 | |
| 18 | from __future__ import unicode_literals |
| 19 | |
| 20 | import ctypes |
| 21 | import errno |
| 22 | import logging |
| 23 | import os |
| 24 | import sys |
| 25 | import threading |
| 26 | import struct |
| 27 | import select |
| 28 | |
| 29 | from select import epoll, EPOLLIN, EPOLLERR, EPOLLHUP |
| 30 | from socket import * |
| 31 | from time import sleep |
| 32 | |
# Public API of this module. Every entry of __all__ must be the name of a real
# module attribute, otherwise a star-import of this module raises
# AttributeError; the agent class is the only entry point applications need.
__all__ = ["LTTngAgent"]
__author__ = "David Goulet <dgoulet@efficios.com>"
| 35 | |
class LTTngAgent():
    """
    LTTng agent python code. A LTTng Agent is responsible for spawning two
    threads, one for the current UID session daemon and one for the root
    session daemon. Those two threads register to the right daemon and handle
    the tracing.

    This class needs to be instantiated once; once the constructor returns,
    either both registrations completed (or were reported as failed by the
    client threads) or SEM_TIMEOUT seconds have elapsed.
    """

    SESSIOND_ADDR = "127.0.0.1"
    SEM_COUNT = 2
    # Timeout for the semaphore in seconds.
    SEM_TIMEOUT = 5
    # Polling period, in seconds, used while waiting on the semaphore.
    SEM_WAIT_PERIOD = 0.2

    def __init__(self):
        # Define these first so that destroy() (also reached through
        # __del__) stays safe if anything below raises.
        self.client_user = None
        self.client_root = None
        self._destroyed = False

        # Session daemon register semaphore. Each client takes one unit when
        # it is constructed and gives it back from its own thread.
        self.register_sem = threading.Semaphore(LTTngAgent.SEM_COUNT)

        self.client_user = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem)
        self.client_user.start()

        self.client_root = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem)
        self.client_root.log_handler.is_root = True
        self.client_root.start()

        # Wait until both units are available again or the timeout expires.
        # Non-blocking acquire plus sleep is used so the wait can be bounded.
        acquired = 0
        timeout = LTTngAgent.SEM_TIMEOUT
        while acquired < LTTngAgent.SEM_COUNT and timeout > 0:
            if self.register_sem.acquire(False):
                acquired += 1
            else:
                sleep(LTTngAgent.SEM_WAIT_PERIOD)
                timeout -= LTTngAgent.SEM_WAIT_PERIOD

    def __del__(self):
        self.destroy()

    def destroy(self):
        """
        Ask both client threads to quit and wait for them to finish.

        Safe to call more than once and on an instance whose constructor did
        not complete: clients that were never created are skipped.
        """
        if getattr(self, "_destroyed", False):
            return
        self._destroyed = True

        # Same order as before: user client first, then root client.
        for attr_name in ("client_user", "client_root"):
            client = getattr(self, attr_name, None)
            if client is None:
                continue
            client.destroy()
            client.join()
| 86 | |
class LTTngCmdError(RuntimeError):
    """
    Raised when a command received from the session daemon cannot be
    processed. The instance carries the protocol return code to report.
    """

    def __init__(self, code):
        message = 'LTTng command error: code {}'.format(code)
        RuntimeError.__init__(self, message)
        self._code = code

    def get_code(self):
        """
        Return the code given at construction time.
        """
        return self._code
| 99 | |
class LTTngUnknownCmdError(RuntimeError):
    """
    Raised when the command type received from the session daemon does not
    match any known command.
    """
| 102 | |
class LTTngLoggingHandler(logging.Handler):
    """
    Class handler for the Python logging API.

    The handler attaches itself to the root logger while at least one event
    is enabled and forwards every record it receives to the LTTng-UST
    tracepoint exposed by the Python agent shared library.
    """

    def __init__(self):
        logging.Handler.__init__(self, level=logging.NOTSET)

        # Refcount tracking how many events have been enabled. A value above
        # 0 means that the handler is attached to the root logger.
        self.refcount = 0

        # Dict of enabled events. We track them so we know if it's ok to
        # disable the received event.
        self.enabled_events = {}

        # Am I root ?
        self.is_root = False

        # Using the logging formatter to extract the asctime only.
        self.log_fmt = logging.Formatter("%(asctime)s")
        self.setFormatter(self.log_fmt)

        # ctypes lib for lttng-ust. The attribute is always defined: None
        # means the library could not be loaded and emit() is a no-op.
        try:
            self.lttng_ust = ctypes.cdll.LoadLibrary("LIBDIR_STR/liblttng-ust-python-agent.so")
        except OSError:
            self.lttng_ust = None
            print("Unable to find libust for Python.")

    def emit(self, record):
        """
        Fire LTTng UST tracepoint with the given record.

        Does nothing when the tracing library could not be loaded, so that
        the application's logging calls never fail because of the agent.
        """
        if self.lttng_ust is None:
            return

        asctime = self.format(record)

        self.lttng_ust.py_tracepoint(asctime.encode(),
                record.getMessage().encode(), record.name.encode(),
                record.funcName.encode(), record.lineno, record.levelno,
                record.thread, record.threadName.encode())

    def enable_event(self, name):
        """
        Enable an event name which will ultimately add this handler to the
        root logger if it is not attached yet.
        """
        # Don't update the refcount if the event has been enabled prior.
        if name in self.enabled_events:
            return

        root_logger = logging.getLogger()
        # First thing first, we need to set the root logger to the loglevel
        # NOTSET so we can catch everything. The default is 30.
        root_logger.setLevel(logging.NOTSET)

        self.refcount += 1
        if self.refcount == 1:
            root_logger.addHandler(self)

        self.enabled_events[name] = True

    def disable_event(self, name):
        """
        Disable an event name which will ultimately remove this handler from
        the root logger once no enabled event is left.
        """
        if name not in self.enabled_events:
            # Event was not enabled prior, do nothing.
            return

        self.refcount -= 1
        if self.refcount == 0:
            logging.getLogger().removeHandler(self)
        del self.enabled_events[name]

    def list_logger(self):
        """
        Return a list of logger names.

        A copy is returned rather than a live view of the logging registry so
        that callers can iterate it even if loggers are created meanwhile.
        """
        return list(logging.Logger.manager.loggerDict)
| 187 | |
class LTTngSessiondCmd():
    """
    Base class of the session daemon commands. It holds the protocol
    constants and the interface every command has to provide.
    """

    # Agent protocol: command identifiers.
    CMD_LIST = 1
    CMD_ENABLE = 2
    CMD_DISABLE = 3
    CMD_REG_DONE = 4

    # Agent protocol: return codes.
    CODE_SUCCESS = 1
    CODE_INVALID_CMD = 2

    # Value of the Python logger LTTng domain, taken from lttng/domain.h
    DOMAIN = 5

    # Agent protocol: version numbers.
    MAJOR_VERSION = 1
    MINOR_VERSION = 0

    def execute(self):
        """
        Command interface entry point; every subclass must override it.
        """
        raise NotImplementedError()
| 215 | |
class LTTngCommandReply():
    """
    Result of a command execution: the optional payload to send back to the
    session daemon and whether anything should be sent back at all.
    """

    def __init__(self, payload = None, reply = True):
        # Whether a response is expected to be sent for the command.
        self.reply = reply
        # Bytes to append after the return code, or None when there are none.
        self.payload = payload
| 225 | |
class LTTngCommandEnable(LTTngSessiondCmd):
    """
    Handle the enable event command from the session daemon.

    Payload layout (big endian): uint32 loglevel, uint32 loglevel_type, then
    the event name, possibly followed by NULL padding bytes.
    """

    def __init__(self, log_handler, data):
        """
        Parse the command payload.

        Raises LTTngCmdError(CODE_INVALID_CMD) when the payload is empty, too
        short to hold both integers, or when the name is not valid UTF-8.
        """
        self.log_handler = log_handler
        # 4 bytes for loglevel and 4 bytes for loglevel_type thus 8.
        name_offset = 8

        if len(data) == 0:
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

        try:
            self.loglevel, self.loglevel_type = struct.unpack_from('>II', data)
            # Remove trailing NULL bytes from name.
            self.name = bytes(data[name_offset:]).decode().rstrip('\x00')
        except (struct.error, UnicodeDecodeError):
            # A malformed name must be reported to the session daemon as an
            # invalid command instead of escaping as an unrelated exception.
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

    def execute(self):
        """
        Enable the parsed event name on the logging handler.
        """
        self.log_handler.enable_event(self.name)
        return LTTngCommandReply()
| 251 | |
class LTTngCommandDisable(LTTngSessiondCmd):
    """
    Handle the disable event command from the session daemon.

    The whole payload is the event name, possibly followed by NULL padding
    bytes.
    """

    def __init__(self, log_handler, data):
        """
        Parse the command payload.

        Raises LTTngCmdError(CODE_INVALID_CMD) when the payload is empty or
        when the name is not valid UTF-8.
        """
        self.log_handler = log_handler

        if len(data) == 0:
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

        try:
            # Remove trailing NULL bytes from name.
            self.name = bytes(data).decode().rstrip('\x00')
        except UnicodeDecodeError:
            # A malformed name must be reported to the session daemon as an
            # invalid command instead of escaping as an unrelated exception.
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

    def execute(self):
        """
        Disable the parsed event name on the logging handler.
        """
        self.log_handler.disable_event(self.name)
        return LTTngCommandReply()
| 274 | |
class LTTngCommandRegDone(LTTngSessiondCmd):
    """
    Registration done command, received once the session daemon accepted the
    registration. Executing it releases the semaphore handed over at
    construction time; no response is sent for this command.
    """

    def __init__(self, sem):
        self.sem = sem

    def execute(self):
        """
        Release the semaphore and return a reply flagged as "do not send".
        """
        no_response = LTTngCommandReply(reply = False)
        self.sem.release()
        return no_response
| 288 | |
class LTTngCommandList(LTTngSessiondCmd):
    """
    Handle the list command from the session daemon.

    Reply payload layout (big endian): uint32 data_size, uint32 nb_event,
    then every logger name followed by a NULL byte. data_size is the total
    number of bytes taken by the names and their NULL terminators.
    """

    def __init__(self, log_handler):
        self.log_handler = log_handler

    def execute(self):
        """
        Build the reply listing every logger name known to the handler.
        """
        # Work on a copy of the names and encode them once: all sizes below
        # must be counted in encoded bytes, which differs from the number of
        # characters as soon as a name is not pure ASCII.
        encoded_names = [name.encode()
                for name in list(self.log_handler.list_logger())]

        # Each name is followed by its NULL terminator.
        names_blob = b"".join(name + b"\x00" for name in encoded_names)

        # uint32_t data_size followed by nb_event (number of loggers).
        data = struct.pack('>II', len(names_blob), len(encoded_names))
        data += names_blob
        return LTTngCommandReply(payload = data)
| 317 | |
class LTTngTCPClient(threading.Thread):
    """
    TCP client that registers to the session daemon and receives commands
    from it.

    The thread loops on: connect, register, wait for commands. Any IOError
    tears the connection down and a new attempt is made after WAIT_TIME
    seconds, until destroy() is called.
    """

    SYSTEM_PORT_FILE = "/var/run/lttng/agent.port"
    USER_PORT_FILE = os.path.join(os.path.expanduser("~"), ".lttng/agent.port")

    # The time in seconds this client should wait before trying again to
    # register back to the session daemon.
    WAIT_TIME = 3

    def __init__(self, host, sem):
        threading.Thread.__init__(self)

        # Which host to connect to. The port is fetched dynamically.
        self.sessiond_host = host

        # The session daemon register done semaphore. Needs to be released when
        # receiving a CMD_REG_DONE command.
        self.register_sem = sem
        self.register_sem.acquire()

        # Indicate that we have to quit thus stop the main loop.
        self.quit_flag = False
        # Quit pipe. The thread polls on it to know when to quit.
        self.quit_pipe = os.pipe()
        # True while the write end of the quit pipe is still ours to close.
        self._quit_write_open = True

        # Socket on which we communicate with the session daemon.
        self.sessiond_sock = None
        # LTTng Logging Handler
        self.log_handler = LTTngLoggingHandler()

    def cleanup_socket(self, epfd = None):
        """
        Remove the session daemon socket from the poll set (when given),
        shut it down and close it. Errors are ignored: this is best effort
        and the socket is always closed.
        """
        sock = self.sessiond_sock
        if not sock:
            return
        self.sessiond_sock = None

        if epfd is not None:
            try:
                epfd.unregister(sock)
            except (select.error, IOError, ValueError):
                # The socket may never have been registered (e.g. the
                # connection failed); that must not prevent closing it.
                pass

        try:
            sock.shutdown(SHUT_RDWR)
        except IOError:
            # Shutting down a socket that is not connected fails; ignore.
            pass
        finally:
            sock.close()

    def destroy(self):
        """
        Ask the thread to quit: set the quit flag and wake the poll loop up
        through the quit pipe. Calling it again is a no-op.
        """
        self.quit_flag = True

        # Only touch the write end once. After it is closed its descriptor
        # number may be reused by an unrelated file.
        if not getattr(self, "_quit_write_open", True):
            return
        self._quit_write_open = False

        write_fd = self.quit_pipe[1]
        try:
            os.write(write_fd, b"42")
        except OSError:
            # The reading side may already be gone; nothing left to wake up.
            pass
        finally:
            try:
                os.close(write_fd)
            except OSError:
                pass

    def register(self):
        """
        Register to session daemon using the previously connected socket of the
        class.

        Command ABI (big endian):
            uint32 domain
            uint32 pid
            uint32 major version
            uint32 minor version
        """
        data = struct.pack('>IIII', LTTngSessiondCmd.DOMAIN, os.getpid(),
                LTTngSessiondCmd.MAJOR_VERSION, LTTngSessiondCmd.MINOR_VERSION)
        # sendall() keeps sending until every byte went out.
        self.sessiond_sock.sendall(data)

    def run(self):
        """
        Start the TCP client thread by registering to the session daemon and polling
        on that socket for commands.
        """
        epfd = epoll()
        try:
            epfd.register(self.quit_pipe[0], EPOLLIN)

            # Main loop to handle session daemon command and disconnection.
            while not self.quit_flag:
                try:
                    # First, connect to the session daemon.
                    self.connect_sessiond()

                    # Register to session daemon after a successful connection.
                    self.register()
                    # Add registered socket to poll set.
                    epfd.register(self.sessiond_sock, EPOLLIN | EPOLLERR | EPOLLHUP)

                    self.quit_flag = self.wait_cmd(epfd)
                except IOError:
                    # Whatever happens here, we have to close down everything and
                    # retry to connect to the session daemon since either the
                    # socket is closed or invalid data was sent.
                    self.cleanup_socket(epfd)
                    # Give the unit back so a waiter on the semaphore is not
                    # held up by a daemon that cannot be reached.
                    self.register_sem.release()
                    sleep(LTTngTCPClient.WAIT_TIME)
        finally:
            # Release every resource owned by the thread, also when an
            # unexpected exception ends the loop.
            self.cleanup_socket(epfd)
            os.close(self.quit_pipe[0])
            epfd.close()

    def _recv_exact(self, sock, size):
        """
        Read exactly `size` bytes from `sock` and return them as a bytearray.

        A single recv() call may return fewer bytes than requested, so keep
        reading. Raises IOError(ESHUTDOWN) if the peer closes the connection
        before all the bytes arrived.
        """
        received = bytearray()
        while len(received) < size:
            chunk = sock.recv(size - len(received))
            if not chunk:
                raise IOError(errno.ESHUTDOWN, os.strerror(errno.ESHUTDOWN))
            received += chunk
        return received

    def recv_header(self, sock):
        """
        Receive the command header from the given socket and return it as a
        (data_size, cmd, cmd_version) tuple.

        Header ABI is defined like this (big endian):
            uint64 data_size
            uint32 cmd
            uint32 cmd_version

        Raises IOError if the connection is closed or the header is invalid.
        """
        s_pack = struct.Struct('>QII')

        pack_data = self._recv_exact(sock, s_pack.size)
        try:
            return s_pack.unpack(pack_data)
        except struct.error:
            raise IOError(errno.EINVAL, os.strerror(errno.EINVAL))

    def create_command(self, cmd_type, version, data):
        """
        Return the right command object using the given command type. The
        command version is unused since we only have one for now.

        Raises LTTngUnknownCmdError for an unknown command type.
        """
        # Factories are used so that only the requested command is built.
        cmd_dict = {
            LTTngSessiondCmd.CMD_LIST:
                lambda: LTTngCommandList(self.log_handler),
            LTTngSessiondCmd.CMD_ENABLE:
                lambda: LTTngCommandEnable(self.log_handler, data),
            LTTngSessiondCmd.CMD_DISABLE:
                lambda: LTTngCommandDisable(self.log_handler, data),
            LTTngSessiondCmd.CMD_REG_DONE:
                lambda: LTTngCommandRegDone(self.register_sem),
        }

        factory = cmd_dict.get(cmd_type)
        if factory is None:
            raise LTTngUnknownCmdError()
        return factory()

    def pack_code(self, code):
        """
        Return the given return code packed as a big endian uint32.
        """
        return struct.pack('>I', code)

    def handle_command(self, data, cmd_type, cmd_version):
        """
        Handle the given command type with the received payload. This function
        sends back data to the session daemon according to the return value of
        the command. Failed commands are always answered with their error code.
        """
        payload = bytearray()
        # Errors are always reported; a successful command decides itself.
        send_reply = True

        try:
            cmd = self.create_command(cmd_type, cmd_version, data)
            cmd_reply = cmd.execute()
        except LTTngCmdError as e:
            # Set error code in payload
            payload += self.pack_code(e.get_code())
        except LTTngUnknownCmdError:
            # Set error code in payload
            payload += self.pack_code(LTTngSessiondCmd.CODE_INVALID_CMD)
        else:
            # Set success code in data
            payload += self.pack_code(LTTngSessiondCmd.CODE_SUCCESS)
            if cmd_reply.payload is not None:
                payload += cmd_reply.payload
            send_reply = cmd_reply.reply

        # Send response only if asked for.
        if send_reply:
            self.sessiond_sock.sendall(payload)

    def wait_cmd(self, epfd):
        """
        Poll for commands and handle them until asked to quit.

        Returns True once the quit pipe becomes readable. Raises IOError when
        polling fails or the session daemon socket is in error or hung up.
        """
        while True:
            try:
                # Poll on socket for command.
                events = epfd.poll()
            except select.error as e:
                raise IOError(e.errno, e.strerror)

            for fileno, event in events:
                if fileno == self.quit_pipe[0]:
                    return True
                elif event & (EPOLLERR | EPOLLHUP):
                    raise IOError(errno.ESHUTDOWN, os.strerror(errno.ESHUTDOWN))
                elif event & EPOLLIN:
                    data = bytearray()

                    data_size, cmd, cmd_version = self.recv_header(self.sessiond_sock)
                    if data_size:
                        data += self._recv_exact(self.sessiond_sock, data_size)

                    self.handle_command(data, cmd, cmd_version)
                else:
                    raise IOError(errno.ESHUTDOWN, os.strerror(errno.ESHUTDOWN))

    def get_port_from_file(self, path):
        """
        Open the session daemon agent port file and return the port it
        contains. 0 is returned if the file cannot be read or does not hold
        a valid TCP port (1 to 65535).
        """
        # By default, the port is set to 0 so if we can not find the agent port
        # file simply don't try to connect. A value set to 0 indicates that.
        port = 0

        try:
            with open(path, "r") as f:
                r_port = int(f.readline())
        except (IOError, ValueError):
            return port

        if 0 < r_port <= 65535:
            port = r_port

        return port

    def connect_sessiond(self):
        """
        Connect sessiond_sock to running session daemon using the port file.

        Raises IOError if no port is available or the connection fails.
        """
        if self.log_handler.is_root:
            port = self.get_port_from_file(LTTngTCPClient.SYSTEM_PORT_FILE)
        else:
            port = self.get_port_from_file(LTTngTCPClient.USER_PORT_FILE)

        # No session daemon available
        if port == 0:
            raise IOError(errno.ECONNREFUSED, os.strerror(errno.ECONNREFUSED))

        # Create session daemon TCP socket
        if not self.sessiond_sock:
            self.sessiond_sock = socket(AF_INET, SOCK_STREAM)

        # Can raise an IOError so caller must catch it.
        self.sessiond_sock.connect((self.sessiond_host, port))