1 # The MIT License (MIT)
3 # Copyright (C) 2015 - Antoine Busque <abusque@efficios.com>
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 # SOFTWARE.
24 from .analysis
import Analysis
25 from ..linuxautomaton
import sv
28 class IoAnalysis(Analysis
):
29 def __init__(self
, state
, conf
):
31 'net_dev_xmit': self
._process
_net
_dev
_xmit
,
32 'netif_receive_skb': self
._process
_netif
_receive
_skb
,
33 'block_rq_complete': self
._process
_block
_rq
_complete
,
34 'io_rq_exit': self
._process
_io
_rq
_exit
,
35 'create_fd': self
._process
_create
_fd
,
36 'close_fd': self
._process
_close
_fd
,
37 'update_fd': self
._process
_update
_fd
,
38 'create_parent_proc': self
._process
_create
_parent
_proc
42 'lttng_statedump_block_device':
43 self
._process
_lttng
_statedump
_block
_device
46 super().__init
__(state
, conf
)
47 self
._state
.register_notification_cbs(notification_cbs
)
48 self
._register
_cbs
(event_cbs
)
def process_event(self, ev):
    """Handle one trace event.

    The event is first handed to the parent class's ``process_event``
    and then to this object's ``_process_event_cb``.

    Args:
        ev: the event to process; passed through unchanged to both
        calls.
    """
    # Parent-class handling runs before the local callback dispatch.
    super().process_event(ev)
    self._process_event_cb(ev)
59 for dev
in self
.disks
:
60 self
.disks
[dev
].reset()
62 for name
in self
.ifaces
:
63 self
.ifaces
[name
].reset()
66 self
.tids
[tid
].reset()
69 def disk_io_requests(self
):
70 for disk
in self
.disks
.values():
71 for io_rq
in disk
.rq_list
:
def io_requests(self):
    """Return every syscall I/O request, whatever its operation.

    Delegates to ``_get_io_requests`` without an operation filter.
    """
    all_requests = self._get_io_requests()
    return all_requests
def open_io_requests(self):
    """Return the syscall I/O requests filtered on ``OP_OPEN``."""
    operation = sv.IORequest.OP_OPEN
    return self._get_io_requests(operation)
def read_io_requests(self):
    """Return the syscall I/O requests filtered on ``OP_READ``."""
    operation = sv.IORequest.OP_READ
    return self._get_io_requests(operation)
def write_io_requests(self):
    """Return the syscall I/O requests filtered on ``OP_WRITE``."""
    operation = sv.IORequest.OP_WRITE
    return self._get_io_requests(operation)
def close_io_requests(self):
    """Return the syscall I/O requests filtered on ``OP_CLOSE``."""
    operation = sv.IORequest.OP_CLOSE
    return self._get_io_requests(operation)
def sync_io_requests(self):
    """Return the syscall I/O requests filtered on ``OP_SYNC``."""
    operation = sv.IORequest.OP_SYNC
    return self._get_io_requests(operation)
def read_write_io_requests(self):
    """Return the syscall I/O requests filtered on ``OP_READ_WRITE``."""
    operation = sv.IORequest.OP_READ_WRITE
    return self._get_io_requests(operation)
