Introduce test_ust_python_agent_vs_tools.py
[deliverable/lttng-ivc.git] / lttng_ivc / utils / utils.py
index aedb1fdb3c0ae62a0dd68ee531d608fbfaff3fa2..c174b44e192b17d5b45fff99fe43638adf15bf8d 100644 (file)
@@ -1,4 +1,10 @@
 import signal
+import hashlib
+import os
+import time
+import socket
+
+from contextlib import closing
 
 def line_count(file_path):
     line_count = 0
@@ -8,13 +14,70 @@ def line_count(file_path):
     return line_count
 
 
+def sha256_checksum(filename, block_size=65536):
+    sha256 = hashlib.sha256()
+    with open(filename, 'rb') as f:
+        for block in iter(lambda: f.read(block_size), b''):
+            sha256.update(block)
+    return sha256.hexdigest()
+
+
+# TODO: timeout as a parameter or Settings
+# TODO: Custom exception
+def wait_for_file(path):
+    i = 0
+    timeout = 60
+    while not os.path.exists(path):
+        time.sleep(1)
+        i = i + 1
+        if i > timeout:
+            raise Exception("File still does not exists. Timeout expired")
+
+
+# TODO: find better exception
+def create_empty_file(path):
+    if os.path.exists(path):
+        raise Exception("Path already exist")
+    open(path, 'w').close()
+
+
 def __dummy_sigusr1_handler():
     pass
 
 
 def sessiond_spawn(runtime):
-        previous_handler = signal.signal(signal.SIGUSR1, __dummy_sigusr1_handler)
-        sessiond = runtime.spawn_subprocess("lttng-sessiond -vvv -S")
-        signal.sigtimedwait({signal.SIGUSR1}, 60)
-        previous_handler = signal.signal(signal.SIGUSR1, previous_handler)
-        return sessiond
+    agent_port = find_free_port()
+    previous_handler = signal.signal(signal.SIGUSR1, __dummy_sigusr1_handler)
+    sessiond = runtime.spawn_subprocess("lttng-sessiond -vvv -S --agent-tcp-port {}".format(agent_port))
+    signal.sigtimedwait({signal.SIGUSR1}, 60)
+    previous_handler = signal.signal(signal.SIGUSR1, previous_handler)
+    return sessiond
+
+
+def find_free_port():
+    # There is no guarantee that the port will be free at runtime but should be
+    # good enough
+    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
+        s.bind(('', 0))
+        return s.getsockname()[1]
+
+
+def file_contains(stderr_file, list_of_string):
+    with open(stderr_file, 'r') as stderr:
+        for line in stderr:
+            for s in list_of_string:
+                if s in line:
+                    return True
+
+
+def find_dir(root, name):
+    """
+    Returns the absolute path or None.
+    """
+    abs_path = None
+    for base, dirs, files in os.walk(root):
+        for tmp in dirs:
+            print(tmp)
+            if tmp.endswith(name):
+                abs_path = os.path.abspath(os.path.join(base, tmp))
+    return abs_path
This page took 0.024233 seconds and 5 git commands to generate.