fix: stats with 0 requests
[deliverable/lttng-analyses.git] / lttnganalyses / lttnganalyses / io.py
1 #!/usr/bin/env python3
2 #
3 # The MIT License (MIT)
4 #
5 # Copyright (C) 2015 - Antoine Busque <abusque@efficios.com>
6 #
7 # Permission is hereby granted, free of charge, to any person obtaining a copy
8 # of this software and associated documentation files (the "Software"), to deal
9 # in the Software without restriction, including without limitation the rights
10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 # copies of the Software, and to permit persons to whom the Software is
12 # furnished to do so, subject to the following conditions:
13 #
14 # The above copyright notice and this permission notice shall be included in
15 # all copies or substantial portions of the Software.
16 #
17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23 # SOFTWARE.
24
25 from .analysis import Analysis
26 from linuxautomaton import sv
27
28
class IoAnalysis(Analysis):
    """Aggregate disk, network interface and per-thread I/O statistics.

    Most statistics are driven by notifications registered on the state
    tracker passed to the constructor; block device names come from the
    lttng_statedump_block_device event.
    """

    def __init__(self, state):
        notification_cbs = {
            'net_dev_xmit': self._process_net_dev_xmit,
            'netif_receive_skb': self._process_netif_receive_skb,
            'block_rq_complete': self._process_block_rq_complete,
            'io_rq_exit': self._process_io_rq_exit,
            'create_fd': self._process_create_fd,
            'close_fd': self._process_close_fd,
            'create_parent_proc': self._process_create_parent_proc
        }

        event_cbs = {
            'lttng_statedump_block_device':
            self._process_lttng_statedump_block_device
        }

        self._state = state
        self._state.register_notification_cbs(notification_cbs)
        self._register_cbs(event_cbs)

        # DiskStats, indexed by device (dev)
        self.disks = {}
        # IfaceStats, indexed by interface name
        self.ifaces = {}
        # ProcessIOStats, indexed by tid
        self.tids = {}

    def process_event(self, ev):
        """Forward ev to the base class' event-callback dispatcher."""
        self._process_event_cb(ev)

    def reset(self):
        """Reset the accumulated statistics of every disk, interface and
        thread, keeping the stats objects themselves."""
        for disk_stats in self.disks.values():
            disk_stats.reset()

        for iface_stats in self.ifaces.values():
            iface_stats.reset()

        for proc_stats in self.tids.values():
            proc_stats.reset()

    @property
    def disk_io_requests(self):
        """Generator over the block I/O requests of every disk."""
        for disk in self.disks.values():
            yield from disk.rq_list

    # Generators over the syscall (non-block) I/O requests of every
    # thread, optionally restricted to one kind of operation.
    @property
    def io_requests(self):
        return self._get_io_requests()

    @property
    def open_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_OPEN)

    @property
    def read_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_READ)

    @property
    def write_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_WRITE)

    @property
    def close_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_CLOSE)

    @property
    def sync_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_SYNC)

    @property
    def read_write_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_READ_WRITE)

    def _get_io_requests(self, io_operation=None):
        """Create a generator of syscall io requests by operation

        Block-layer requests, which are also stored in the per-thread
        rq_list, are skipped.

        Args:
            io_operation (IORequest.OP_*, optional): The operation of
                the io_requests to return. Return all IO requests if None.
        """
        for proc_stats in self.tids.values():
            for io_rq in proc_stats.rq_list:
                if isinstance(io_rq, sv.BlockIORequest):
                    continue

                if io_operation is None or \
                        sv.IORequest.is_equivalent_operation(io_operation,
                                                             io_rq.operation):
                    yield io_rq

    def get_files_stats(self, pid_filter_list, comm_filter_list):
        """Aggregate the per-fd statistics of every thread per file.

        Args:
            pid_filter_list: if not None, only threads whose pid is in
                this collection are considered.
            comm_filter_list: if not None, only threads whose comm is in
                this collection are considered.

        Returns:
            dict mapping filename to FileStats. Generic filenames (see
            FileStats.is_generic_name) get the process name appended so
            that, e.g., pipes of different processes are kept apart.
        """
        files_stats = {}

        for proc_stats in self.tids.values():
            if pid_filter_list is not None and \
                    proc_stats.pid not in pid_filter_list:
                continue
            if comm_filter_list is not None and \
                    proc_stats.comm not in comm_filter_list:
                continue

            for fd_list in proc_stats.fds.values():
                for fd_stats in fd_list:
                    filename = fd_stats.filename
                    # Add process name to generic filenames to
                    # distinguish them
                    if FileStats.is_generic_name(filename):
                        filename += '(%s)' % proc_stats.comm

                    if filename not in files_stats:
                        if proc_stats.pid is not None:
                            pid = proc_stats.pid
                        else:
                            pid = proc_stats.tid

                        files_stats[filename] = FileStats(
                            filename, fd_stats.fd, pid)

                    files_stats[filename].update_stats(fd_stats, proc_stats)

        return files_stats

    def _get_proc_stats(self, proc):
        """Return the ProcessIOStats of proc (keyed by proc.tid),
        creating it from proc if it does not exist yet."""
        if proc.tid not in self.tids:
            self.tids[proc.tid] = ProcessIOStats.new_from_process(proc)

        return self.tids[proc.tid]

    def _get_iface_stats(self, name):
        """Return the IfaceStats of interface name, creating it if
        needed."""
        if name not in self.ifaces:
            self.ifaces[name] = IfaceStats(name)

        return self.ifaces[name]

    @staticmethod
    def _get_fd_type(proc_stats, fd):
        """Return the fd_type of proc_stats' currently open fd, or None
        when that fd is not tracked or has already been closed."""
        fd_stats = proc_stats.get_fd(fd)
        if fd_stats is None:
            return None

        return fd_stats.fd_type

    @staticmethod
    def _assign_fds_to_parent(proc, parent):
        """Move every fd tracked on thread proc over to parent.

        fds unknown to parent are transferred as-is. For fds parent
        already tracks, parent's entry is kept and, as a best effort,
        its missing filename is filled in from proc's. In both cases the
        fd is removed from proc.
        """
        # Iterate over a snapshot since entries are deleted as we go
        for fd in list(proc.fds):
            if fd not in parent.fds:
                parent.fds[fd] = proc.fds[fd]
            else:
                # best effort to fix the filename; either side may have
                # no currently open FDStats for this fd (get_fd -> None)
                parent_fd = parent.get_fd(fd)
                proc_fd = proc.get_fd(fd)
                if parent_fd is not None and proc_fd is not None and \
                        not parent_fd.filename:
                    parent_fd.filename = proc_fd.filename

            del proc.fds[fd]

    def _process_net_dev_xmit(self, **kwargs):
        iface_stats = self._get_iface_stats(kwargs['iface_name'])
        iface_stats.sent_packets += 1
        iface_stats.sent_bytes += kwargs['sent_bytes']

    def _process_netif_receive_skb(self, **kwargs):
        iface_stats = self._get_iface_stats(kwargs['iface_name'])
        iface_stats.recv_packets += 1
        iface_stats.recv_bytes += kwargs['recv_bytes']

    def _process_block_rq_complete(self, **kwargs):
        req = kwargs['req']
        proc = kwargs['proc']

        if req.dev not in self.disks:
            self.disks[req.dev] = DiskStats(req.dev)

        self.disks[req.dev].update_stats(req)
        self._get_proc_stats(proc).update_block_stats(req)

    def _process_lttng_statedump_block_device(self, event):
        dev = event['dev']
        disk_name = event['diskname']

        if dev not in self.disks:
            self.disks[dev] = DiskStats(dev, disk_name)
        else:
            self.disks[dev].disk_name = disk_name

    def _process_io_rq_exit(self, **kwargs):
        proc = kwargs['proc']
        parent_proc = kwargs['parent_proc']
        io_rq = kwargs['io_rq']

        proc_stats = self._get_proc_stats(proc)
        parent_stats = self._get_proc_stats(parent_proc)

        # fd types are looked up on the parent, where fds are tracked;
        # an untracked or closed fd yields None instead of raising
        fd_types = {}
        if io_rq.errno is None:
            if io_rq.operation == sv.IORequest.OP_READ or \
                    io_rq.operation == sv.IORequest.OP_WRITE:
                fd_types['fd'] = self._get_fd_type(parent_stats, io_rq.fd)
            elif io_rq.operation == sv.IORequest.OP_READ_WRITE:
                fd_types['fd_in'] = self._get_fd_type(
                    parent_stats, io_rq.fd_in)
                fd_types['fd_out'] = self._get_fd_type(
                    parent_stats, io_rq.fd_out)

        proc_stats.update_io_stats(io_rq, fd_types)
        parent_stats.update_fd_stats(io_rq)

    def _process_create_parent_proc(self, **kwargs):
        proc_stats = self._get_proc_stats(kwargs['proc'])
        parent_stats = self._get_proc_stats(kwargs['parent_proc'])

        proc_stats.pid = parent_stats.tid
        IoAnalysis._assign_fds_to_parent(proc_stats, parent_stats)

    def _process_create_fd(self, **kwargs):
        timestamp = kwargs['timestamp']
        parent_proc = kwargs['parent_proc']
        fd = kwargs['fd']

        parent_stats = self._get_proc_stats(parent_proc)
        # Each open of the fd gets its own FDStats, appended in order
        parent_stats.fds.setdefault(fd, []).append(
            FDStats.new_from_fd(parent_proc.fds[fd], timestamp))

    def _process_close_fd(self, **kwargs):
        timestamp = kwargs['timestamp']
        parent_proc = kwargs['parent_proc']
        fd = kwargs['fd']

        parent_stats = self.tids.get(parent_proc.tid)
        if parent_stats is None:
            # Process never seen by this analysis: no FDStats to close
            return

        last_fd = parent_stats.get_fd(fd)
        if last_fd is None:
            # fd opened before tracking started, or already closed
            return

        last_fd.close_ts = timestamp
275
276
class DiskStats():
    """Statistics about the block requests completed on one disk."""

    # dev_t layout, imported from include/linux/kdev_t.h
    MINORBITS = 20
    MINORMASK = ((1 << MINORBITS) - 1)

    def __init__(self, dev, disk_name=None):
        self.dev = dev
        if disk_name is None:
            disk_name = DiskStats._get_name_from_dev(dev)
        self.disk_name = disk_name
        self.reset()

    @property
    def rq_count(self):
        """Number of requests recorded since creation or the last
        reset."""
        return len(self.rq_list)

    def update_stats(self, req):
        """Account for one completed block request.

        Reads req.duration and req.nr_sector, updates the min/max and
        total aggregates, and keeps req in rq_list.
        """
        duration = req.duration
        shortest = self.min_rq_duration
        longest = self.max_rq_duration

        self.min_rq_duration = (duration if shortest is None
                                else min(shortest, duration))
        self.max_rq_duration = (duration if longest is None
                                else max(longest, duration))

        self.total_rq_sectors += req.nr_sector
        self.total_rq_duration += duration
        self.rq_list.append(req)

    def reset(self):
        """Forget every recorded request and clear the aggregates."""
        self.min_rq_duration = None
        self.max_rq_duration = None
        self.total_rq_sectors = 0
        self.total_rq_duration = 0
        self.rq_list = []

    @staticmethod
    def _get_name_from_dev(dev):
        """Format dev as '(major,minor)' using the kernel dev_t split."""
        return '(%d,%d)' % (dev >> DiskStats.MINORBITS,
                            dev & DiskStats.MINORMASK)
322
323
class IfaceStats():
    """Packet and byte counters for one network interface."""

    def __init__(self, name):
        self.name = name
        self.reset()

    def reset(self):
        """Zero the received/sent byte and packet counters."""
        self.recv_bytes = self.recv_packets = 0
        self.sent_bytes = self.sent_packets = 0
337
338
class ProcessIOStats():
    """I/O statistics of a single thread (tid).

    pid may be None; callers then fall back to tid to identify the
    owning process.
    """

    def __init__(self, pid, tid, comm):
        self.pid = pid
        self.tid = tid
        self.comm = comm
        # Number of bytes read or written by the process, by type of I/O
        self.disk_read = 0
        self.disk_write = 0
        self.net_read = 0
        self.net_write = 0
        self.unk_read = 0
        self.unk_write = 0
        # Actual number of bytes read or written by the process at the
        # block layer
        self.block_read = 0
        self.block_write = 0
        # Lists of FDStats objects (one per open of the fd, sorted
        # chronologically by open_ts), indexed by fd (fileno)
        self.fds = {}
        # I/O requests (syscall and block layer) issued by this thread
        self.rq_list = []

    @classmethod
    def new_from_process(cls, proc):
        """Create stats from a state-tracker process object, copying its
        pid, tid and comm attributes."""
        return cls(proc.pid, proc.tid, proc.comm)

    # Total read/write does not account for block layer I/O
    @property
    def total_read(self):
        return self.disk_read + self.net_read + self.unk_read

    @property
    def total_write(self):
        return self.disk_write + self.net_write + self.unk_write

    def update_fd_stats(self, req):
        """Account a successful request on the currently open FDStats of
        the fd(s) it acted upon.

        Failed requests (errno set) are ignored, as are fds that are not
        tracked or no longer open.
        """
        if req.errno is not None:
            return

        if req.fd is not None:
            fds = [req.fd]
        elif isinstance(req, sv.ReadWriteIORequest):
            fds = [fd for fd in (req.fd_in, req.fd_out) if fd is not None]
        else:
            fds = []

        for fd in fds:
            fd_stats = self.get_fd(fd)
            if fd_stats is not None:
                fd_stats.update_stats(req)

    def update_block_stats(self, req):
        """Record a block-layer request and add its size to block_read or
        block_write according to its operation."""
        self.rq_list.append(req)

        if req.operation == sv.IORequest.OP_READ:
            self.block_read += req.size
        elif req.operation == sv.IORequest.OP_WRITE:
            self.block_write += req.size

    def update_io_stats(self, req, fd_types):
        """Record a syscall I/O request and account for its bytes.

        The request is appended to rq_list exactly once, whether or not
        it succeeded. Byte counters are only updated for successful
        requests (errno is None) that have a size.

        Args:
            req: the completed I/O request.
            fd_types (dict): fd type of the involved fd(s), keyed by
                'fd' for read/write requests and by 'fd_in'/'fd_out' for
                read-write requests.
        """
        self.rq_list.append(req)

        if req.size is None or req.errno is not None:
            return

        if req.operation == sv.IORequest.OP_READ:
            self._update_read(req.returned_size, fd_types['fd'])
        elif req.operation == sv.IORequest.OP_WRITE:
            self._update_write(req.returned_size, fd_types['fd'])
        elif req.operation == sv.IORequest.OP_READ_WRITE:
            self._update_read(req.returned_size, fd_types['fd_in'])
            self._update_write(req.returned_size, fd_types['fd_out'])

    def _update_read(self, size, fd_type):
        if fd_type == sv.FDType.disk:
            self.disk_read += size
        elif fd_type == sv.FDType.net or fd_type == sv.FDType.maybe_net:
            self.net_read += size
        else:
            self.unk_read += size

    def _update_write(self, size, fd_type):
        if fd_type == sv.FDType.disk:
            self.disk_write += size
        elif fd_type == sv.FDType.net or fd_type == sv.FDType.maybe_net:
            self.net_write += size
        else:
            self.unk_write += size

    def _get_current_fd(self, fd):
        """Return the latest FDStats of fd, or None if it was closed."""
        fd_stats = self.fds[fd][-1]
        if fd_stats.close_ts is not None:
            return None

        return fd_stats

    @staticmethod
    def _get_fd_by_timestamp(fd_list, timestamp):
        """Return the FDStats object whose lifetime contains timestamp

        This method performs an iterative binary search on the given
        fd_list argument, and will find the FDStats object for which
        the timestamp is contained between its open_ts and close_ts
        attributes (an FDStats without close_ts is still open and
        contains every timestamp from its open_ts on).

        Args:
            fd_list (list): list of FDStats object, sorted
                chronologically by open_ts

            timestamp (int): timestamp in nanoseconds (ns) since unix
                epoch which should be contained in the FD's lifetime.

        Returns:
            The FDStats object whose lifetime contains the given
            timestamp, None if no such object exists.
        """
        low, high = 0, len(fd_list)
        while low < high:
            midpoint = (low + high) // 2
            fd_stats = fd_list[midpoint]

            if timestamp < fd_stats.open_ts:
                # Also applies to a still-open fd: an earlier lifetime
                # may contain the timestamp
                high = midpoint
            elif fd_stats.close_ts is None or \
                    timestamp <= fd_stats.close_ts:
                return fd_stats
            else:
                low = midpoint + 1

        return None

    def get_fd(self, fd, timestamp=None):
        """Return the FDStats of fd.

        Without timestamp, return the currently open FDStats (None if
        the last one was closed); otherwise return the one whose
        lifetime contains timestamp. Return None if fd is not tracked.
        """
        if fd not in self.fds or not self.fds[fd]:
            return None

        if timestamp is None:
            fd_stats = self._get_current_fd(fd)
        else:
            fd_stats = ProcessIOStats._get_fd_by_timestamp(self.fds[fd],
                                                           timestamp)

        return fd_stats

    def reset(self):
        """Zero the byte counters, forget recorded requests and reset the
        currently open FDStats (fd history is kept)."""
        self.disk_read = 0
        self.disk_write = 0
        self.net_read = 0
        self.net_write = 0
        self.unk_read = 0
        self.unk_write = 0
        self.block_read = 0
        self.block_write = 0
        self.rq_list = []

        for fd in self.fds:
            fd_stats = self.get_fd(fd)
            if fd_stats is not None:
                fd_stats.reset()
501
502
class FDStats():
    """Statistics for one lifetime (from open to close) of a file
    descriptor."""

    def __init__(self, fd, filename, fd_type, cloexec, family, open_ts):
        self.fd = fd
        self.filename = filename
        self.fd_type = fd_type
        self.cloexec = cloexec
        self.family = family
        self.open_ts = open_ts
        # Set when the fd is closed; None while it is still open
        self.close_ts = None

        # Number of bytes read or written
        self.read = 0
        self.write = 0
        # IO Requests that acted upon the FD
        self.rq_list = []

    @classmethod
    def new_from_fd(cls, fd, open_ts):
        """Create an FDStats from a state-tracker fd object, copying its
        fd, filename, fd_type, cloexec and family attributes."""
        return cls(fd.fd, fd.filename, fd.fd_type, fd.cloexec, fd.family,
                   open_ts)

    def update_stats(self, req):
        """Record req and add its returned_size to read or write.

        For read-write requests, the bytes count as read when this fd is
        req.fd_in and as written when it is req.fd_out. req is appended
        to rq_list in every case.
        """
        # Operations are compared by value (==), not identity
        if req.operation == sv.IORequest.OP_READ:
            self.read += req.returned_size
        elif req.operation == sv.IORequest.OP_WRITE:
            self.write += req.returned_size
        elif req.operation == sv.IORequest.OP_READ_WRITE:
            if self.fd == req.fd_in:
                self.read += req.returned_size
            elif self.fd == req.fd_out:
                self.write += req.returned_size

        self.rq_list.append(req)

    def reset(self):
        """Zero the byte counters and forget recorded requests."""
        self.read = 0
        self.write = 0
        self.rq_list = []
541
class FileStats():
    """I/O statistics for one file, aggregated over the fds (possibly
    of several processes) that refer to it."""

    # Filename prefixes that do not identify a unique file
    GENERIC_NAMES = ['pipe', 'socket', 'anon_inode', 'unknown']

    def __init__(self, filename, fd, pid):
        self.filename = filename
        # Number of bytes read or written
        self.read = 0
        self.write = 0
        # Dict of file descriptors representing this file, indexed by
        # parent pid
        self.fd_by_pid = {pid: fd}

    def update_stats(self, fd_stats, proc_stats):
        """Add fd_stats' byte counts to this file's totals.

        Also remembers which fd the owning process (its pid, or its tid
        when pid is None) uses for this file; only the first fd seen per
        pid is kept.
        """
        self.read += fd_stats.read
        self.write += fd_stats.write

        pid = proc_stats.pid if proc_stats.pid is not None else proc_stats.tid
        self.fd_by_pid.setdefault(pid, fd_stats.fd)

    def reset(self):
        """Zero the byte counters (fd_by_pid is kept)."""
        self.read = 0
        self.write = 0

    @staticmethod
    def is_generic_name(filename):
        """Return True if filename starts with one of GENERIC_NAMES.

        A missing (None or empty) filename is not considered generic.
        """
        if not filename:
            return False

        return filename.startswith(tuple(FileStats.GENERIC_NAMES))
This page took 0.045126 seconds and 5 git commands to generate.