8 from contextlib
import closing
11 def line_count(file_path
):
13 with
open(file_path
) as f
:
19 def sha256_checksum(filename
, block_size
=65536):
20 sha256
= hashlib
.sha256()
21 with
open(filename
, 'rb') as f
:
22 for block
in iter(lambda: f
.read(block_size
), b
''):
24 return sha256
.hexdigest()
27 # TODO: timeout as a parameter or Settings
28 # TODO: Custom exception
29 def wait_for_file(path
):
32 while not os
.path
.exists(path
):
36 raise Exception("File still does not exists. Timeout expired")
39 # TODO: find better exception
40 def create_empty_file(path
):
41 if os
.path
.exists(path
):
42 raise Exception("Path already exist")
43 open(path
, 'w').close()
46 def __dummy_sigusr1_handler():
50 def sessiond_spawn(runtime
):
51 agent_port
= find_free_port()
52 previous_handler
= signal
.signal(signal
.SIGUSR1
, __dummy_sigusr1_handler
)
53 sessiond
= runtime
.spawn_subprocess("lttng-sessiond -vvv -S --agent-tcp-port {}".format(agent_port
))
54 signal
.sigtimedwait({signal
.SIGUSR1
}, 60)
55 previous_handler
= signal
.signal(signal
.SIGUSR1
, previous_handler
)
60 # There is no guarantee that the port will be free at runtime but should be
62 with
closing(socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)) as s
:
64 return s
.getsockname()[1]
67 def file_contains(file_path
, list_of_string
):
68 with
open(file_path
, 'r') as f
:
70 for s
in list_of_string
:
75 def find_dir(root
, name
):
77 Returns the absolute path or None.
80 for base
, dirs
, files
in os
.walk(root
):
82 if tmp
.endswith(name
):
83 abs_path
= os
.path
.abspath(os
.path
.join(base
, tmp
))
87 def find_file(root
, name
):
89 Returns the absolute path or None.
92 for base
, dirs
, files
in os
.walk(root
):
94 if tmp
.endswith(name
):
95 abs_path
= os
.path
.abspath(os
.path
.join(base
, tmp
))
99 def validate(xml_path
, xsd_path
):
101 xmlschema_doc
= etree
.parse(xsd_path
)
102 xmlschema
= etree
.XMLSchema(xmlschema_doc
)
104 xml_doc
= etree
.parse(xml_path
)
105 result
= xmlschema
.validate(xml_doc
)
109 def xpath_query(xml_file
, xpath
):
111 Return a list of xml node corresponding to the xpath. The list can be of lenght
114 with
open(xml_file
, 'r') as f
:
115 tree
= etree
.parse(f
)
116 root
= tree
.getroot()
117 # Remove all namespace
118 # https://stackoverflow.com/questions/18159221/remove-namespace-and-prefix-from-xml-in-python-using-lxml
119 for elem
in root
.getiterator():
120 if not hasattr(elem
.tag
, 'find'):
122 i
= elem
.tag
.find('}')
124 elem
.tag
= elem
.tag
[i
+1:]
126 return root
.xpath(xpath
)
This page took 0.03377 seconds and 5 git commands to generate.