3 # The MIT License (MIT)
5 # Copyright (C) 2015 - Antoine Busque <abusque@efficios.com>
7 # Permission is hereby granted, free of charge, to any person obtaining a copy
8 # of this software and associated documentation files (the "Software"), to deal
9 # in the Software without restriction, including without limitation the rights
10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 # copies of the Software, and to permit persons to whom the Software is
12 # furnished to do so, subject to the following conditions:
14 # The above copyright notice and this permission notice shall be included in
15 # all copies or substantial portions of the Software.
17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25 from .analysis
import Analysis
26 from linuxautomaton
import sv
29 class IoAnalysis(Analysis
):
30 def __init__(self
, state
):
32 'net_dev_xmit': self
._process
_net
_dev
_xmit
,
33 'netif_receive_skb': self
._process
_netif
_receive
_skb
,
34 'block_rq_complete': self
._process
_block
_rq
_complete
,
35 'io_rq_exit': self
._process
_io
_rq
_exit
,
36 'create_fd': self
._process
_create
_fd
,
37 'close_fd': self
._process
_close
_fd
,
38 'create_parent_proc': self
._process
_create
_parent
_proc
42 'lttng_statedump_block_device':
43 self
._process
_lttng
_statedump
_block
_device
47 self
._state
.register_notification_cbs(notification_cbs
)
48 self
._register
_cbs
(event_cbs
)
def process_event(self, ev):
    """Hand a single event over to the analysis' event callback.

    Args:
        ev: The event to process; it is forwarded unchanged to
            ``self._process_event_cb``.
    """
    dispatch = self._process_event_cb
    dispatch(ev)
58 for dev
in self
.disks
:
59 self
.disks
[dev
].reset()
61 for name
in self
.ifaces
:
62 self
.ifaces
[name
].reset()
65 self
.tids
[tid
].reset()
68 def disk_io_requests(self
):
69 for disk
in self
.disks
.values():
70 for io_rq
in disk
.rq_list
:
def io_requests(self):
    """Return every syscall IO request, whatever its operation.

    Delegates to ``_get_io_requests`` without an operation filter.
    """
    all_requests = self._get_io_requests()
    return all_requests
def open_io_requests(self):
    """Return the syscall IO requests matching the open operation."""
    fetch = self._get_io_requests
    return fetch(sv.IORequest.OP_OPEN)
def read_io_requests(self):
    """Return the syscall IO requests matching the read operation."""
    fetch = self._get_io_requests
    return fetch(sv.IORequest.OP_READ)
def write_io_requests(self):
    """Return the syscall IO requests matching the write operation."""
    fetch = self._get_io_requests
    return fetch(sv.IORequest.OP_WRITE)
def close_io_requests(self):
    """Return the syscall IO requests matching the close operation."""
    fetch = self._get_io_requests
    return fetch(sv.IORequest.OP_CLOSE)
def sync_io_requests(self):
    """Return the syscall IO requests matching the sync operation."""
    fetch = self._get_io_requests
    return fetch(sv.IORequest.OP_SYNC)
def read_write_io_requests(self):
    """Return the syscall IO requests matching the read/write operation."""
    fetch = self._get_io_requests
    return fetch(sv.IORequest.OP_READ_WRITE)
101 def _get_io_requests(self
, io_operation
=None):
102 """Create a generator of syscall io requests by operation
105 io_operation (IORequest.OP_*, optional): The operation of
106 the io_requests to return. Return all IO requests if None.
108 for proc
in self
.tids
.values():
109 for io_rq
in proc
.rq_list
:
110 if isinstance(io_rq
, sv
.BlockIORequest
):
113 if io_operation
is None or \
114 sv
.IORequest
.is_equivalent_operation(io_operation
,
118 def get_files_stats(self
, pid_filter_list
, comm_filter_list
):
121 for proc_stats
in self
.tids
.values():
122 if pid_filter_list
is not None and \
123 proc_stats
.pid
not in pid_filter_list
or \
124 comm_filter_list
is not None and \
125 proc_stats
.comm
not in comm_filter_list
:
128 for fd_list
in proc_stats
.fds
.values():
129 for fd_stats
in fd_list
:
130 filename
= fd_stats
.filename
131 # Add process name to generic filenames to
133 if FileStats
.is_generic_name(filename
):
134 filename
+= '(%s)' % proc_stats
.comm
136 if filename
not in files_stats
:
137 if proc_stats
.pid
is not None:
142 files_stats
[filename
] = FileStats(
143 filename
, fd_stats
.fd
, pid
)
145 files_stats
[filename
].update_stats(fd_stats
, proc_stats
)
150 def _assign_fds_to_parent(proc
, parent
):
154 if fd
not in parent
.fds
:
155 parent
.fds
[fd
] = proc
.fds
[fd
]
157 # best effort to fix the filename
158 if not parent
.get_fd(fd
).filename
:
159 parent
.get_fd(fd
).filename
= proc
.get_fd(fd
).filename
def _process_net_dev_xmit(self, **kwargs):
    """Account for one packet transmitted on a network interface.

    An ``IfaceStats`` entry is created in ``self.ifaces`` the first time
    an interface name is seen.

    Args:
        **kwargs: Notification payload; must provide ``iface_name`` and
            ``sent_bytes``.
    """
    iface_name = kwargs['iface_name']
    byte_count = kwargs['sent_bytes']

    if iface_name not in self.ifaces:
        self.ifaces[iface_name] = IfaceStats(iface_name)

    iface = self.ifaces[iface_name]
    iface.sent_packets += 1
    iface.sent_bytes += byte_count
def _process_netif_receive_skb(self, **kwargs):
    """Account for one packet received on a network interface.

    An ``IfaceStats`` entry is created in ``self.ifaces`` the first time
    an interface name is seen.

    Args:
        **kwargs: Notification payload; must provide ``iface_name`` and
            ``recv_bytes``.
    """
    iface_name = kwargs['iface_name']
    byte_count = kwargs['recv_bytes']

    if iface_name not in self.ifaces:
        self.ifaces[iface_name] = IfaceStats(iface_name)

    iface = self.ifaces[iface_name]
    iface.recv_packets += 1
    iface.recv_bytes += byte_count
184 def _process_block_rq_complete(self
, **kwargs
):
186 proc
= kwargs
['proc']
188 if req
.dev
not in self
.disks
:
189 self
.disks
[req
.dev
] = DiskStats(req
.dev
)
191 self
.disks
[req
.dev
].update_stats(req
)
193 if proc
.tid
not in self
.tids
:
194 self
.tids
[proc
.tid
] = ProcessIOStats
.new_from_process(proc
)
196 self
.tids
[proc
.tid
].update_block_stats(req
)
198 def _process_lttng_statedump_block_device(self
, event
):
200 disk_name
= event
['diskname']
202 if dev
not in self
.disks
:
203 self
.disks
[dev
] = DiskStats(dev
, disk_name
)
205 self
.disks
[dev
].disk_name
= disk_name
207 def _process_io_rq_exit(self
, **kwargs
):
208 proc
= kwargs
['proc']
209 parent_proc
= kwargs
['parent_proc']
210 io_rq
= kwargs
['io_rq']
212 if proc
.tid
not in self
.tids
:
213 self
.tids
[proc
.tid
] = ProcessIOStats
.new_from_process(proc
)
215 if parent_proc
.tid
not in self
.tids
:
216 self
.tids
[parent_proc
.tid
] = (
217 ProcessIOStats
.new_from_process(parent_proc
))
219 proc_stats
= self
.tids
[proc
.tid
]
220 parent_stats
= self
.tids
[parent_proc
.tid
]
223 if io_rq
.errno
is None:
224 if io_rq
.operation
== sv
.IORequest
.OP_READ
or \
225 io_rq
.operation
== sv
.IORequest
.OP_WRITE
:
226 fd_types
['fd'] = parent_stats
.get_fd(io_rq
.fd
).fd_type
227 elif io_rq
.operation
== sv
.IORequest
.OP_READ_WRITE
:
228 fd_types
['fd_in'] = parent_stats
.get_fd(io_rq
.fd_in
).fd_type
229 fd_types
['fd_out'] = parent_stats
.get_fd(io_rq
.fd_out
).fd_type
231 proc_stats
.update_io_stats(io_rq
, fd_types
)
232 parent_stats
.update_fd_stats(io_rq
)
def _process_create_parent_proc(self, **kwargs):
    """Link a process' IO stats to those of its parent process.

    Makes sure both processes have a ``ProcessIOStats`` entry in
    ``self.tids``, sets the child's stats ``pid`` to the parent's
    ``tid``, then hands both stats objects to
    ``IoAnalysis._assign_fds_to_parent``.

    Args:
        **kwargs: Notification payload; must provide ``proc`` and
            ``parent_proc``.
    """
    child = kwargs['proc']
    parent = kwargs['parent_proc']

    # Child first, then parent: same creation order as lookups below.
    for process in (child, parent):
        if process.tid not in self.tids:
            self.tids[process.tid] = ProcessIOStats.new_from_process(process)

    child_stats = self.tids[child.tid]
    parent_stats = self.tids[parent.tid]

    child_stats.pid = parent_stats.tid
    IoAnalysis._assign_fds_to_parent(child_stats, parent_stats)
251 def _process_create_fd(self
, **kwargs
):
252 timestamp
= kwargs
['timestamp']
253 parent_proc
= kwargs
['parent_proc']
254 tid
= parent_proc
.tid
257 if tid
not in self
.tids
:
258 self
.tids
[tid
] = ProcessIOStats
.new_from_process(parent_proc
)
260 parent_stats
= self
.tids
[tid
]
261 if fd
not in parent_stats
.fds
:
262 parent_stats
.fds
[fd
] = []
263 parent_stats
.fds
[fd
].append(FDStats
.new_from_fd(parent_proc
.fds
[fd
],
266 def _process_close_fd(self
, **kwargs
):
267 timestamp
= kwargs
['timestamp']
268 parent_proc
= kwargs
['parent_proc']
269 tid
= parent_proc
.tid
272 parent_stats
= self
.tids
[tid
]
273 last_fd
= parent_stats
.get_fd(fd
)
274 last_fd
.close_ts
= timestamp
279 MINORMASK
= ((1 << MINORBITS
) - 1)
281 def __init__(self
, dev
, disk_name
=None):
283 if disk_name
is not None:
284 self
.disk_name
= disk_name
286 self
.disk_name
= DiskStats
._get
_name
_from
_dev
(dev
)
288 self
.min_rq_duration
= None
289 self
.max_rq_duration
= None
290 self
.total_rq_sectors
= 0
291 self
.total_rq_duration
= 0
296 return len(self
.rq_list
)
def update_stats(self, req):
    """Fold a completed request into the running statistics.

    Updates the minimum and maximum request durations, adds the
    request's sectors and duration to the totals, and records the
    request in ``self.rq_list``.

    Args:
        req: Request exposing ``duration`` and ``nr_sector``.
    """
    shortest = self.min_rq_duration
    if shortest is None or req.duration < shortest:
        self.min_rq_duration = req.duration

    longest = self.max_rq_duration
    if longest is None or req.duration > longest:
        self.max_rq_duration = req.duration

    self.total_rq_sectors += req.nr_sector
    self.total_rq_duration += req.duration
    self.rq_list.append(req)
309 self
.min_rq_duration
= None
310 self
.max_rq_duration
= None
311 self
.total_rq_sectors
= 0
312 self
.total_rq_duration
= 0
def _get_name_from_dev(dev):
    """Build a ``(major,minor)`` name from a device number.

    Args:
        dev: Device number holding both the major and minor parts
            (presumably an int, given the shift and mask applied).

    Returns:
        str: The major and minor numbers formatted as ``(major,minor)``.
    """
    # Split as done in include/linux/kdev_t.h
    numbers = (dev >> DiskStats.MINORBITS, dev & DiskStats.MINORMASK)

    return '(%d,%d)' % numbers
325 def __init__(self
, name
):
328 self
.recv_packets
= 0
330 self
.sent_packets
= 0
334 self
.recv_packets
= 0
336 self
.sent_packets
= 0
339 class ProcessIOStats():
340 def __init__(self
, pid
, tid
, comm
):
344 # Number of bytes read or written by the process, by type of I/O
351 # Actual number of bytes read or written by the process at the
355 # FDStats objects, indexed by fd (fileno)
def new_from_process(cls, proc):
    """Build an instance from a process' ``pid``, ``tid`` and ``comm``.

    Args:
        proc: Process object exposing ``pid``, ``tid`` and ``comm``.

    Returns:
        The object produced by calling ``cls`` with those three values.
    """
    pid, tid, comm = proc.pid, proc.tid, proc.comm
    return cls(pid, tid, comm)
363 # Total read/write does not account for block layer I/O
def total_read(self):
    """Return the sum of the disk, network and unknown read counters."""
    known_reads = self.disk_read + self.net_read
    return known_reads + self.unk_read
def total_write(self):
    """Return the sum of the disk, network and unknown write counters."""
    known_writes = self.disk_write + self.net_write
    return known_writes + self.unk_write
372 def update_fd_stats(self
, req
):
373 if req
.errno
is not None:
376 if req
.fd
is not None:
377 self
.get_fd(req
.fd
).update_stats(req
)
378 elif isinstance(req
, sv
.ReadWriteIORequest
):
379 if req
.fd_in
is not None:
380 self
.get_fd(req
.fd_in
).update_stats(req
)
382 if req
.fd_out
is not None:
383 self
.get_fd(req
.fd_out
).update_stats(req
)
def update_block_stats(self, req):
    """Record a block request and add its size to the block counters.

    The request is always appended to ``self.rq_list``. Its size is then
    added to ``block_read`` for a read or to ``block_write`` for a
    write; any other operation leaves both counters untouched.

    Args:
        req: Block request exposing ``operation`` and ``size``.
    """
    self.rq_list.append(req)

    # Operation codes are compared by value: identity (`is`) is not
    # guaranteed to hold between two equal constants.
    if req.operation == sv.IORequest.OP_READ:
        self.block_read += req.size
    elif req.operation == sv.IORequest.OP_WRITE:
        self.block_write += req.size
393 def update_io_stats(self
, req
, fd_types
):
394 self
.rq_list
.append(req
)
396 if req
.size
is None or req
.errno
is not None:
399 if req
.operation
is sv
.IORequest
.OP_READ
:
400 self
._update
_read
(req
.returned_size
, fd_types
['fd'])
401 elif req
.operation
is sv
.IORequest
.OP_WRITE
:
402 self
._update
_write
(req
.returned_size
, fd_types
['fd'])
403 elif req
.operation
is sv
.IORequest
.OP_READ_WRITE
:
404 self
._update
_read
(req
.returned_size
, fd_types
['fd_in'])
405 self
._update
_write
(req
.returned_size
, fd_types
['fd_out'])
407 self
.rq_list
.append(req
)
409 def _update_read(self
, size
, fd_type
):
410 if fd_type
== sv
.FDType
.disk
:
411 self
.disk_read
+= size
412 elif fd_type
== sv
.FDType
.net
or fd_type
== sv
.FDType
.maybe_net
:
413 self
.net_read
+= size
415 self
.unk_read
+= size
417 def _update_write(self
, size
, fd_type
):
418 if fd_type
== sv
.FDType
.disk
:
419 self
.disk_write
+= size
420 elif fd_type
== sv
.FDType
.net
or fd_type
== sv
.FDType
.maybe_net
:
421 self
.net_write
+= size
423 self
.unk_write
+= size
425 def _get_current_fd(self
, fd
):
426 fd_stats
= self
.fds
[fd
][-1]
427 if fd_stats
.close_ts
is not None:
433 def _get_fd_by_timestamp(fd_list
, timestamp
):
434 """Return the FDStats object whose lifetime contains timestamp
436 This method performs a recursive binary search on the given
437 fd_list argument, and will find the FDStats object for which
438 the timestamp is contained between its open_ts and close_ts
442 fd_list (list): list of FDStats object, sorted
443 chronologically by open_ts
445 timestamp (int): timestamp in nanoseconds (ns) since unix
446 epoch which should be contained in the FD's lifetime.
449 The FDStats object whose lifetime contains the given
450 timestamp, None if no such object exists.
452 list_size
= len(fd_list
)
456 midpoint
= list_size
// 2
457 fd_stats
= fd_list
[midpoint
]
459 # Handle case of currently open fd (i.e. no close_ts)
460 if fd_stats
.close_ts
is None:
461 if timestamp
>= fd_stats
.open_ts
:
464 if fd_stats
.open_ts
<= timestamp
<= fd_stats
.close_ts
:
467 if timestamp
< fd_stats
.open_ts
:
468 return ProcessIOStats
._get
_fd
_by
_timestamp
(
469 fd_list
[:midpoint
], timestamp
)
471 return ProcessIOStats
._get
_fd
_by
_timestamp
(
472 fd_list
[midpoint
+ 1:], timestamp
)
474 def get_fd(self
, fd
, timestamp
=None):
475 if fd
not in self
.fds
or not self
.fds
[fd
]:
478 if timestamp
is None:
479 fd_stats
= self
._get
_current
_fd
(fd
)
481 fd_stats
= ProcessIOStats
._get
_fd
_by
_timestamp
(self
.fds
[fd
],
498 fd_stats
= self
.get_fd(fd
)
499 if fd_stats
is not None:
504 def __init__(self
, fd
, filename
, fd_type
, cloexec
, family
, open_ts
):
506 self
.filename
= filename
507 self
.fd_type
= fd_type
508 self
.cloexec
= cloexec
510 self
.open_ts
= open_ts
513 # Number of bytes read or written
516 # IO Requests that acted upon the FD
520 def new_from_fd(cls
, fd
, open_ts
):
521 return cls(fd
.fd
, fd
.filename
, fd
.fd_type
, fd
.cloexec
, fd
.family
,
def update_stats(self, req):
    """Add an IO request's returned size to this FD's counters.

    Reads add to ``self.read`` and writes to ``self.write``. For a
    combined read/write request the direction is chosen by matching
    ``self.fd`` against the request's ``fd_in`` (read) or ``fd_out``
    (write). The request is appended to ``self.rq_list`` in every case,
    including operations that change no counter.

    Args:
        req: IO request exposing ``operation`` and ``returned_size``,
            plus ``fd_in`` and ``fd_out`` for read/write requests.
    """
    # Operation codes are compared by value: identity (`is`) is not
    # guaranteed to hold between two equal constants.
    operation = req.operation

    if operation == sv.IORequest.OP_READ:
        self.read += req.returned_size
    elif operation == sv.IORequest.OP_WRITE:
        self.write += req.returned_size
    elif operation == sv.IORequest.OP_READ_WRITE:
        if self.fd == req.fd_in:
            self.read += req.returned_size
        elif self.fd == req.fd_out:
            self.write += req.returned_size

    self.rq_list.append(req)
543 GENERIC_NAMES
= ['pipe', 'socket', 'anon_inode', 'unknown']
545 def __init__(self
, filename
, fd
, pid
):
546 self
.filename
= filename
547 # Number of bytes read or written
550 # Dict of file descriptors representing this file, indexed by
552 self
.fd_by_pid
= {pid
: fd
}
554 def update_stats(self
, fd_stats
, proc_stats
):
555 self
.read
+= fd_stats
.read
556 self
.write
+= fd_stats
.write
558 if proc_stats
.pid
is not None:
563 if pid
not in self
.fd_by_pid
:
564 self
.fd_by_pid
[pid
] = fd_stats
.fd
571 def is_generic_name(filename
):
572 for generic_name
in FileStats
.GENERIC_NAMES
:
573 if filename
.startswith(generic_name
):
This page took 0.045126 seconds and 5 git commands to generate.