Refactor Python agent build and install
[deliverable/lttng-ust.git] / liblttng-ust-python-agent / lttngust / agent.py
diff --git a/liblttng-ust-python-agent/lttngust/agent.py b/liblttng-ust-python-agent/lttngust/agent.py
deleted file mode 100644 (file)
index ebfa2de..0000000
+++ /dev/null
@@ -1,395 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2015 - Philippe Proulx <pproulx@efficios.com>
-# Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
-#
-# This library is free software; you can redistribute it and/or modify it under
-# the terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation; version 2.1 of the License.
-#
-# This library is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with this library; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
-
-from __future__ import unicode_literals
-from __future__ import print_function
-from __future__ import division
-import lttngust.debug as dbg
-import lttngust.loghandler
-import lttngust.cmd
-from io import open
-import threading
-import logging
-import socket
-import time
-import sys
-import os
-
-
-try:
-    # Python 2
-    import Queue as queue
-except ImportError:
-    # Python 3
-    import queue
-
-
-_PROTO_DOMAIN = 5  # domain value passed to _ClientRegisterCmd by _TcpClient._register()
-_PROTO_MAJOR = 2  # major version passed to _ClientRegisterCmd by _TcpClient._register()
-_PROTO_MINOR = 0  # minor version passed to _ClientRegisterCmd by _TcpClient._register()
-
-
-def _get_env_value_ms(key, default_s):
-    """Return a duration in seconds read from a millisecond environment variable.
-
-    key: name of an environment variable expected to hold a non-negative
-        integer number of milliseconds.
-    default_s: duration, in seconds, used when the variable is unset or
-        holds an invalid value.
-
-    When the value is not an integer or is negative, a warning is printed
-    with dbg._pwarning() and default_s is returned.
-    """
-    try:
-        val = int(os.getenv(key, default_s * 1000)) / 1000
-    except (TypeError, ValueError):
-        # Not an integer: force the "invalid value" path below. Only the
-        # parse errors are caught so that KeyboardInterrupt and
-        # SystemExit are not swallowed.
-        val = -1
-
-    if val < 0:
-        fmt = 'invalid ${} value; {} seconds will be used'
-        dbg._pwarning(fmt.format(key, default_s))
-        val = default_s
-
-    return val
-
-
-_REG_TIMEOUT = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5)  # seconds; initial registration wait in _init_threads()
-_RETRY_REG_DELAY = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3)  # seconds; sleep before _TcpClient.run() reconnects
-
-
-class _TcpClient(object):
-    """Client side of one TCP connection to an LTTng session daemon.
-
-    run() loops forever: it connects to host:port, sends the
-    registration command, then handles the commands received from the
-    session daemon. Any error tears the socket down and, after
-    _RETRY_REG_DELAY seconds, the whole sequence starts over.
-    """
-
-    def __init__(self, name, host, port, reg_queue):
-        """Prepare the client; no connection is made here.
-
-        name: label used to prefix this client's debug messages.
-        host, port: address given to socket.connect().
-        reg_queue: queue receiving True once the "registration done"
-            command is handled, or None to disable the notification.
-
-        Re-raises the OSError raised by lttngust.loghandler._Handler().
-        """
-        # Name the class explicitly: super(self.__class__, self) recurses
-        # forever as soon as this class is subclassed.
-        super(_TcpClient, self).__init__()
-        self._name = name
-        self._host = host
-        self._port = port
-
-        try:
-            self._log_handler = lttngust.loghandler._Handler()
-        except OSError as e:
-            dbg._pwarning('cannot load library: {}'.format(e))
-            # bare raise keeps the original traceback
-            raise
-
-        self._root_logger = logging.getLogger()
-        self._root_logger.setLevel(logging.NOTSET)
-        # number of "enable" commands not yet matched by a "disable"
-        self._ref_count = 0
-        self._sessiond_sock = None
-        self._reg_queue = reg_queue
-        # dispatch table: server command type -> handler method
-        self._server_cmd_handlers = {
-            lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done,
-            lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable,
-            lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable,
-            lttngust.cmd._ServerCmdList: self._handle_server_cmd_list,
-        }
-
-    def _debug(self, msg):
-        """Return msg prefixed with this client's name."""
-        return 'client "{}": {}'.format(self._name, msg)
-
-    def run(self):
-        """Connect, register and handle server commands; never returns."""
-        while True:
-            try:
-                # connect to the session daemon
-                dbg._pdebug(self._debug('connecting to session daemon'))
-                self._connect_to_sessiond()
-
-                # register to the session daemon after a successful connection
-                dbg._pdebug(self._debug('registering to session daemon'))
-                self._register()
-
-                # wait for commands from the session daemon
-                self._wait_server_cmd()
-            except Exception as e:
-                # Whatever happens here, we have to close the socket and
-                # retry to connect to the session daemon since either
-                # the socket was closed, a network timeout occurred, or
-                # invalid data was received.
-                dbg._pdebug(self._debug('got exception: {}'.format(e)))
-                self._cleanup_socket()
-                dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY)))
-                time.sleep(_RETRY_REG_DELAY)
-
-    def _recv_exact(self, size):
-        """Read exactly size bytes from the session daemon socket.
-
-        A single recv() call may return fewer bytes than requested, so
-        keep reading until the message is complete.
-
-        Returns the bytes read, or an empty bytes object when the first
-        recv() call returns no data. Raises socket.error when recv()
-        returns no data after part of the message was read.
-        """
-        chunks = []
-        remaining = size
-
-        while remaining > 0:
-            chunk = self._sessiond_sock.recv(remaining)
-
-            if not chunk:
-                if not chunks:
-                    return bytes()
-
-                fmt = 'connection closed after {} of {} bytes'
-                raise socket.error(fmt.format(size - remaining, size))
-
-            chunks.append(chunk)
-            remaining -= len(chunk)
-
-        return b''.join(chunks)
-
-    def _recv_server_cmd_header(self):
-        """Receive and decode one server command header.
-
-        Returns None when no data could be read.
-        """
-        data = self._recv_exact(lttngust.cmd._SERVER_CMD_HEADER_SIZE)
-
-        if not data:
-            dbg._pdebug(self._debug('received empty server command header'))
-            return None
-
-        dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data))))
-
-        return lttngust.cmd._server_cmd_header_from_data(data)
-
-    def _recv_server_cmd(self):
-        """Receive one server command: header, then payload if any.
-
-        Returns None when no header could be read; otherwise returns
-        whatever lttngust.cmd._server_cmd_from_data() builds.
-        """
-        server_cmd_header = self._recv_server_cmd_header()
-
-        if server_cmd_header is None:
-            return None
-
-        dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size)))
-        dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id)))
-        dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version)))
-        data = bytes()
-
-        if server_cmd_header.data_size > 0:
-            data = self._recv_exact(server_cmd_header.data_size)
-
-            if not data:
-                # explicit error instead of an assert, which python -O strips
-                raise socket.error('connection closed before the command payload')
-
-        return lttngust.cmd._server_cmd_from_data(server_cmd_header, data)
-
-    def _send_cmd_reply(self, cmd_reply):
-        """Send the serialized cmd_reply to the session daemon."""
-        data = cmd_reply.get_data()
-        dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data))))
-        self._sessiond_sock.sendall(data)
-
-    def _handle_server_cmd_reg_done(self, server_cmd):
-        """Notify the registration queue, once; no reply is returned."""
-        dbg._pdebug(self._debug('got "registration done" server command'))
-
-        if self._reg_queue is not None:
-            dbg._pdebug(self._debug('notifying _init_threads()'))
-
-            try:
-                self._reg_queue.put(True)
-            except Exception:
-                # read side could be closed by now; ignore it
-                pass
-
-            # only the first "registration done" is reported
-            self._reg_queue = None
-
-    def _handle_server_cmd_enable(self, server_cmd):
-        """Count one more "enable"; attach the log handler on the first."""
-        dbg._pdebug(self._debug('got "enable" server command'))
-        self._ref_count += 1
-
-        if self._ref_count == 1:
-            dbg._pdebug(self._debug('adding our handler to the root logger'))
-            self._root_logger.addHandler(self._log_handler)
-
-        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))
-
-        return lttngust.cmd._ClientCmdReplyEnable()
-
-    def _handle_server_cmd_disable(self, server_cmd):
-        """Count one less "enable"; detach the log handler at zero."""
-        dbg._pdebug(self._debug('got "disable" server command'))
-        self._ref_count -= 1
-
-        if self._ref_count < 0:
-            # disable command could be sent again when a session is destroyed
-            self._ref_count = 0
-
-        if self._ref_count == 0:
-            dbg._pdebug(self._debug('removing our handler from the root logger'))
-            self._root_logger.removeHandler(self._log_handler)
-
-        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))
-
-        return lttngust.cmd._ClientCmdReplyDisable()
-
-    def _handle_server_cmd_list(self, server_cmd):
-        """Reply with the names found in the logging manager's loggerDict."""
-        dbg._pdebug(self._debug('got "list" server command'))
-        names = logging.Logger.manager.loggerDict.keys()
-        dbg._pdebug(self._debug('found {} loggers'.format(len(names))))
-        cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names)
-
-        return cmd_reply
-
-    def _handle_server_cmd(self, server_cmd):
-        """Dispatch server_cmd to its handler and send back the reply.
-
-        A None or unknown command is answered with an "invalid command"
-        status. Nothing is sent when the handler returns None.
-        """
-        handler = None
-
-        if server_cmd is None:
-            dbg._pdebug(self._debug('bad server command'))
-        else:
-            handler = self._server_cmd_handlers.get(type(server_cmd))
-
-            if handler is None:
-                dbg._pdebug(self._debug('unknown server command'))
-
-        if handler is None:
-            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
-            cmd_reply = lttngust.cmd._ClientCmdReply(status)
-        else:
-            cmd_reply = handler(server_cmd)
-
-        if cmd_reply is not None:
-            self._send_cmd_reply(cmd_reply)
-
-    def _wait_server_cmd(self):
-        """Receive and handle server commands until an error is raised."""
-        while True:
-            try:
-                server_cmd = self._recv_server_cmd()
-            except socket.timeout:
-                # simply retry here; the protocol has no KA and we could
-                # wait for hours
-                continue
-
-            self._handle_server_cmd(server_cmd)
-
-    def _cleanup_socket(self):
-        """Shut down and close the session daemon socket, if any.
-
-        Best effort: errors are only reported as debug messages. The
-        socket attribute is always reset to None so that
-        _connect_to_sessiond() creates a fresh socket.
-        """
-        sock = self._sessiond_sock
-        self._sessiond_sock = None
-
-        if sock is None:
-            return
-
-        try:
-            sock.shutdown(socket.SHUT_RDWR)
-        except Exception as e:
-            # e.g. the socket was never connected
-            dbg._pdebug(self._debug('cannot shut down socket: {}'.format(e)))
-
-        # close even when shutdown() failed so the descriptor is released
-        try:
-            sock.close()
-        except Exception as e:
-            dbg._pdebug(self._debug('cannot close socket: {}'.format(e)))
-
-    def _connect_to_sessiond(self):
-        """Create the TCP socket if needed and connect it to host:port."""
-        # create session daemon TCP socket
-        if self._sessiond_sock is None:
-            self._sessiond_sock = socket.socket(socket.AF_INET,
-                                                socket.SOCK_STREAM)
-
-        # Use str(self._host) here. Since this host could be a string
-        # literal, and since we're importing __future__.unicode_literals,
-        # we want to make sure the host is a native string in Python 2.
-        # This avoids an indirect module import (unicode module to
-        # decode the unicode string, eventually imported by the
-        # socket module if needed), which is not allowed in a thread
-        # directly created by a module in Python 2 (our case).
-        #
-        # tl;dr: Do NOT remove str() here, or this call in Python 2
-        # _will_ block on an interpreter's mutex until the waiting
-        # register queue timeouts.
-        self._sessiond_sock.connect((str(self._host), self._port))
-
-    def _register(self):
-        """Send the registration command (domain, PID, protocol version)."""
-        cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(),
-                                              _PROTO_MAJOR, _PROTO_MINOR)
-        data = cmd.get_data()
-        self._sessiond_sock.sendall(data)
-
-
-def _get_port_from_file(path):
-    """Return the TCP port stored in the file at path, or None.
-
-    The first line of the file is parsed as an integer. None is returned
-    when the file cannot be read, when that line is not an integer, or
-    when the value is outside the 1..65535 range.
-    """
-    dbg._pdebug('reading port from file "{}"'.format(path))
-
-    try:
-        # the with statement closes the file even when int() raises
-        with open(path) as f:
-            r_port = int(f.readline())
-    except (IOError, OSError, ValueError):
-        return None
-
-    # Both bounds must hold: the previous `> 0 or <= 65535` test was true
-    # for every integer, so any value was accepted as a port.
-    if 0 < r_port <= 65535:
-        return r_port
-
-    return None
-
-
-def _get_user_home_path():
-    """Return $LTTNG_HOME when it is set, else the expansion of '~'."""
-    default_home = os.path.expanduser('~')
-
-    return os.getenv('LTTNG_HOME', default_home)
-
-
-_initialized = False  # set to True by _init_threads() so that it only runs once
-_SESSIOND_HOST = '127.0.0.1'  # host given to every _TcpClient by _client_thread_target()
-
-
-def _client_thread_target(name, port, reg_queue):
-    """Thread entry point: build the client named name and run it.
-
-    The client connects to _SESSIOND_HOST on the TCP port given; reg_queue
-    is handed over to the client unchanged.
-    """
-    creation_msg = 'creating client "{}" using TCP port {}'.format(name, port)
-    dbg._pdebug(creation_msg)
-    tcp_client = _TcpClient(name, _SESSIOND_HOST, port, reg_queue)
-
-    start_msg = 'starting client "{}"'.format(name)
-    dbg._pdebug(start_msg)
-    tcp_client.run()
-
-
-def _start_client_thread(name, port, reg_queue):
-    """Start the daemon thread running the client named name on port."""
-    dbg._pdebug('creating {} client thread'.format(name))
-    t = threading.Thread(target=_client_thread_target,
-                         args=(name, port, reg_queue))
-    t.name = name
-    t.daemon = True
-    t.start()
-    dbg._pdebug('created and started {} client thread'.format(name))
-
-
-def _init_threads():
-    """Start the client threads and wait for their registration.
-
-    One daemon thread is started per session daemon port found (system
-    and user). The call then blocks until every started client reported
-    "registration done" or until _REG_TIMEOUT seconds have elapsed,
-    whichever comes first. Only the first call does anything.
-    """
-    global _initialized
-
-    dbg._pdebug('entering')
-
-    if _initialized:
-        dbg._pdebug('agent is already initialized')
-        return
-
-    # This makes sure that the appropriate modules for encoding and
-    # decoding strings/bytes are imported now, since no import should
-    # happen within a thread at import time (our case).
-    'lttng'.encode().decode()
-
-    _initialized = True
-    sys_port = _get_port_from_file('/var/run/lttng/agent.port')
-    user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
-    user_port = _get_port_from_file(user_port_file)
-    reg_queue = queue.Queue()
-    reg_expecting = 0
-
-    dbg._pdebug('system session daemon port: {}'.format(sys_port))
-    dbg._pdebug('user session daemon port: {}'.format(user_port))
-
-    if sys_port == user_port and sys_port is not None:
-        # The two session daemon ports are the same. This is not normal.
-        # Connect to only one.
-        dbg._pdebug('both user and system session daemon have the same port')
-        sys_port = None
-
-    try:
-        for name, port in (('system', sys_port), ('user', user_port)):
-            if port is not None:
-                _start_client_thread(name, port, reg_queue)
-                reg_expecting += 1
-    except Exception:
-        # cannot create threads for some reason; stop this initialization
-        dbg._pwarning('cannot create client threads')
-        return
-
-    if reg_expecting == 0:
-        # early exit: looks like there's not even one valid port
-        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
-        return
-
-    # time.clock() was removed in Python 3.8 and measured CPU time on
-    # Unix, so time spent blocked on the queue was barely counted. Use a
-    # monotonic clock when available (Python 3), else wall-clock time.
-    clock = getattr(time, 'monotonic', time.time)
-    cur_timeout = _REG_TIMEOUT
-
-    # We block here to make sure the agent is properly registered to
-    # the session daemon. If we timeout, the client threads will still
-    # continue to try to connect and register to the session daemon,
-    # but there is no guarantee that all following logging statements
-    # will make it to LTTng-UST.
-    #
-    # When a client thread receives a "registration done" confirmation
-    # from the session daemon it's connected to, it puts True in
-    # reg_queue.
-    while True:
-        dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
-                                                                                           cur_timeout))
-        started = clock()
-
-        try:
-            reg_queue.get(timeout=cur_timeout)
-        except queue.Empty:
-            dbg._pdebug('ran out of time')
-            break
-
-        reg_expecting -= 1
-        dbg._pdebug('unblocked')
-
-        if reg_expecting == 0:
-            # done!
-            dbg._pdebug('successfully registered to session daemon(s)')
-            break
-
-        # the remaining clients share what is left of the timeout
-        cur_timeout -= clock() - started
-
-        if cur_timeout <= 0:
-            # timeout
-            dbg._pdebug('ran out of time')
-            break
-
-    dbg._pdebug('leaving')
-
-
-_init_threads()  # module-level call: runs when this module is imported
This page took 0.029609 seconds and 5 git commands to generate.