Add configuration for python code formatter black
[babeltrace.git] / tests / bindings / python / bt2 / test_port.py
1 from bt2 import value
2 import unittest
3 import copy
4 import bt2
5
6
@unittest.skip("this is broken")
class PortTestCase(unittest.TestCase):
    """Tests for the ports of user-defined bt2 components.

    Every test declares a throwaway component class whose initializer adds
    one or more ports, then instantiates it in a fresh graph through
    `_create_comp()`.

    The initializers name their first parameter `comp_self` on purpose:
    this keeps `self` bound to the enclosing test case, so assertions can
    be made from within the component's `__init__()`.

    The tests are grouped as follows:

    * `test_*_add_*_port`: adding a port from the initializer.
    * `test_user_*`: the `_output_ports` / `_input_ports` mappings as seen
      from within the component (`comp_self`).
    * `test_gen_*`: the `output_ports` / `input_ports` mappings of the
      component object returned by `Graph.add_component()`.
    * `test_name` ... `test_disconnect_no_connection`: port properties
      reached through the returned component's `input_ports` mapping.
    * `test_priv_*`: the same properties on the port object returned by
      `_add_input_port()`.

    The `test_gen_*` tests which capture ports with `nonlocal` explicitly
    `del` them once done.  NOTE(review): presumably this releases the
    port references before the test returns -- confirm against the bt2
    object lifetime rules.
    """

    @staticmethod
    def _create_comp(comp_cls, name=None):
        """Instantiate `comp_cls` in a new graph and return the component.

        The component is named `name`, or 'comp' when `name` is `None`.
        """
        graph = bt2.Graph()

        if name is None:
            name = 'comp'

        return graph.add_component(comp_cls, name)

    # ------------------------------------------------------------------
    # Adding ports from the component initializer
    # ------------------------------------------------------------------

    def test_src_add_output_port(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                port = comp_self._add_output_port('out')
                self.assertEqual(port.name, 'out')

        comp = self._create_comp(MySource)
        self.assertEqual(len(comp.output_ports), 1)

    def test_flt_add_output_port(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                port = comp_self._add_output_port('out')
                self.assertEqual(port.name, 'out')

        comp = self._create_comp(MyFilter)
        self.assertEqual(len(comp.output_ports), 1)

    def test_flt_add_input_port(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                port = comp_self._add_input_port('in')
                self.assertEqual(port.name, 'in')

        comp = self._create_comp(MyFilter)
        self.assertEqual(len(comp.input_ports), 1)

    def test_sink_add_input_port(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                port = comp_self._add_input_port('in')
                self.assertEqual(port.name, 'in')

            def _consume(self):
                pass

        comp = self._create_comp(MySink)
        self.assertEqual(len(comp.input_ports), 1)

    # ------------------------------------------------------------------
    # Port mappings as seen from within the component: lookup by name
    # ------------------------------------------------------------------

    def test_user_src_output_ports_getitem(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                port1 = comp_self._add_output_port('clear')
                port2 = comp_self._add_output_port('print')
                port3 = comp_self._add_output_port('insert')
                self.assertEqual(port3.addr, comp_self._output_ports['insert'].addr)
                self.assertEqual(port2.addr, comp_self._output_ports['print'].addr)
                self.assertEqual(port1.addr, comp_self._output_ports['clear'].addr)

        # All assertions run inside the initializer.
        self._create_comp(MySource)

    def test_user_flt_output_ports_getitem(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                port1 = comp_self._add_output_port('clear')
                port2 = comp_self._add_output_port('print')
                port3 = comp_self._add_output_port('insert')
                self.assertEqual(port3.addr, comp_self._output_ports['insert'].addr)
                self.assertEqual(port2.addr, comp_self._output_ports['print'].addr)
                self.assertEqual(port1.addr, comp_self._output_ports['clear'].addr)

        # All assertions run inside the initializer.
        self._create_comp(MyFilter)

    def test_user_flt_input_ports_getitem(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                port1 = comp_self._add_input_port('clear')
                port2 = comp_self._add_input_port('print')
                port3 = comp_self._add_input_port('insert')
                self.assertEqual(port3.addr, comp_self._input_ports['insert'].addr)
                self.assertEqual(port2.addr, comp_self._input_ports['print'].addr)
                self.assertEqual(port1.addr, comp_self._input_ports['clear'].addr)

        # All assertions run inside the initializer.
        self._create_comp(MyFilter)

    def test_user_sink_input_ports_getitem(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                port1 = comp_self._add_input_port('clear')
                port2 = comp_self._add_input_port('print')
                port3 = comp_self._add_input_port('insert')
                self.assertEqual(port3.addr, comp_self._input_ports['insert'].addr)
                self.assertEqual(port2.addr, comp_self._input_ports['print'].addr)
                self.assertEqual(port1.addr, comp_self._input_ports['clear'].addr)

            def _consume(self):
                pass

        # All assertions run inside the initializer.
        self._create_comp(MySink)

    # ------------------------------------------------------------------
    # Port mappings as seen from within the component: unknown name
    # ------------------------------------------------------------------

    def test_user_src_output_ports_getitem_invalid_key(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                comp_self._add_output_port('clear')
                comp_self._add_output_port('print')
                comp_self._add_output_port('insert')

                with self.assertRaises(KeyError):
                    comp_self._output_ports['hello']

        self._create_comp(MySource)

    def test_user_flt_output_ports_getitem_invalid_key(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                comp_self._add_output_port('clear')
                comp_self._add_output_port('print')
                comp_self._add_output_port('insert')

                with self.assertRaises(KeyError):
                    comp_self._output_ports['hello']

        self._create_comp(MyFilter)

    def test_user_flt_input_ports_getitem_invalid_key(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')
                comp_self._add_input_port('print')
                comp_self._add_input_port('insert')

                with self.assertRaises(KeyError):
                    comp_self._input_ports['hello']

        self._create_comp(MyFilter)

    def test_user_sink_input_ports_getitem_invalid_key(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')
                comp_self._add_input_port('print')
                comp_self._add_input_port('insert')

                with self.assertRaises(KeyError):
                    comp_self._input_ports['hello']

            def _consume(self):
                pass

        self._create_comp(MySink)

    # ------------------------------------------------------------------
    # Port mappings as seen from within the component: length
    # ------------------------------------------------------------------

    def test_user_src_output_ports_len(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                comp_self._add_output_port('clear')
                comp_self._add_output_port('print')
                comp_self._add_output_port('insert')
                self.assertEqual(len(comp_self._output_ports), 3)

        self._create_comp(MySource)

    def test_user_flt_output_ports_len(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                comp_self._add_output_port('clear')
                comp_self._add_output_port('print')
                comp_self._add_output_port('insert')
                self.assertEqual(len(comp_self._output_ports), 3)

        self._create_comp(MyFilter)

    def test_user_flt_input_ports_len(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')
                comp_self._add_input_port('print')
                comp_self._add_input_port('insert')
                self.assertEqual(len(comp_self._input_ports), 3)

        self._create_comp(MyFilter)

    def test_user_sink_input_ports_len(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')
                comp_self._add_input_port('print')
                comp_self._add_input_port('insert')
                self.assertEqual(len(comp_self._input_ports), 3)

            def _consume(self):
                pass

        self._create_comp(MySink)

    # ------------------------------------------------------------------
    # Port mappings as seen from within the component: iteration order
    # ------------------------------------------------------------------

    def test_user_src_output_ports_iter(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                port1 = comp_self._add_output_port('clear')
                port2 = comp_self._add_output_port('print')
                port3 = comp_self._add_output_port('insert')

                # (name, port) pairs, expected in insertion order.
                ports = list(comp_self._output_ports.items())

                self.assertEqual(ports[0][0], 'clear')
                self.assertEqual(ports[0][1].addr, port1.addr)
                self.assertEqual(ports[1][0], 'print')
                self.assertEqual(ports[1][1].addr, port2.addr)
                self.assertEqual(ports[2][0], 'insert')
                self.assertEqual(ports[2][1].addr, port3.addr)

        self._create_comp(MySource)

    def test_user_flt_output_ports_iter(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                port1 = comp_self._add_output_port('clear')
                port2 = comp_self._add_output_port('print')
                port3 = comp_self._add_output_port('insert')

                # (name, port) pairs, expected in insertion order.
                ports = list(comp_self._output_ports.items())

                self.assertEqual(ports[0][0], 'clear')
                self.assertEqual(ports[0][1].addr, port1.addr)
                self.assertEqual(ports[1][0], 'print')
                self.assertEqual(ports[1][1].addr, port2.addr)
                self.assertEqual(ports[2][0], 'insert')
                self.assertEqual(ports[2][1].addr, port3.addr)

        self._create_comp(MyFilter)

    def test_user_flt_input_ports_iter(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                port1 = comp_self._add_input_port('clear')
                port2 = comp_self._add_input_port('print')
                port3 = comp_self._add_input_port('insert')

                # (name, port) pairs, expected in insertion order.
                ports = list(comp_self._input_ports.items())

                self.assertEqual(ports[0][0], 'clear')
                self.assertEqual(ports[0][1].addr, port1.addr)
                self.assertEqual(ports[1][0], 'print')
                self.assertEqual(ports[1][1].addr, port2.addr)
                self.assertEqual(ports[2][0], 'insert')
                self.assertEqual(ports[2][1].addr, port3.addr)

        self._create_comp(MyFilter)

    def test_user_sink_input_ports_iter(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                port1 = comp_self._add_input_port('clear')
                port2 = comp_self._add_input_port('print')
                port3 = comp_self._add_input_port('insert')

                # (name, port) pairs, expected in insertion order.
                ports = list(comp_self._input_ports.items())

                self.assertEqual(ports[0][0], 'clear')
                self.assertEqual(ports[0][1].addr, port1.addr)
                self.assertEqual(ports[1][0], 'print')
                self.assertEqual(ports[1][1].addr, port2.addr)
                self.assertEqual(ports[2][0], 'insert')
                self.assertEqual(ports[2][1].addr, port3.addr)

            def _consume(self):
                pass

        self._create_comp(MySink)

    # ------------------------------------------------------------------
    # Port mappings of the returned component: lookup by name
    # ------------------------------------------------------------------

    def test_gen_src_output_ports_getitem(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        port1 = None
        port2 = None
        port3 = None

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                # Hand the created ports back to the test method.
                nonlocal port1, port2, port3
                port1 = comp_self._add_output_port('clear')
                port2 = comp_self._add_output_port('print')
                port3 = comp_self._add_output_port('insert')

        comp = self._create_comp(MySource)
        self.assertEqual(port3.addr, comp.output_ports['insert'].addr)
        self.assertEqual(port2.addr, comp.output_ports['print'].addr)
        self.assertEqual(port1.addr, comp.output_ports['clear'].addr)
        del port1
        del port2
        del port3

    def test_gen_flt_output_ports_getitem(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        port1 = None
        port2 = None
        port3 = None

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                # Hand the created ports back to the test method.
                nonlocal port1, port2, port3
                port1 = comp_self._add_output_port('clear')
                port2 = comp_self._add_output_port('print')
                port3 = comp_self._add_output_port('insert')

        comp = self._create_comp(MyFilter)
        self.assertEqual(port3.addr, comp.output_ports['insert'].addr)
        self.assertEqual(port2.addr, comp.output_ports['print'].addr)
        self.assertEqual(port1.addr, comp.output_ports['clear'].addr)
        del port1
        del port2
        del port3

    def test_gen_flt_input_ports_getitem(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        port1 = None
        port2 = None
        port3 = None

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                # Hand the created ports back to the test method.
                nonlocal port1, port2, port3
                port1 = comp_self._add_input_port('clear')
                port2 = comp_self._add_input_port('print')
                port3 = comp_self._add_input_port('insert')

        comp = self._create_comp(MyFilter)
        self.assertEqual(port3.addr, comp.input_ports['insert'].addr)
        self.assertEqual(port2.addr, comp.input_ports['print'].addr)
        self.assertEqual(port1.addr, comp.input_ports['clear'].addr)
        del port1
        del port2
        del port3

    def test_gen_sink_input_ports_getitem(self):
        port1 = None
        port2 = None
        port3 = None

        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                # Hand the created ports back to the test method.
                nonlocal port1, port2, port3
                port1 = comp_self._add_input_port('clear')
                port2 = comp_self._add_input_port('print')
                port3 = comp_self._add_input_port('insert')

            def _consume(self):
                pass

        comp = self._create_comp(MySink)
        self.assertEqual(port3.addr, comp.input_ports['insert'].addr)
        self.assertEqual(port2.addr, comp.input_ports['print'].addr)
        self.assertEqual(port1.addr, comp.input_ports['clear'].addr)
        del port1
        del port2
        del port3

    # ------------------------------------------------------------------
    # Port mappings of the returned component: unknown name
    # ------------------------------------------------------------------

    def test_gen_src_output_ports_getitem_invalid_key(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                comp_self._add_output_port('clear')
                comp_self._add_output_port('print')
                comp_self._add_output_port('insert')

        comp = self._create_comp(MySource)

        with self.assertRaises(KeyError):
            comp.output_ports['hello']

    def test_gen_flt_output_ports_getitem_invalid_key(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                comp_self._add_output_port('clear')
                comp_self._add_output_port('print')
                comp_self._add_output_port('insert')

        comp = self._create_comp(MyFilter)

        with self.assertRaises(KeyError):
            comp.output_ports['hello']

    def test_gen_flt_input_ports_getitem_invalid_key(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')
                comp_self._add_input_port('print')
                comp_self._add_input_port('insert')

        comp = self._create_comp(MyFilter)

        with self.assertRaises(KeyError):
            comp.input_ports['hello']

    def test_gen_sink_input_ports_getitem_invalid_key(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')
                comp_self._add_input_port('print')
                comp_self._add_input_port('insert')

                # Unlike its sibling test_gen_* tests, this one also checks
                # the mapping seen from within the component.
                with self.assertRaises(KeyError):
                    comp_self._input_ports['hello']

            def _consume(self):
                pass

        comp = self._create_comp(MySink)

        with self.assertRaises(KeyError):
            comp.input_ports['hello']

    # ------------------------------------------------------------------
    # Port mappings of the returned component: length
    # ------------------------------------------------------------------

    def test_gen_src_output_ports_len(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                comp_self._add_output_port('clear')
                comp_self._add_output_port('print')
                comp_self._add_output_port('insert')

        comp = self._create_comp(MySource)
        self.assertEqual(len(comp.output_ports), 3)

    def test_gen_flt_output_ports_len(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                comp_self._add_output_port('clear')
                comp_self._add_output_port('print')
                comp_self._add_output_port('insert')

        comp = self._create_comp(MyFilter)
        self.assertEqual(len(comp.output_ports), 3)

    def test_gen_flt_input_ports_len(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')
                comp_self._add_input_port('print')
                comp_self._add_input_port('insert')

        comp = self._create_comp(MyFilter)
        self.assertEqual(len(comp.input_ports), 3)

    def test_gen_sink_input_ports_len(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')
                comp_self._add_input_port('print')
                comp_self._add_input_port('insert')

            def _consume(self):
                pass

        comp = self._create_comp(MySink)
        self.assertEqual(len(comp.input_ports), 3)

    # ------------------------------------------------------------------
    # Port mappings of the returned component: iteration order
    # ------------------------------------------------------------------

    def test_gen_src_output_ports_iter(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        port1 = None
        port2 = None
        port3 = None

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                # Hand the created ports back to the test method.
                nonlocal port1, port2, port3
                port1 = comp_self._add_output_port('clear')
                port2 = comp_self._add_output_port('print')
                port3 = comp_self._add_output_port('insert')

        comp = self._create_comp(MySource)

        # (name, port) pairs, expected in insertion order.
        ports = list(comp.output_ports.items())

        self.assertEqual(ports[0][0], 'clear')
        self.assertEqual(ports[0][1].addr, port1.addr)
        self.assertEqual(ports[1][0], 'print')
        self.assertEqual(ports[1][1].addr, port2.addr)
        self.assertEqual(ports[2][0], 'insert')
        self.assertEqual(ports[2][1].addr, port3.addr)
        del port1
        del port2
        del port3

    def test_gen_flt_output_ports_iter(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        port1 = None
        port2 = None
        port3 = None

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                # Hand the created ports back to the test method.
                nonlocal port1, port2, port3
                port1 = comp_self._add_output_port('clear')
                port2 = comp_self._add_output_port('print')
                port3 = comp_self._add_output_port('insert')

        comp = self._create_comp(MyFilter)

        # (name, port) pairs, expected in insertion order.
        ports = list(comp.output_ports.items())

        self.assertEqual(ports[0][0], 'clear')
        self.assertEqual(ports[0][1].addr, port1.addr)
        self.assertEqual(ports[1][0], 'print')
        self.assertEqual(ports[1][1].addr, port2.addr)
        self.assertEqual(ports[2][0], 'insert')
        self.assertEqual(ports[2][1].addr, port3.addr)
        del port1
        del port2
        del port3

    def test_gen_flt_input_ports_iter(self):
        class MyIter(bt2._UserMessageIterator):
            def __next__(self):
                raise bt2.Stop

        port1 = None
        port2 = None
        port3 = None

        class MyFilter(bt2._UserFilterComponent,
                       message_iterator_class=MyIter):
            def __init__(comp_self, params):
                # Hand the created ports back to the test method.
                nonlocal port1, port2, port3
                port1 = comp_self._add_input_port('clear')
                port2 = comp_self._add_input_port('print')
                port3 = comp_self._add_input_port('insert')

        comp = self._create_comp(MyFilter)

        # (name, port) pairs, expected in insertion order.
        ports = list(comp.input_ports.items())

        self.assertEqual(ports[0][0], 'clear')
        self.assertEqual(ports[0][1].addr, port1.addr)
        self.assertEqual(ports[1][0], 'print')
        self.assertEqual(ports[1][1].addr, port2.addr)
        self.assertEqual(ports[2][0], 'insert')
        self.assertEqual(ports[2][1].addr, port3.addr)
        del port1
        del port2
        del port3

    def test_gen_sink_input_ports_iter(self):
        port1 = None
        port2 = None
        port3 = None

        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                # Hand the created ports back to the test method.
                nonlocal port1, port2, port3
                port1 = comp_self._add_input_port('clear')
                port2 = comp_self._add_input_port('print')
                port3 = comp_self._add_input_port('insert')

            def _consume(self):
                pass

        comp = self._create_comp(MySink)

        # (name, port) pairs, expected in insertion order.
        ports = list(comp.input_ports.items())

        self.assertEqual(ports[0][0], 'clear')
        self.assertEqual(ports[0][1].addr, port1.addr)
        self.assertEqual(ports[1][0], 'print')
        self.assertEqual(ports[1][1].addr, port2.addr)
        self.assertEqual(ports[2][0], 'insert')
        self.assertEqual(ports[2][1].addr, port3.addr)
        del port1
        del port2
        del port3

    # ------------------------------------------------------------------
    # Port properties reached through the returned component
    # ------------------------------------------------------------------

    def test_name(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')

            def _consume(self):
                pass

        comp = self._create_comp(MySink)
        self.assertEqual(comp.input_ports['clear'].name, 'clear')

    def test_component(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')

            def _consume(self):
                pass

        comp = self._create_comp(MySink)
        self.assertEqual(comp.input_ports['clear'].component.addr, comp.addr)

    def test_connection_none(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')

            def _consume(self):
                pass

        comp = self._create_comp(MySink)
        self.assertIsNone(comp.input_ports['clear'].connection)

    def test_is_connected_false(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')

            def _consume(self):
                pass

        comp = self._create_comp(MySink)
        self.assertFalse(comp.input_ports['clear'].is_connected)

    def test_eq(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')

            def _consume(self):
                pass

        comp = self._create_comp(MySink)
        # Two separate lookups of the same port must compare equal.
        self.assertEqual(comp.input_ports['clear'],
                         comp.input_ports['clear'])

    def test_eq_invalid(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')

            def _consume(self):
                pass

        comp = self._create_comp(MySink)
        self.assertNotEqual(comp.input_ports['clear'], 23)

    def test_disconnect_no_connection(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                comp_self._add_input_port('clear')

            def _consume(self):
                pass

        comp = self._create_comp(MySink)
        # Passes as long as disconnecting an unconnected port does not raise.
        comp.input_ports['clear'].disconnect()

    # ------------------------------------------------------------------
    # Port properties on the object returned by _add_input_port()
    # ------------------------------------------------------------------

    def test_priv_name(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                port = comp_self._add_input_port('clear')
                self.assertEqual(port.name, 'clear')

            def _consume(self):
                pass

        self._create_comp(MySink)

    def test_priv_component(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                port = comp_self._add_input_port('clear')
                self.assertEqual(port.component, comp_self)

            def _consume(self):
                pass

        self._create_comp(MySink)

    def test_priv_connection_none(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                port = comp_self._add_input_port('clear')
                self.assertIsNone(port.connection)

            def _consume(self):
                pass

        self._create_comp(MySink)

    def test_priv_is_connected_false(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                port = comp_self._add_input_port('clear')
                self.assertFalse(port.is_connected)

            def _consume(self):
                pass

        self._create_comp(MySink)

    def test_priv_eq(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                port = comp_self._add_input_port('clear')
                self.assertEqual(port, port)

            def _consume(self):
                pass

        self._create_comp(MySink)

    def test_priv_eq_invalid(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                port = comp_self._add_input_port('clear')
                self.assertNotEqual(port, 23)

            def _consume(self):
                pass

        self._create_comp(MySink)

    def test_priv_disconnect_no_connection(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                port = comp_self._add_input_port('clear')
                # Passes as long as this does not raise.
                port.disconnect()

            def _consume(self):
                pass

        self._create_comp(MySink)

    def test_priv_remove_from_component(self):
        class MySink(bt2._UserSinkComponent):
            def __init__(comp_self, params):
                port = comp_self._add_input_port('clear')
                self.assertEqual(len(comp_self._input_ports), 1)

                # Let any error propagate: swallowing it (as a bare
                # `except:` would) hides the real cause behind the
                # assertion failures below.
                port.remove_from_component()

                self.assertEqual(len(comp_self._input_ports), 0)
                self.assertIsNone(port.component)

            def _consume(self):
                pass

        self._create_comp(MySink)
This page took 0.051261 seconds and 5 git commands to generate.