-#!/usr/bin/env python3
-
import sys
import os
import shutil
class TraceTest():
def __init__(self, delete_trace=True):
self.delete_trace = delete_trace
- self.trace_root = tempfile.mkdtemp()
+ self._trace_root = tempfile.mkdtemp()
self.trace_path = os.path.join(self.trace_root, "kernel")
self.create_writer()
self.create_stream_class()
if self.delete_trace:
self.rm_trace()
- def get_trace_root(self):
- return self.trace_root
+ @property
+ def trace_root(self):
+ return self._trace_root
def rm_trace(self):
shutil.rmtree(self.trace_root)
else:
string = "%s" % (string + "\0" * (16 - len(string)))
- for i in range(len(string)):
- a = event.field(i)
- a.value = ord(string[i])
+ for i, char in enumerate(string):
+ event.field(i).value = ord(char)
def set_int(self, event, value):
event.value = value
self.stream.flush()
def write_syscall_write(self, time_ms, cpu_id, delay, fd, buf, count, ret):
- entry = CTFWriter.Event(self.syscall_entry_write)
+ event_entry = CTFWriter.Event(self.syscall_entry_write)
self.clock.time = time_ms * 1000000
- self.set_int(entry.payload("_cpu_id"), cpu_id)
- self.set_int(entry.payload("_fd"), fd)
- self.set_int(entry.payload("_buf"), buf)
- self.set_int(entry.payload("_count"), count)
- self.stream.append_event(entry)
+ self.set_int(event_entry.payload("_cpu_id"), cpu_id)
+ self.set_int(event_entry.payload("_fd"), fd)
+ self.set_int(event_entry.payload("_buf"), buf)
+ self.set_int(event_entry.payload("_count"), count)
+ self.stream.append_event(event_entry)
- exit = CTFWriter.Event(self.syscall_exit_write)
+ event_exit = CTFWriter.Event(self.syscall_exit_write)
self.clock.time = (time_ms + delay) * 1000000
- self.set_int(exit.payload("_cpu_id"), cpu_id)
- self.set_int(exit.payload("_ret"), ret)
- self.stream.append_event(exit)
+ self.set_int(event_exit.payload("_cpu_id"), cpu_id)
+ self.set_int(event_exit.payload("_ret"), ret)
+ self.stream.append_event(event_exit)
self.stream.flush()
def write_syscall_read(self, time_ms, cpu_id, delay, fd, buf, count, ret):
- entry = CTFWriter.Event(self.syscall_entry_read)
+ event_entry = CTFWriter.Event(self.syscall_entry_read)
self.clock.time = time_ms * 1000000
- self.set_int(entry.payload("_cpu_id"), cpu_id)
- self.set_int(entry.payload("_fd"), fd)
- self.set_int(entry.payload("_count"), count)
- self.stream.append_event(entry)
+ self.set_int(event_entry.payload("_cpu_id"), cpu_id)
+ self.set_int(event_entry.payload("_fd"), fd)
+ self.set_int(event_entry.payload("_count"), count)
+ self.stream.append_event(event_entry)
- exit = CTFWriter.Event(self.syscall_exit_read)
+ event_exit = CTFWriter.Event(self.syscall_exit_read)
self.clock.time = (time_ms + delay) * 1000000
- self.set_int(exit.payload("_cpu_id"), cpu_id)
- self.set_int(exit.payload("_buf"), buf)
- self.set_int(exit.payload("_ret"), ret)
- self.stream.append_event(exit)
+ self.set_int(event_exit.payload("_cpu_id"), cpu_id)
+ self.set_int(event_exit.payload("_buf"), buf)
+ self.set_int(event_exit.payload("_ret"), ret)
+ self.stream.append_event(event_exit)
self.stream.flush()
def write_syscall_open(self, time_ms, cpu_id, delay, filename, flags,
current += period
def compare_output(self, cmd, expected):
+ line_sep = '\n'
result = subprocess.getoutput(cmd)
- diff = difflib.ndiff(expected.split('\n'), result.split('\n'))
- txt = ""
- ok = True
- for l in diff:
- if l[0] != ' ':
- ok = False
- txt = txt + (l) + '\n'
- if not ok:
- print(txt)
- return ok
+ diff = difflib.ndiff(expected.split(line_sep), result.split(line_sep))
+
+ for line in diff:
+ if line[0] != ' ':
+ # result doesn't match expected. Print the diff and
+ # return False
+ print(line_sep.join(diff))
+ return False
+
+ return True
-class AnalyzesTest():
+class AnalysesTest():
def __init__(self, delete_trace=True, verbose=False):
self.verbose = verbose
self.t = TraceTest(delete_trace=delete_trace)
self.common_options = '--no-progress --skip-validation'
self.cmd_root = './'
- self.log('Trace in %s' % (self.t.get_trace_root()))
+ self.log('Trace in %s' % (self.t.trace_root))
def log(self, msg):
if self.verbose: