Fix: set proc name in I/O analysis if previously unknown
[deliverable/lttng-analyses.git] / lttnganalyses / core / io.py
1 # The MIT License (MIT)
2 #
3 # Copyright (C) 2015 - Antoine Busque <abusque@efficios.com>
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 # SOFTWARE.
22
23 from . import stats
24 from .analysis import Analysis
25 from ..linuxautomaton import sv
26
27
class IoAnalysis(Analysis):
    """Aggregate disk, network and per-process I/O statistics.

    The analysis is driven by notifications from the state automaton
    (registered in ``__init__``) and by the
    ``lttng_statedump_block_device`` event. Results are stored in:

    * ``disks``: DiskStats objects, indexed by device number;
    * ``ifaces``: IfaceStats objects, indexed by interface name;
    * ``tids``: ProcessIOStats objects, indexed by tid.
    """

    def __init__(self, state, conf):
        notification_cbs = {
            'net_dev_xmit': self._process_net_dev_xmit,
            'netif_receive_skb': self._process_netif_receive_skb,
            'block_rq_complete': self._process_block_rq_complete,
            'io_rq_exit': self._process_io_rq_exit,
            'create_fd': self._process_create_fd,
            'close_fd': self._process_close_fd,
            'update_fd': self._process_update_fd,
            'create_parent_proc': self._process_create_parent_proc
        }

        event_cbs = {
            'lttng_statedump_block_device':
                self._process_lttng_statedump_block_device
        }

        super().__init__(state, conf)
        self._state.register_notification_cbs(notification_cbs)
        self._register_cbs(event_cbs)

        # DiskStats, indexed by device number
        self.disks = {}
        # IfaceStats, indexed by interface name
        self.ifaces = {}
        # ProcessIOStats, indexed by tid
        self.tids = {}

    def process_event(self, ev):
        super().process_event(ev)
        self._process_event_cb(ev)

    def reset(self):
        """Reset every tracked object's statistics (objects are kept)."""
        for disk_stats in self.disks.values():
            disk_stats.reset()

        for iface_stats in self.ifaces.values():
            iface_stats.reset()

        for proc_stats in self.tids.values():
            proc_stats.reset()

    @property
    def disk_io_requests(self):
        """Generator of the block I/O requests recorded for every disk."""
        for disk in self.disks.values():
            yield from disk.rq_list

    @property
    def io_requests(self):
        return self._get_io_requests()

    @property
    def open_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_OPEN)

    @property
    def read_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_READ)

    @property
    def write_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_WRITE)

    @property
    def close_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_CLOSE)

    @property
    def sync_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_SYNC)

    @property
    def read_write_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_READ_WRITE)

    def _get_io_requests(self, io_operation=None):
        """Create a generator of syscall io requests by operation

        Block I/O requests recorded in the processes' request lists are
        skipped.

        Args:
            io_operation (IORequest.OP_*, optional): The operation of
            the io_requests to return. Return all IO requests if None.
        """
        for proc in self.tids.values():
            for io_rq in proc.rq_list:
                if isinstance(io_rq, sv.BlockIORequest):
                    continue

                if io_operation is None or \
                        sv.IORequest.is_equivalent_operation(io_operation,
                                                             io_rq.operation):
                    yield io_rq

    def get_files_stats(self):
        """Aggregate the FD statistics of all processes by filename.

        Returns:
            dict: FileStats objects, indexed by filename. Generic names
            (see FileStats.GENERIC_NAMES) get the process comm appended so
            that e.g. pipes of different processes are not merged.
        """
        files_stats = {}

        for proc_stats in self.tids.values():
            for fd_list in proc_stats.fds.values():
                for fd_stats in fd_list:
                    filename = fd_stats.filename
                    # Add process name to generic filenames to
                    # distinguish them
                    if FileStats.is_generic_name(filename):
                        filename += '(%s)' % proc_stats.comm

                    if filename not in files_stats:
                        files_stats[filename] = FileStats(filename)

                    files_stats[filename].update_stats(fd_stats, proc_stats)

        return files_stats

    @staticmethod
    def _assign_fds_to_parent(proc, parent):
        """Hand the FD history of ``proc`` over to ``parent``.

        FD numbers unknown to the parent are moved as-is. For FD numbers
        the parent already tracks, the parent's entry wins; its filename
        is only patched (best effort) when missing. ``proc`` is left with
        no FDs.
        """
        if proc is parent:
            # Same stats object: moving the FDs to itself and then
            # clearing them would silently drop the whole FD history.
            return

        for fd, fd_list in proc.fds.items():
            if fd not in parent.fds:
                parent.fds[fd] = fd_list
                continue

            # Best effort to fix the filename. get_fd() returns None when
            # there is no currently open FDStats, so guard both sides.
            parent_fd = parent.get_fd(fd)
            proc_fd = proc.get_fd(fd)
            if parent_fd is not None and proc_fd is not None and \
                    not parent_fd.filename:
                parent_fd.filename = proc_fd.filename

        proc.fds.clear()

    @staticmethod
    def _get_fd_type(proc_stats, fd):
        """Return the type of the currently open ``fd`` of ``proc_stats``.

        Returns None when that FD is not tracked, e.g. because it was
        created on a CPU excluded by the CPU filter (_process_create_fd
        then ignores it). ProcessIOStats counts a None type as unknown
        I/O instead of crashing.
        """
        fd_stats = proc_stats.get_fd(fd)
        if fd_stats is None:
            return None

        return fd_stats.fd_type

    def _process_net_dev_xmit(self, **kwargs):
        name = kwargs['iface_name']
        sent_bytes = kwargs['sent_bytes']
        cpu = kwargs['cpu_id']

        if not self._filter_cpu(cpu):
            return

        if name not in self.ifaces:
            self.ifaces[name] = IfaceStats(name)

        self.ifaces[name].sent_packets += 1
        self.ifaces[name].sent_bytes += sent_bytes

    def _process_netif_receive_skb(self, **kwargs):
        name = kwargs['iface_name']
        recv_bytes = kwargs['recv_bytes']
        cpu = kwargs['cpu_id']

        if not self._filter_cpu(cpu):
            return

        if name not in self.ifaces:
            self.ifaces[name] = IfaceStats(name)

        self.ifaces[name].recv_packets += 1
        self.ifaces[name].recv_bytes += recv_bytes

    def _process_block_rq_complete(self, **kwargs):
        req = kwargs['req']
        proc = kwargs['proc']
        cpu = kwargs['cpu_id']

        if not self._filter_process(proc):
            return
        if not self._filter_cpu(cpu):
            return

        if req.dev not in self.disks:
            self.disks[req.dev] = DiskStats(req.dev)

        self.disks[req.dev].update_stats(req)

        if proc is not None:
            if proc.tid not in self.tids:
                self.tids[proc.tid] = ProcessIOStats.new_from_process(proc)

            self.tids[proc.tid].update_block_stats(req)

    def _process_lttng_statedump_block_device(self, event):
        dev = event['dev']
        disk_name = event['diskname']

        if dev not in self.disks:
            self.disks[dev] = DiskStats(dev, disk_name)
        else:
            self.disks[dev].disk_name = disk_name

    def _process_io_rq_exit(self, **kwargs):
        proc = kwargs['proc']
        parent_proc = kwargs['parent_proc']
        io_rq = kwargs['io_rq']
        cpu = kwargs['cpu_id']

        if not self._filter_process(parent_proc):
            return
        if not self._filter_cpu(cpu):
            return

        if proc.tid not in self.tids:
            self.tids[proc.tid] = ProcessIOStats.new_from_process(proc)

        if parent_proc.tid not in self.tids:
            self.tids[parent_proc.tid] = (
                ProcessIOStats.new_from_process(parent_proc))

        proc_stats = self.tids[proc.tid]
        parent_stats = self.tids[parent_proc.tid]

        # FDs are owned by the parent process, so their types are looked
        # up there.
        fd_types = {}
        if io_rq.errno is None:
            if io_rq.operation == sv.IORequest.OP_READ or \
                    io_rq.operation == sv.IORequest.OP_WRITE:
                fd_types['fd'] = self._get_fd_type(parent_stats, io_rq.fd)
            elif io_rq.operation == sv.IORequest.OP_READ_WRITE:
                fd_types['fd_in'] = self._get_fd_type(parent_stats,
                                                      io_rq.fd_in)
                fd_types['fd_out'] = self._get_fd_type(parent_stats,
                                                       io_rq.fd_out)

        proc_stats.update_io_stats(io_rq, fd_types)
        parent_stats.update_fd_stats(io_rq)

        # Check if the proc stats comm corresponds to the actual
        # process comm. It might be that it was missing so far.
        if proc_stats.comm != proc.comm:
            proc_stats.comm = proc.comm
        if parent_stats.comm != parent_proc.comm:
            parent_stats.comm = parent_proc.comm

    def _process_create_parent_proc(self, **kwargs):
        proc = kwargs['proc']
        parent_proc = kwargs['parent_proc']

        if not self._filter_process(parent_proc):
            return

        if proc.tid not in self.tids:
            self.tids[proc.tid] = ProcessIOStats.new_from_process(proc)

        if parent_proc.tid not in self.tids:
            self.tids[parent_proc.tid] = (
                ProcessIOStats.new_from_process(parent_proc))

        proc_stats = self.tids[proc.tid]
        parent_stats = self.tids[parent_proc.tid]

        proc_stats.pid = parent_stats.tid
        IoAnalysis._assign_fds_to_parent(proc_stats, parent_stats)

    def _process_create_fd(self, **kwargs):
        timestamp = kwargs['timestamp']
        parent_proc = kwargs['parent_proc']
        tid = parent_proc.tid
        cpu = kwargs['cpu_id']
        fd = kwargs['fd']

        if not self._filter_process(parent_proc):
            return
        if not self._filter_cpu(cpu):
            return

        if tid not in self.tids:
            self.tids[tid] = ProcessIOStats.new_from_process(parent_proc)

        # Each (re)use of an fd number appends a new FDStats, keeping the
        # list in opening order.
        fd_stats = FDStats.new_from_fd(parent_proc.fds[fd], timestamp)
        self.tids[tid].fds.setdefault(fd, []).append(fd_stats)

    def _process_close_fd(self, **kwargs):
        timestamp = kwargs['timestamp']
        parent_proc = kwargs['parent_proc']
        tid = parent_proc.tid
        cpu = kwargs['cpu_id']
        fd = kwargs['fd']

        if not self._filter_process(parent_proc):
            return
        if not self._filter_cpu(cpu):
            return

        parent_stats = self.tids.get(tid)
        if parent_stats is None:
            # Never tracked, e.g. its FDs were created on a filtered CPU.
            return

        last_fd = parent_stats.get_fd(fd)
        if last_fd is None:
            # Unknown fd, or its latest FDStats is already closed.
            return

        last_fd.close_ts = timestamp

    def _process_update_fd(self, **kwargs):
        parent_proc = kwargs['parent_proc']
        tid = parent_proc.tid
        fd = kwargs['fd']

        # The process may not be tracked at all (e.g. excluded by the
        # process filter, so create_fd never registered it), or this fd
        # may be unknown (created on a filtered CPU).
        proc_stats = self.tids.get(tid)
        if proc_stats is None or not proc_stats.fds.get(fd):
            return

        new_filename = parent_proc.fds[fd].filename
        proc_stats.fds[fd][-1].filename = new_filename
316
317
class DiskStats():
    """Block request statistics of a single disk.

    When no ``disk_name`` is given, the disk is named after its device
    number in "(major,minor)" form.
    """
    # dev_t bit layout, as defined in include/linux/kdev_t.h
    MINORBITS = 20
    MINORMASK = ((1 << MINORBITS) - 1)

    def __init__(self, dev, disk_name=None):
        self.dev = dev
        if disk_name is None:
            disk_name = DiskStats._get_name_from_dev(dev)
        self.disk_name = disk_name
        self._clear_rq_stats()

    @property
    def rq_count(self):
        """Number of requests recorded since creation or the last reset."""
        return len(self.rq_list)

    def update_stats(self, req):
        """Record one completed block request and update the aggregates."""
        duration = req.duration
        lowest = self.min_rq_duration
        highest = self.max_rq_duration
        self.min_rq_duration = (duration if lowest is None
                                else min(lowest, duration))
        self.max_rq_duration = (duration if highest is None
                                else max(highest, duration))

        self.total_rq_sectors += req.nr_sector
        self.total_rq_duration += duration
        self.rq_list.append(req)

    def reset(self):
        """Forget every recorded request and its aggregates."""
        self._clear_rq_stats()

    def _clear_rq_stats(self):
        # Shared by __init__ and reset(): empty request statistics.
        self.min_rq_duration = None
        self.max_rq_duration = None
        self.total_rq_sectors = 0
        self.total_rq_duration = 0
        self.rq_list = []

    @staticmethod
    def _get_name_from_dev(dev):
        # MAJOR()/MINOR() imported from include/linux/kdev_t.h
        return '(%d,%d)' % (dev >> DiskStats.MINORBITS,
                            dev & DiskStats.MINORMASK)
363
364
class IfaceStats():
    """Packet and byte counters of a single network interface."""

    def __init__(self, name):
        self.name = name
        self._zero_counters()

    def reset(self):
        """Zero all counters; the interface name is kept."""
        self._zero_counters()

    def _zero_counters(self):
        # Chained assignments bind left to right: recv_bytes, recv_packets,
        # then sent_bytes, sent_packets.
        self.recv_bytes = self.recv_packets = 0
        self.sent_bytes = self.sent_packets = 0
378
379
class ProcessIOStats(stats.Process):
    """I/O statistics of a single thread.

    Syscall I/O bytes are split by the type of the FD involved (disk,
    network or unknown); block-layer I/O is tracked separately in
    ``block_io`` and is not part of ``total_read``/``total_write``.
    """

    def __init__(self, pid, tid, comm):
        super().__init__(pid, tid, comm)
        self.disk_io = stats.IO()
        self.net_io = stats.IO()
        self.unk_io = stats.IO()
        self.block_io = stats.IO()
        # Lists of FDStats objects, indexed by fd (fileno). Each list holds
        # the successive FDs that used this number, oldest first.
        self.fds = {}
        # Syscall and block I/O requests attributed to this thread
        self.rq_list = []

    @classmethod
    def new_from_process(cls, proc):
        return cls(proc.pid, proc.tid, proc.comm)

    # Total read/write does not account for block layer I/O
    @property
    def total_read(self):
        return self.disk_io.read + self.net_io.read + self.unk_io.read

    @property
    def total_write(self):
        return self.disk_io.write + self.net_io.write + self.unk_io.write

    def update_fd_stats(self, req):
        """Credit a successful request to the currently open FD(s) it used.

        FDs without a currently open FDStats (get_fd() returns None) are
        skipped instead of raising AttributeError.
        """
        if req.errno is not None:
            return

        if req.fd is not None:
            fds = [req.fd]
        elif isinstance(req, sv.ReadWriteIORequest):
            fds = [req.fd_in, req.fd_out]
        else:
            return

        for fd in fds:
            if fd is None:
                continue

            fd_stats = self.get_fd(fd)
            if fd_stats is not None:
                fd_stats.update_stats(req)

    def update_block_stats(self, req):
        """Record a block request and add its size to ``block_io``."""
        self.rq_list.append(req)

        if req.operation == sv.IORequest.OP_READ:
            self.block_io.read += req.size
        elif req.operation == sv.IORequest.OP_WRITE:
            self.block_io.write += req.size

    def update_io_stats(self, req, fd_types):
        """Record a syscall request and account its bytes by FD type.

        The request is always appended to ``rq_list``; bytes are only
        accounted when it has a size and no errno.

        Args:
            req: the syscall I/O request.
            fd_types (dict): FD types keyed by 'fd' (read/write requests)
                or 'fd_in'/'fd_out' (read_write requests). A type that is
                neither disk nor network (including None) counts as
                unknown I/O.
        """
        self.rq_list.append(req)

        if req.size is None or req.errno is not None:
            return

        if req.operation == sv.IORequest.OP_READ:
            self._update_read(req.returned_size, fd_types['fd'])
        elif req.operation == sv.IORequest.OP_WRITE:
            self._update_write(req.returned_size, fd_types['fd'])
        elif req.operation == sv.IORequest.OP_READ_WRITE:
            self._update_read(req.returned_size, fd_types['fd_in'])
            self._update_write(req.returned_size, fd_types['fd_out'])

    def _update_read(self, size, fd_type):
        if fd_type == sv.FDType.disk:
            self.disk_io.read += size
        elif fd_type == sv.FDType.net or fd_type == sv.FDType.maybe_net:
            self.net_io.read += size
        else:
            self.unk_io.read += size

    def _update_write(self, size, fd_type):
        if fd_type == sv.FDType.disk:
            self.disk_io.write += size
        elif fd_type == sv.FDType.net or fd_type == sv.FDType.maybe_net:
            self.net_io.write += size
        else:
            self.unk_io.write += size

    def _get_current_fd(self, fd):
        # Only the latest FDStats can be the open one; None if it is
        # closed. Callers must ensure self.fds[fd] is a non-empty list.
        fd_stats = self.fds[fd][-1]
        if fd_stats.close_ts is not None:
            return None

        return fd_stats

    @staticmethod
    def _get_fd_by_timestamp(fd_list, timestamp):
        """Return the FDStats object whose lifetime contains timestamp

        This method performs an iterative binary search on the given
        fd_list argument, and will find the FDStats object for which
        the timestamp is contained between its open_ts and close_ts
        attributes. An FDStats without close_ts (still open) contains
        every timestamp at or after its open_ts.

        Args:
            fd_list (list): list of FDStats object, sorted
            chronologically by open_ts

            timestamp (int): timestamp in nanoseconds (ns) since unix
            epoch which should be contained in the FD's lifetime.

        Returns:
            The FDStats object whose lifetime contains the given
            timestamp, None if no such object exists.
        """
        low, high = 0, len(fd_list)
        while low < high:
            mid = (low + high) // 2
            fd_stats = fd_list[mid]

            if timestamp < fd_stats.open_ts:
                # Also applies to a still-open FD: the matching lifetime,
                # if any, is an earlier one.
                high = mid
            elif fd_stats.close_ts is None or \
                    timestamp <= fd_stats.close_ts:
                return fd_stats
            else:
                low = mid + 1

        return None

    def get_fd(self, fd, timestamp=None):
        """Return an FDStats object for file descriptor number ``fd``.

        Args:
            fd (int): the file descriptor number.
            timestamp (int, optional): if None, return the currently open
                FDStats (None if the latest one is closed); otherwise
                return the FDStats whose lifetime contains ``timestamp``.

        Returns:
            The matching FDStats, or None if there is none.
        """
        if fd not in self.fds or not self.fds[fd]:
            return None

        if timestamp is None:
            fd_stats = self._get_current_fd(fd)
        else:
            fd_stats = ProcessIOStats._get_fd_by_timestamp(self.fds[fd],
                                                           timestamp)

        return fd_stats

    def reset(self):
        """Reset the I/O counters, request list and open FDs' stats."""
        self.disk_io.reset()
        self.net_io.reset()
        self.unk_io.reset()
        self.block_io.reset()
        self.rq_list = []

        for fd in self.fds:
            fd_stats = self.get_fd(fd)
            if fd_stats is not None:
                fd_stats.reset()
527
528
class FDStats():
    """I/O statistics of one lifetime of a file descriptor number.

    The lifetime spans from ``open_ts`` to ``close_ts``; ``close_ts`` is
    None until the FD is closed.
    """

    def __init__(self, fd, filename, fd_type, cloexec, family, open_ts):
        self.fd = fd
        self.filename = filename
        self.fd_type = fd_type
        self.cloexec = cloexec
        self.family = family
        self.open_ts = open_ts
        self.close_ts = None
        self.io = stats.IO()
        # IO Requests that acted upon the FD
        self.rq_list = []

    @classmethod
    def new_from_fd(cls, fd, open_ts):
        """Build an FDStats from an FD object of the state automaton."""
        return cls(fd.fd, fd.filename, fd.fd_type, fd.cloexec, fd.family,
                   open_ts)

    def update_stats(self, req):
        """Add a request's returned size to this FD's I/O and record it.

        For read_write requests, the bytes count as read when this FD is
        the request's ``fd_in`` and as written when it is its ``fd_out``.
        Operations are compared by value (``==``), not identity.
        """
        if req.operation == sv.IORequest.OP_READ:
            self.io.read += req.returned_size
        elif req.operation == sv.IORequest.OP_WRITE:
            self.io.write += req.returned_size
        elif req.operation == sv.IORequest.OP_READ_WRITE:
            if self.fd == req.fd_in:
                self.io.read += req.returned_size
            elif self.fd == req.fd_out:
                self.io.write += req.returned_size

        self.rq_list.append(req)

    def reset(self):
        """Clear the I/O counters and recorded requests."""
        self.io.reset()
        self.rq_list = []
563
564
class FileStats():
    """I/O statistics of one file, aggregated over all FDs that used it."""

    # Filename prefixes of FDs not backed by a regular named file
    GENERIC_NAMES = ['pipe', 'socket', 'anon_inode', 'unknown']

    def __init__(self, filename):
        self.filename = filename
        self.io = stats.IO()
        # File descriptor representing this file, indexed by parent pid;
        # only the first fd seen for a given pid is kept.
        # FIXME this doesn't cover FD reuse cases
        self.fd_by_pid = {}

    def update_stats(self, fd_stats, proc_stats):
        """Add an FD's I/O to this file and remember the FD for its pid.

        The process is identified by its pid, or by its tid when the pid
        is unknown (None).
        """
        self.io += fd_stats.io

        pid = proc_stats.pid if proc_stats.pid is not None \
            else proc_stats.tid
        self.fd_by_pid.setdefault(pid, fd_stats.fd)

    def reset(self):
        self.io.reset()

    @staticmethod
    def is_generic_name(filename):
        """Return True if ``filename`` starts with one of GENERIC_NAMES.

        A missing (None) or empty filename is not considered generic,
        instead of raising AttributeError.
        """
        if not filename:
            return False

        return filename.startswith(tuple(FileStats.GENERIC_NAMES))
This page took 0.061128 seconds and 5 git commands to generate.