Fix: set proc name in I/O analysis if previously unknown
[deliverable/lttng-analyses.git] / lttnganalyses / core / io.py
CommitLineData
6cd52af3
AB
# The MIT License (MIT)
#
# Copyright (C) 2015 - Antoine Busque <abusque@efficios.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
22
f67842f8 23from . import stats
6cd52af3 24from .analysis import Analysis
56936af2 25from ..linuxautomaton import sv
6cd52af3
AB
26
27
class IoAnalysis(Analysis):
    """Collect I/O statistics per disk, network interface and thread.

    The analysis is fed through notification callbacks registered on
    the state object and through one trace event callback (block
    device statedump).

    Attributes:
        disks (dict): DiskStats objects, indexed by device number.
        ifaces (dict): IfaceStats objects, indexed by interface name.
        tids (dict): ProcessIOStats objects, indexed by TID.
    """

    def __init__(self, state, conf):
        notification_cbs = {
            'net_dev_xmit': self._process_net_dev_xmit,
            'netif_receive_skb': self._process_netif_receive_skb,
            'block_rq_complete': self._process_block_rq_complete,
            'io_rq_exit': self._process_io_rq_exit,
            'create_fd': self._process_create_fd,
            'close_fd': self._process_close_fd,
            'update_fd': self._process_update_fd,
            'create_parent_proc': self._process_create_parent_proc
        }

        event_cbs = {
            'lttng_statedump_block_device':
            self._process_lttng_statedump_block_device
        }

        super().__init__(state, conf)
        self._state.register_notification_cbs(notification_cbs)
        self._register_cbs(event_cbs)

        self.disks = {}
        self.ifaces = {}
        self.tids = {}

    def process_event(self, ev):
        """Forward `ev` to the base class, then to the event callbacks."""
        super().process_event(ev)
        self._process_event_cb(ev)

    def reset(self):
        """Reset every disk, interface and process stats object."""
        for disk_stats in self.disks.values():
            disk_stats.reset()

        for iface_stats in self.ifaces.values():
            iface_stats.reset()

        for proc_stats in self.tids.values():
            proc_stats.reset()

    @property
    def disk_io_requests(self):
        """Generator over the requests recorded by every disk."""
        for disk in self.disks.values():
            for io_rq in disk.rq_list:
                yield io_rq

    @property
    def io_requests(self):
        return self._get_io_requests()

    @property
    def open_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_OPEN)

    @property
    def read_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_READ)

    @property
    def write_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_WRITE)

    @property
    def close_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_CLOSE)

    @property
    def sync_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_SYNC)

    @property
    def read_write_io_requests(self):
        return self._get_io_requests(sv.IORequest.OP_READ_WRITE)

    def _get_io_requests(self, io_operation=None):
        """Create a generator of syscall io requests by operation

        Block layer requests (sv.BlockIORequest) are always skipped.

        Args:
            io_operation (IORequest.OP_*, optional): The operation of
            the io_requests to return. Return all IO requests if None.
        """
        for proc in self.tids.values():
            for io_rq in proc.rq_list:
                if isinstance(io_rq, sv.BlockIORequest):
                    continue

                if io_operation is None or \
                   sv.IORequest.is_equivalent_operation(io_operation,
                                                        io_rq.operation):
                    yield io_rq

    def get_files_stats(self):
        """Aggregate the per-FD statistics of all processes by filename.

        Returns:
            dict: FileStats objects, indexed by filename. Generic
            filenames get the owning process name appended so that
            they stay distinguishable across processes.
        """
        files_stats = {}

        for proc_stats in self.tids.values():
            for fd_list in proc_stats.fds.values():
                for fd_stats in fd_list:
                    filename = fd_stats.filename
                    # Add process name to generic filenames to
                    # distinguish them
                    if FileStats.is_generic_name(filename):
                        filename += '(%s)' % proc_stats.comm

                    if filename not in files_stats:
                        files_stats[filename] = FileStats(filename)

                    files_stats[filename].update_stats(fd_stats, proc_stats)

        return files_stats

    def _get_or_create_proc_stats(self, proc):
        """Return the ProcessIOStats for `proc`, creating it if missing."""
        if proc.tid not in self.tids:
            self.tids[proc.tid] = ProcessIOStats.new_from_process(proc)

        return self.tids[proc.tid]

    @staticmethod
    def _get_fd_type(proc_stats, fd):
        """Return the type of the current `fd` of `proc_stats`.

        Returns None when `proc_stats` has no currently open FDStats
        for `fd`, instead of failing on the missing object.
        """
        fd_stats = proc_stats.get_fd(fd)
        if fd_stats is None:
            return None

        return fd_stats.fd_type

    @staticmethod
    def _assign_fds_to_parent(proc, parent):
        """Move the FD lists of `proc` over to `parent`.

        FDs the parent does not know yet are handed over as is. For
        FDs the parent already tracks, only a missing filename is
        filled in from the child (best effort).
        """
        # NOTE(review): every fd is dropped from proc once handled,
        # including those just handed to the parent -- confirm this
        # matches the intended upstream behaviour.
        # Iterate over a copy of the keys: proc.fds shrinks as we go.
        for fd in list(proc.fds):
            if fd not in parent.fds:
                parent.fds[fd] = proc.fds[fd]
            else:
                # best effort to fix the filename; get_fd() returns
                # None for an FD which is already closed, in which
                # case there is nothing to fix
                parent_fd = parent.get_fd(fd)
                proc_fd = proc.get_fd(fd)
                if parent_fd is not None and proc_fd is not None and \
                   not parent_fd.filename:
                    parent_fd.filename = proc_fd.filename

            del proc.fds[fd]

    def _process_net_dev_xmit(self, **kwargs):
        name = kwargs['iface_name']
        sent_bytes = kwargs['sent_bytes']
        cpu = kwargs['cpu_id']

        if not self._filter_cpu(cpu):
            return

        if name not in self.ifaces:
            self.ifaces[name] = IfaceStats(name)

        self.ifaces[name].sent_packets += 1
        self.ifaces[name].sent_bytes += sent_bytes

    def _process_netif_receive_skb(self, **kwargs):
        name = kwargs['iface_name']
        recv_bytes = kwargs['recv_bytes']
        cpu = kwargs['cpu_id']

        if not self._filter_cpu(cpu):
            return

        if name not in self.ifaces:
            self.ifaces[name] = IfaceStats(name)

        self.ifaces[name].recv_packets += 1
        self.ifaces[name].recv_bytes += recv_bytes

    def _process_block_rq_complete(self, **kwargs):
        req = kwargs['req']
        proc = kwargs['proc']
        cpu = kwargs['cpu_id']

        if not self._filter_process(proc):
            return
        if not self._filter_cpu(cpu):
            return

        if req.dev not in self.disks:
            self.disks[req.dev] = DiskStats(req.dev)

        self.disks[req.dev].update_stats(req)

        # The request is only attributed to a process when one is known
        if proc is not None:
            self._get_or_create_proc_stats(proc).update_block_stats(req)

    def _process_lttng_statedump_block_device(self, event):
        dev = event['dev']
        disk_name = event['diskname']

        if dev not in self.disks:
            self.disks[dev] = DiskStats(dev, disk_name)
        else:
            self.disks[dev].disk_name = disk_name

    def _process_io_rq_exit(self, **kwargs):
        """Account a completed syscall I/O request.

        The byte counts go to the stats of the thread which issued
        the request; the per-FD stats are kept on the parent process.
        """
        proc = kwargs['proc']
        parent_proc = kwargs['parent_proc']
        io_rq = kwargs['io_rq']
        cpu = kwargs['cpu_id']

        if not self._filter_process(parent_proc):
            return
        if not self._filter_cpu(cpu):
            return

        proc_stats = self._get_or_create_proc_stats(proc)
        parent_stats = self._get_or_create_proc_stats(parent_proc)

        # An FD without stats (e.g. its create_fd notification was
        # filtered out) gets a None type, which ProcessIOStats
        # accounts as unknown I/O.
        fd_types = {}
        if io_rq.errno is None:
            if io_rq.operation == sv.IORequest.OP_READ or \
               io_rq.operation == sv.IORequest.OP_WRITE:
                fd_types['fd'] = IoAnalysis._get_fd_type(parent_stats,
                                                         io_rq.fd)
            elif io_rq.operation == sv.IORequest.OP_READ_WRITE:
                fd_types['fd_in'] = IoAnalysis._get_fd_type(parent_stats,
                                                            io_rq.fd_in)
                fd_types['fd_out'] = IoAnalysis._get_fd_type(parent_stats,
                                                             io_rq.fd_out)

        proc_stats.update_io_stats(io_rq, fd_types)
        parent_stats.update_fd_stats(io_rq)

        # Check if the proc stats comm corresponds to the actual
        # process comm. It might be that it was missing so far.
        if proc_stats.comm != proc.comm:
            proc_stats.comm = proc.comm
        if parent_stats.comm != parent_proc.comm:
            parent_stats.comm = parent_proc.comm

    def _process_create_parent_proc(self, **kwargs):
        proc = kwargs['proc']
        parent_proc = kwargs['parent_proc']

        if not self._filter_process(parent_proc):
            return

        proc_stats = self._get_or_create_proc_stats(proc)
        parent_stats = self._get_or_create_proc_stats(parent_proc)

        proc_stats.pid = parent_stats.tid
        IoAnalysis._assign_fds_to_parent(proc_stats, parent_stats)

    def _process_create_fd(self, **kwargs):
        timestamp = kwargs['timestamp']
        parent_proc = kwargs['parent_proc']
        cpu = kwargs['cpu_id']
        fd = kwargs['fd']

        if not self._filter_process(parent_proc):
            return
        if not self._filter_cpu(cpu):
            return

        parent_stats = self._get_or_create_proc_stats(parent_proc)
        if fd not in parent_stats.fds:
            parent_stats.fds[fd] = []
        parent_stats.fds[fd].append(FDStats.new_from_fd(parent_proc.fds[fd],
                                                        timestamp))

    def _process_close_fd(self, **kwargs):
        timestamp = kwargs['timestamp']
        parent_proc = kwargs['parent_proc']
        tid = parent_proc.tid
        cpu = kwargs['cpu_id']
        fd = kwargs['fd']

        if not self._filter_process(parent_proc):
            return
        if not self._filter_cpu(cpu):
            return

        # The matching create_fd may never have been recorded (it is
        # subject to the same filters), so neither the process nor the
        # FD is guaranteed to be known here.
        parent_stats = self.tids.get(tid)
        if parent_stats is None:
            return

        last_fd = parent_stats.get_fd(fd)
        if last_fd is None:
            return

        last_fd.close_ts = timestamp

    def _process_update_fd(self, **kwargs):
        parent_proc = kwargs['parent_proc']
        tid = parent_proc.tid
        fd = kwargs['fd']

        new_filename = parent_proc.fds[fd].filename

        # Nothing to update if the FD creation was never recorded for
        # this process (create_fd is filtered, update_fd is not).
        parent_stats = self.tids.get(tid)
        if parent_stats is None:
            return

        fd_list = parent_stats.fds.get(fd)
        if not fd_list:
            return

        fd_list[-1].filename = new_filename
316
3b1dda96
AB
317
class DiskStats():
    """Block request statistics of a single disk.

    Attributes:
        dev: device number of the disk.
        disk_name: name given at construction, or a '(major,minor)'
            string derived from `dev` when no name was given.
        min_rq_duration, max_rq_duration: extreme request durations,
            None as long as no request has been recorded.
        total_rq_sectors, total_rq_duration: running sums over the
            recorded requests.
        rq_list (list): the recorded requests, in arrival order.
    """
    MINORBITS = 20
    MINORMASK = ((1 << MINORBITS) - 1)

    def __init__(self, dev, disk_name=None):
        self.dev = dev
        if disk_name is None:
            disk_name = DiskStats._get_name_from_dev(dev)
        self.disk_name = disk_name

        # A new object starts from the same state reset() restores
        self.reset()

    @property
    def rq_count(self):
        """Number of requests recorded since creation or last reset."""
        return len(self.rq_list)

    def update_stats(self, req):
        """Record `req` and fold its duration and sectors in the stats."""
        duration = req.duration

        shortest = self.min_rq_duration
        if shortest is None or duration < shortest:
            self.min_rq_duration = duration

        longest = self.max_rq_duration
        if longest is None or duration > longest:
            self.max_rq_duration = duration

        self.total_rq_sectors += req.nr_sector
        self.total_rq_duration += duration
        self.rq_list.append(req)

    def reset(self):
        """Forget every recorded request; device and name are kept."""
        self.min_rq_duration = None
        self.max_rq_duration = None
        self.total_rq_sectors = 0
        self.total_rq_duration = 0
        self.rq_list = []

    @staticmethod
    def _get_name_from_dev(dev):
        """Format `dev` as '(major,minor)'."""
        # imported from include/linux/kdev_t.h
        return '(%d,%d)' % (dev >> DiskStats.MINORBITS,
                            dev & DiskStats.MINORMASK)
363
2c4eb868
AB
364
class IfaceStats():
    """Received/sent byte and packet counters of a network interface."""

    def __init__(self, name):
        self.name = name
        # A new object starts with every counter at zero
        self.reset()

    def reset(self):
        """Set all four counters back to zero; the name is kept."""
        self.recv_bytes = self.recv_packets = 0
        self.sent_bytes = self.sent_packets = 0
3b1dda96
AB
378
379
class ProcessIOStats(stats.Process):
    """I/O statistics of a single process/thread.

    Attributes:
        disk_io, net_io, unk_io: syscall I/O byte counters, split by
            the type of the FD the request acted upon.
        block_io: block layer I/O byte counters.
        fds (dict): lists of FDStats objects, indexed by fd (fileno).
            The last element of a list is the most recent FDStats
            recorded for that fd.
        rq_list (list): every request accounted to this process.
    """

    def __init__(self, pid, tid, comm):
        super().__init__(pid, tid, comm)
        self.disk_io = stats.IO()
        self.net_io = stats.IO()
        self.unk_io = stats.IO()
        self.block_io = stats.IO()
        # FDStats objects, indexed by fd (fileno)
        self.fds = {}
        self.rq_list = []

    @classmethod
    def new_from_process(cls, proc):
        """Build the stats object from the pid, tid and comm of `proc`."""
        return cls(proc.pid, proc.tid, proc.comm)

    # Total read/write does not account for block layer I/O
    @property
    def total_read(self):
        return self.disk_io.read + self.net_io.read + self.unk_io.read

    @property
    def total_write(self):
        return self.disk_io.write + self.net_io.write + self.unk_io.write

    def update_fd_stats(self, req):
        """Forward `req` to the stats of the FD(s) it acted upon.

        Failed requests (errno set) are ignored, and so are FDs for
        which no currently open FDStats object is known.
        """
        if req.errno is not None:
            return

        if req.fd is not None:
            self._update_single_fd_stats(req.fd, req)
        elif isinstance(req, sv.ReadWriteIORequest):
            if req.fd_in is not None:
                self._update_single_fd_stats(req.fd_in, req)

            if req.fd_out is not None:
                self._update_single_fd_stats(req.fd_out, req)

    def _update_single_fd_stats(self, fd, req):
        """Update the current FDStats of `fd`, if there is one."""
        fd_stats = self.get_fd(fd)
        # get_fd() returns None for an unknown or already closed fd
        if fd_stats is not None:
            fd_stats.update_stats(req)

    def update_block_stats(self, req):
        """Record a block layer request and count its size."""
        self.rq_list.append(req)

        if req.operation == sv.IORequest.OP_READ:
            self.block_io.read += req.size
        elif req.operation == sv.IORequest.OP_WRITE:
            self.block_io.write += req.size

    def update_io_stats(self, req, fd_types):
        """Record a syscall request and count its returned size.

        Args:
            req: the I/O request; always appended to rq_list, but
                only counted when it has a size and no errno.
            fd_types (dict): FD types indexed by 'fd' for read and
                write requests, by 'fd_in' and 'fd_out' for
                read/write requests.
        """
        self.rq_list.append(req)

        if req.size is None or req.errno is not None:
            return

        if req.operation == sv.IORequest.OP_READ:
            self._update_read(req.returned_size, fd_types['fd'])
        elif req.operation == sv.IORequest.OP_WRITE:
            self._update_write(req.returned_size, fd_types['fd'])
        elif req.operation == sv.IORequest.OP_READ_WRITE:
            self._update_read(req.returned_size, fd_types['fd_in'])
            self._update_write(req.returned_size, fd_types['fd_out'])

    def _update_read(self, size, fd_type):
        if fd_type == sv.FDType.disk:
            self.disk_io.read += size
        elif fd_type == sv.FDType.net or fd_type == sv.FDType.maybe_net:
            self.net_io.read += size
        else:
            # Any other type, including None, counts as unknown I/O
            self.unk_io.read += size

    def _update_write(self, size, fd_type):
        if fd_type == sv.FDType.disk:
            self.disk_io.write += size
        elif fd_type == sv.FDType.net or fd_type == sv.FDType.maybe_net:
            self.net_io.write += size
        else:
            # Any other type, including None, counts as unknown I/O
            self.unk_io.write += size

    def _get_current_fd(self, fd):
        """Return the latest FDStats of `fd`, None if it was closed."""
        fd_stats = self.fds[fd][-1]
        if fd_stats.close_ts is not None:
            return None

        return fd_stats

    @staticmethod
    def _get_fd_by_timestamp(fd_list, timestamp):
        """Return the FDStats object whose lifetime contains timestamp

        This method performs a binary search on the given fd_list
        argument, and will find the FDStats object for which the
        timestamp is contained between its open_ts and close_ts
        attributes. An FDStats without close_ts (still open) contains
        every timestamp from its open_ts on.

        Args:
            fd_list (list): list of FDStats object, sorted
            chronologically by open_ts

            timestamp (int): timestamp in nanoseconds (ns) since unix
            epoch which should be contained in the FD's lifetime.

        Returns:
            The FDStats object whose lifetime contains the given
            timestamp, None if no such object exists.
        """
        low = 0
        high = len(fd_list)

        while low < high:
            midpoint = (low + high) // 2
            fd_stats = fd_list[midpoint]

            if timestamp < fd_stats.open_ts:
                # Too early for this FD, open or closed alike: the
                # match, if any, is among the earlier ones.
                high = midpoint
            elif fd_stats.close_ts is None or \
                    timestamp <= fd_stats.close_ts:
                return fd_stats
            else:
                low = midpoint + 1

        return None

    def get_fd(self, fd, timestamp=None):
        """Return the FDStats of `fd`, current or at a given time.

        Args:
            fd: file descriptor number.
            timestamp (optional): when given, look up the FDStats
                whose lifetime contains it; otherwise use the latest
                FDStats, provided it is not closed.

        Returns:
            The matching FDStats object, None if there is none.
        """
        if fd not in self.fds or not self.fds[fd]:
            return None

        if timestamp is None:
            fd_stats = self._get_current_fd(fd)
        else:
            fd_stats = ProcessIOStats._get_fd_by_timestamp(self.fds[fd],
                                                           timestamp)

        return fd_stats

    def reset(self):
        """Reset the counters, the request list and the open FDs."""
        self.disk_io.reset()
        self.net_io.reset()
        self.unk_io.reset()
        self.block_io.reset()
        self.rq_list = []

        # Only FDStats which are currently open get reset
        for fd in self.fds:
            fd_stats = self.get_fd(fd)
            if fd_stats is not None:
                fd_stats.reset()
a88d0468 527
6744346a
AB
528
class FDStats():
    """I/O statistics of one file descriptor over one open/close span.

    Attributes:
        fd, filename, fd_type, cloexec, family: properties of the
            file descriptor, as given at construction.
        open_ts: timestamp given at construction.
        close_ts: None until set by the owner of the object.
        io: read/write byte counters.
        rq_list (list): IO requests that acted upon the FD.
    """

    def __init__(self, fd, filename, fd_type, cloexec, family, open_ts):
        self.fd = fd
        self.filename = filename
        self.fd_type = fd_type
        self.cloexec = cloexec
        self.family = family
        self.open_ts = open_ts
        self.close_ts = None
        self.io = stats.IO()
        # IO Requests that acted upon the FD
        self.rq_list = []

    @classmethod
    def new_from_fd(cls, fd, open_ts):
        """Build the stats object from an FD object and an open time."""
        return cls(fd.fd, fd.filename, fd.fd_type, fd.cloexec, fd.family,
                   open_ts)

    def update_stats(self, req):
        """Count the returned size of `req` and record the request.

        For a read/write request, the size counts as read when this
        FD is the request's fd_in, otherwise as written when it is
        its fd_out.
        """
        # Operations are compared by value, as the I/O analysis does
        if req.operation == sv.IORequest.OP_READ:
            self.io.read += req.returned_size
        elif req.operation == sv.IORequest.OP_WRITE:
            self.io.write += req.returned_size
        elif req.operation == sv.IORequest.OP_READ_WRITE:
            if self.fd == req.fd_in:
                self.io.read += req.returned_size
            elif self.fd == req.fd_out:
                self.io.write += req.returned_size

        self.rq_list.append(req)

    def reset(self):
        """Reset the byte counters and forget the recorded requests."""
        self.io.reset()
        self.rq_list = []