102 def _get_io_requests(self
, io_operation
=None):
103 """Create a generator of syscall io requests by operation
106 io_operation (IORequest.OP_*, optional): The operation of
107 the io_requests to return. Return all IO requests if None.
109 for proc
in self
.tids
.values():
110 for io_rq
in proc
.rq_list
:
111 if isinstance(io_rq
, sv
.BlockIORequest
):
114 if io_operation
is None or \
115 sv
.IORequest
.is_equivalent_operation(io_operation
,
119 def get_files_stats(self
):
122 for proc_stats
in self
.tids
.values():
123 for fd_list
in proc_stats
.fds
.values():
124 for fd_stats
in fd_list
:
125 filename
= fd_stats
.filename
126 # Add process name to generic filenames to
128 if FileStats
.is_generic_name(filename
):
129 filename
+= '(%s)' % proc_stats
.comm
131 if filename
not in files_stats
:
132 files_stats
[filename
] = FileStats(filename
)
134 files_stats
[filename
].update_stats(fd_stats
, proc_stats
)
139 def _assign_fds_to_parent(proc
, parent
):
143 if fd
not in parent
.fds
:
144 parent
.fds
[fd
] = proc
.fds
[fd
]
146 # best effort to fix the filename
147 if not parent
.get_fd(fd
).filename
:
148 parent
.get_fd(fd
).filename
= proc
.get_fd(fd
).filename
153 def _process_net_dev_xmit(self
, **kwargs
):
154 name
= kwargs
['iface_name']
155 sent_bytes
= kwargs
['sent_bytes']
156 cpu
= kwargs
['cpu_id']
158 if not self
._filter
_cpu
(cpu
):
161 if name
not in self
.ifaces
:
162 self
.ifaces
[name
] = IfaceStats(name
)
164 self
.ifaces
[name
].sent_packets
+= 1
165 self
.ifaces
[name
].sent_bytes
+= sent_bytes
167 def _process_netif_receive_skb(self
, **kwargs
):
168 name
= kwargs
['iface_name']
169 recv_bytes
= kwargs
['recv_bytes']
170 cpu
= kwargs
['cpu_id']
172 if not self
._filter
_cpu
(cpu
):
175 if name
not in self
.ifaces
:
176 self
.ifaces
[name
] = IfaceStats(name
)
178 self
.ifaces
[name
].recv_packets
+= 1
179 self
.ifaces
[name
].recv_bytes
+= recv_bytes
181 def _process_block_rq_complete(self
, **kwargs
):
183 proc
= kwargs
['proc']
184 cpu
= kwargs
['cpu_id']
186 if not self
._filter
_process
(proc
):
188 if not self
._filter
_cpu
(cpu
):
191 if req
.dev
not in self
.disks
:
192 self
.disks
[req
.dev
] = DiskStats(req
.dev
)
194 self
.disks
[req
.dev
].update_stats(req
)
197 if proc
.tid
not in self
.tids
:
198 self
.tids
[proc
.tid
] = ProcessIOStats
.new_from_process(proc
)
200 self
.tids
[proc
.tid
].update_block_stats(req
)
202 def _process_lttng_statedump_block_device(self
, event
):
204 disk_name
= event
['diskname']
206 if dev
not in self
.disks
:
207 self
.disks
[dev
] = DiskStats(dev
, disk_name
)
209 self
.disks
[dev
].disk_name
= disk_name
211 def _process_io_rq_exit(self
, **kwargs
):
212 proc
= kwargs
['proc']
213 parent_proc
= kwargs
['parent_proc']
214 io_rq
= kwargs
['io_rq']
215 cpu
= kwargs
['cpu_id']
217 if not self
._filter
_process
(parent_proc
):
219 if not self
._filter
_cpu
(cpu
):
222 if proc
.tid
not in self
.tids
:
223 self
.tids
[proc
.tid
] = ProcessIOStats
.new_from_process(proc
)
225 if parent_proc
.tid
not in self
.tids
:
226 self
.tids
[parent_proc
.tid
] = (
227 ProcessIOStats
.new_from_process(parent_proc
))
229 proc_stats
= self
.tids
[proc
.tid
]
230 parent_stats
= self
.tids
[parent_proc
.tid
]
233 if io_rq
.errno
is None:
234 if io_rq
.operation
== sv
.IORequest
.OP_READ
or \
235 io_rq
.operation
== sv
.IORequest
.OP_WRITE
:
236 fd_types
['fd'] = parent_stats
.get_fd(io_rq
.fd
).fd_type
237 elif io_rq
.operation
== sv
.IORequest
.OP_READ_WRITE
:
238 fd_types
['fd_in'] = parent_stats
.get_fd(io_rq
.fd_in
).fd_type
239 fd_types
['fd_out'] = parent_stats
.get_fd(io_rq
.fd_out
).fd_type
241 proc_stats
.update_io_stats(io_rq
, fd_types
)
242 parent_stats
.update_fd_stats(io_rq
)
244 # Check if the proc stats comm corresponds to the actual
245 # process comm. It might be that it was missing so far.
246 if proc_stats
.comm
!= proc
.comm
:
247 proc_stats
.comm
= proc
.comm
248 if parent_stats
.comm
!= parent_proc
.comm
:
249 parent_stats
.comm
= parent_proc
.comm
251 def _process_create_parent_proc(self
, **kwargs
):
252 proc
= kwargs
['proc']
253 parent_proc
= kwargs
['parent_proc']
255 if not self
._filter
_process
(parent_proc
):
258 if proc
.tid
not in self
.tids
:
259 self
.tids
[proc
.tid
] = ProcessIOStats
.new_from_process(proc
)
261 if parent_proc
.tid
not in self
.tids
:
262 self
.tids
[parent_proc
.tid
] = (
263 ProcessIOStats
.new_from_process(parent_proc
))
265 proc_stats
= self
.tids
[proc
.tid
]
266 parent_stats
= self
.tids
[parent_proc
.tid
]
268 proc_stats
.pid
= parent_stats
.tid
269 IoAnalysis
._assign
_fds
_to
_parent
(proc_stats
, parent_stats
)
271 def _process_create_fd(self
, **kwargs
):
272 timestamp
= kwargs
['timestamp']
273 parent_proc
= kwargs
['parent_proc']
274 tid
= parent_proc
.tid
275 cpu
= kwargs
['cpu_id']
278 if not self
._filter
_process
(parent_proc
):
280 if not self
._filter
_cpu
(cpu
):
283 if tid
not in self
.tids
:
284 self
.tids
[tid
] = ProcessIOStats
.new_from_process(parent_proc
)
286 parent_stats
= self
.tids
[tid
]
287 if fd
not in parent_stats
.fds
:
288 parent_stats
.fds
[fd
] = []
289 parent_stats
.fds
[fd
].append(FDStats
.new_from_fd(parent_proc
.fds
[fd
],
292 def _process_close_fd(self
, **kwargs
):
293 timestamp
= kwargs
['timestamp']
294 parent_proc
= kwargs
['parent_proc']
295 tid
= parent_proc
.tid
296 cpu
= kwargs
['cpu_id']
299 if not self
._filter
_process
(parent_proc
):
301 if not self
._filter
_cpu
(cpu
):
304 parent_stats
= self
.tids
[tid
]
305 last_fd
= parent_stats
.get_fd(fd
)
306 last_fd
.close_ts
= timestamp
308 def _process_update_fd(self
, **kwargs
):
309 parent_proc
= kwargs
['parent_proc']
310 tid
= parent_proc
.tid
313 new_filename
= parent_proc
.fds
[fd
].filename
314 fd_list
= self
.tids
[tid
].fds
[fd
]
315 fd_list
[-1].filename
= new_filename
320 MINORMASK
= ((1 << MINORBITS
) - 1)
322 def __init__(self
, dev
, disk_name
=None):
324 if disk_name
is not None:
325 self
.disk_name
= disk_name
327 self
.disk_name
= DiskStats
._get
_name
_from
_dev
(dev
)
329 self
.min_rq_duration
= None
330 self
.max_rq_duration
= None
331 self
.total_rq_sectors
= 0
332 self
.total_rq_duration
= 0
337 return len(self
.rq_list
)
def update_stats(self, req):
    """Fold a block request into the running disk statistics.

    Tracks the shortest and longest request duration seen so far,
    accumulates sector and duration totals, and keeps the request in
    ``rq_list``.

    Args:
        req: request object providing ``duration`` and ``nr_sector``.
    """
    # None means no request has been recorded yet, so the first
    # request always sets both extremes.
    shortest = self.min_rq_duration
    if shortest is None or req.duration < shortest:
        self.min_rq_duration = req.duration

    longest = self.max_rq_duration
    if longest is None or req.duration > longest:
        self.max_rq_duration = req.duration

    self.total_rq_sectors += req.nr_sector
    self.total_rq_duration += req.duration
    self.rq_list.append(req)
350 self
.min_rq_duration
= None
351 self
.max_rq_duration
= None
352 self
.total_rq_sectors
= 0
353 self
.total_rq_duration
= 0
def _get_name_from_dev(dev):
    """Format a device number as a ``'(major,minor)'`` string.

    Args:
        dev (int): device number; the major part is ``dev`` shifted
        right by ``DiskStats.MINORBITS`` and the minor part is ``dev``
        masked with ``DiskStats.MINORMASK``.

    Returns:
        str: the two numbers formatted as ``'(major,minor)'``.
    """
    # imported from include/linux/kdev_t.h
    return '(%d,%d)' % (dev >> DiskStats.MINORBITS,
                        dev & DiskStats.MINORMASK)
366 def __init__(self
, name
):
369 self
.recv_packets
= 0
371 self
.sent_packets
= 0
375 self
.recv_packets
= 0
377 self
.sent_packets
= 0
380 class ProcessIOStats(stats
.Process
):
381 def __init__(self
, pid
, tid
, comm
):
382 super().__init
__(pid
, tid
, comm
)
383 self
.disk_io
= stats
.IO()
384 self
.net_io
= stats
.IO()
385 self
.unk_io
= stats
.IO()
386 self
.block_io
= stats
.IO()
387 # FDStats objects, indexed by fd (fileno)
def new_from_process(cls, proc):
    """Build an instance from the identity fields of a process object.

    Args:
        proc: object exposing ``pid``, ``tid`` and ``comm``.

    Returns:
        The result of ``cls(proc.pid, proc.tid, proc.comm)``.
    """
    pid, tid, comm = proc.pid, proc.tid, proc.comm
    return cls(pid, tid, comm)
395 # Total read/write does not account for block layer I/O
def total_read(self):
    """Sum the ``read`` counters of ``disk_io``, ``net_io`` and ``unk_io``.

    Block layer I/O (``block_io``) is not included in the total.
    """
    disk_read = self.disk_io.read
    net_read = self.net_io.read
    unknown_read = self.unk_io.read
    return disk_read + net_read + unknown_read
def total_write(self):
    """Sum the ``write`` counters of ``disk_io``, ``net_io`` and ``unk_io``.

    Block layer I/O (``block_io``) is not included in the total.
    """
    disk_written = self.disk_io.write
    net_written = self.net_io.write
    unknown_written = self.unk_io.write
    return disk_written + net_written + unknown_written
404 def update_fd_stats(self
, req
):
405 if req
.errno
is not None:
408 if req
.fd
is not None:
409 self
.get_fd(req
.fd
).update_stats(req
)
410 elif isinstance(req
, sv
.ReadWriteIORequest
):
411 if req
.fd_in
is not None:
412 self
.get_fd(req
.fd_in
).update_stats(req
)
414 if req
.fd_out
is not None:
415 self
.get_fd(req
.fd_out
).update_stats(req
)
def update_block_stats(self, req):
    """Record a block layer request and add its size to ``block_io``.

    The request is always appended to ``rq_list``. Its ``size`` is then
    added to the read or the write counter of ``block_io`` depending on
    ``req.operation``; any other operation leaves the counters
    untouched.

    Args:
        req: request exposing ``operation`` and ``size``.
    """
    self.rq_list.append(req)

    # Compare by value rather than identity: the OP_* constants are
    # compared with == elsewhere in this file, and `is` only holds
    # when both operands happen to be the very same object.
    operation = req.operation
    if operation == sv.IORequest.OP_READ:
        self.block_io.read += req.size
    elif operation == sv.IORequest.OP_WRITE:
        self.block_io.write += req.size
425 def update_io_stats(self
, req
, fd_types
):
426 self
.rq_list
.append(req
)
428 if req
.size
is None or req
.errno
is not None:
431 if req
.operation
is sv
.IORequest
.OP_READ
:
432 self
._update
_read
(req
.returned_size
, fd_types
['fd'])
433 elif req
.operation
is sv
.IORequest
.OP_WRITE
:
434 self
._update
_write
(req
.returned_size
, fd_types
['fd'])
435 elif req
.operation
is sv
.IORequest
.OP_READ_WRITE
:
436 self
._update
_read
(req
.returned_size
, fd_types
['fd_in'])
437 self
._update
_write
(req
.returned_size
, fd_types
['fd_out'])
439 def _update_read(self
, size
, fd_type
):
440 if fd_type
== sv
.FDType
.disk
:
441 self
.disk_io
.read
+= size
442 elif fd_type
== sv
.FDType
.net
or fd_type
== sv
.FDType
.maybe_net
:
443 self
.net_io
.read
+= size
445 self
.unk_io
.read
+= size
447 def _update_write(self
, size
, fd_type
):
448 if fd_type
== sv
.FDType
.disk
:
449 self
.disk_io
.write
+= size
450 elif fd_type
== sv
.FDType
.net
or fd_type
== sv
.FDType
.maybe_net
:
451 self
.net_io
.write
+= size
453 self
.unk_io
.write
+= size
455 def _get_current_fd(self
, fd
):
456 fd_stats
= self
.fds
[fd
][-1]
457 if fd_stats
.close_ts
is not None:
463 def _get_fd_by_timestamp(fd_list
, timestamp
):
464 """Return the FDStats object whose lifetime contains timestamp
466 This method performs a recursive binary search on the given
467 fd_list argument, and will find the FDStats object for which
468 the timestamp is contained between its open_ts and close_ts
472 fd_list (list): list of FDStats object, sorted
473 chronologically by open_ts
475 timestamp (int): timestamp in nanoseconds (ns) since unix
476 epoch which should be contained in the FD's lifetime.
479 The FDStats object whose lifetime contains the given
480 timestamp, None if no such object exists.
482 list_size
= len(fd_list
)
486 midpoint
= list_size
// 2
487 fd_stats
= fd_list
[midpoint
]
489 # Handle case of currently open fd (i.e. no close_ts)
490 if fd_stats
.close_ts
is None:
491 if timestamp
>= fd_stats
.open_ts
:
494 if fd_stats
.open_ts
<= timestamp
<= fd_stats
.close_ts
:
497 if timestamp
< fd_stats
.open_ts
:
498 return ProcessIOStats
._get
_fd
_by
_timestamp
(
499 fd_list
[:midpoint
], timestamp
)
501 return ProcessIOStats
._get
_fd
_by
_timestamp
(
502 fd_list
[midpoint
+ 1:], timestamp
)
504 def get_fd(self
, fd
, timestamp
=None):
505 if fd
not in self
.fds
or not self
.fds
[fd
]:
508 if timestamp
is None:
509 fd_stats
= self
._get
_current
_fd
(fd
)
511 fd_stats
= ProcessIOStats
._get
_fd
_by
_timestamp
(self
.fds
[fd
],
520 self
.block_io
.reset()
524 fd_stats
= self
.get_fd(fd
)
525 if fd_stats
is not None:
530 def __init__(self
, fd
, filename
, fd_type
, cloexec
, family
, open_ts
):
532 self
.filename
= filename
533 self
.fd_type
= fd_type
534 self
.cloexec
= cloexec
536 self
.open_ts
= open_ts
539 # IO Requests that acted upon the FD
543 def new_from_fd(cls
, fd
, open_ts
):
544 return cls(fd
.fd
, fd
.filename
, fd
.fd_type
, fd
.cloexec
, fd
.family
,
def update_stats(self, req):
    """Account an I/O request against this file descriptor.

    ``req.returned_size`` is added to ``io.read`` or ``io.write``
    according to ``req.operation``. For ``OP_READ_WRITE`` requests the
    direction is chosen by matching this object's ``fd`` against
    ``req.fd_in`` (counted as a read) and ``req.fd_out`` (counted as a
    write). The request is appended to ``rq_list`` in every case.

    Args:
        req: request exposing ``operation`` and ``returned_size``, plus
        ``fd_in``/``fd_out`` for read-write operations.
    """
    # Compare by value rather than identity: the OP_* constants are
    # compared with == elsewhere in this file, and `is` only holds
    # when both operands happen to be the very same object.
    operation = req.operation
    if operation == sv.IORequest.OP_READ:
        self.io.read += req.returned_size
    elif operation == sv.IORequest.OP_WRITE:
        self.io.write += req.returned_size
    elif operation == sv.IORequest.OP_READ_WRITE:
        if self.fd == req.fd_in:
            self.io.read += req.returned_size
        elif self.fd == req.fd_out:
            self.io.write += req.returned_size

    self.rq_list.append(req)
566 GENERIC_NAMES
= ['pipe', 'socket', 'anon_inode', 'unknown']
568 def __init__(self
, filename
):
569 self
.filename
= filename
571 # Dict of file descriptors representing this file, indexed by
573 # FIXME this doesn't cover FD reuse cases
576 def update_stats(self
, fd_stats
, proc_stats
):
577 self
.io
+= fd_stats
.io
579 if proc_stats
.pid
is not None:
584 if pid
not in self
.fd_by_pid
:
585 self
.fd_by_pid
[pid
] = fd_stats
.fd
591 def is_generic_name(filename
):
592 for generic_name
in FileStats
.GENERIC_NAMES
:
593 if filename
.startswith(generic_name
):
This page took 0.061128 seconds and 5 git commands to generate.