e0e5f1fd 563
320e15ef 564
e0e5f1fd
AB
class FileStats():
    """I/O statistics of a file, summed over the FDs referring to it.

    Attributes:
        filename: name of the file.
        io: read/write counters accumulated from the FD stats.
        fd_by_pid (dict): first fd recorded for each parent pid.
    """
    GENERIC_NAMES = ['pipe', 'socket', 'anon_inode', 'unknown']

    def __init__(self, filename):
        self.filename = filename
        self.io = stats.IO()
        # Dict of file descriptors representing this file, indexed by
        # parent pid
        # FIXME this doesn't cover FD reuse cases
        self.fd_by_pid = {}

    def update_stats(self, fd_stats, proc_stats):
        """Add the I/O of `fd_stats` and remember its fd for the process.

        The process is identified by its pid, or by its tid when the
        pid is None. Only the first fd seen per process is kept.
        """
        self.io += fd_stats.io

        owner = proc_stats.tid if proc_stats.pid is None else proc_stats.pid
        self.fd_by_pid.setdefault(owner, fd_stats.fd)

    def reset(self):
        """Reset the I/O counters; fd_by_pid is left untouched."""
        self.io.reset()

    @staticmethod
    def is_generic_name(filename):
        """Tell whether `filename` starts with one of GENERIC_NAMES."""
        return filename.startswith(tuple(FileStats.GENERIC_NAMES))
This page took 0.050914 seconds and 5 git commands to generate.