/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/per_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2018-11-17 00:47:52 UTC
  • mfrom: (7182 work)
  • mto: This revision was merged to the branch mainline in revision 7305.
  • Revision ID: jelmer@jelmer.uk-20181117004752-6ywampe5pfywlby4
Merge trunk.

Show diffs side-by-side

added

removed

Lines of Context:
79
79
                pyutils.get_named_object(module))
80
80
            for (klass, server_factory) in permutations:
81
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
 
                    {"transport_class":klass,
83
 
                     "transport_server":server_factory})
 
82
                            {"transport_class": klass,
 
83
                             "transport_server": server_factory})
84
84
                result.append(scenario)
85
85
        except errors.DependencyNotPresent as e:
86
86
            # Continue even if a dependency prevents us
188
188
                    ]
189
189
        self.build_tree(files, transport=t, line_endings='binary')
190
190
        self.assertRaises(NoSuchFile, t.get, 'c')
 
191
 
191
192
        def iterate_and_close(func, *args):
192
193
            for f in func(*args):
193
194
                # We call f.read() here because things like paramiko actually
258
259
 
259
260
        if t.is_readonly():
260
261
            self.assertRaises(TransportNotPossible,
261
 
                    t.put_bytes, 'a', b'some text for a\n')
 
262
                              t.put_bytes, 'a', b'some text for a\n')
262
263
            return
263
264
 
264
265
        t.put_bytes('a', b'some text for a\n')
277
278
 
278
279
        if t.is_readonly():
279
280
            self.assertRaises(TransportNotPossible,
280
 
                    t.put_bytes_non_atomic, 'a', b'some text for a\n')
 
281
                              t.put_bytes_non_atomic, 'a', b'some text for a\n')
281
282
            return
282
283
 
283
284
        self.assertFalse(t.has('a'))
296
297
        self.check_transport_contents(b'', t, 'a')
297
298
 
298
299
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
299
 
                                       b'contents\n')
 
300
                          b'contents\n')
300
301
        # Now test the create_parent flag
301
302
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
302
 
                                       b'contents\n')
 
303
                          b'contents\n')
303
304
        self.assertFalse(t.has('dir/a'))
304
305
        t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
305
306
                               create_parent_dir=True)
307
308
 
308
309
        # But we still get NoSuchFile if we can't make the parent dir
309
310
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
310
 
                                       b'contents\n',
311
 
                                       create_parent_dir=True)
 
311
                          b'contents\n',
 
312
                          create_parent_dir=True)
312
313
 
313
314
    def test_put_bytes_permissions(self):
314
315
        t = self.get_transport()
372
373
 
373
374
        if t.is_readonly():
374
375
            self.assertRaises(TransportNotPossible,
375
 
                    t.put_file, 'a', BytesIO(b'some text for a\n'))
 
376
                              t.put_file, 'a', BytesIO(b'some text for a\n'))
376
377
            return
377
378
 
378
379
        result = t.put_file('a', BytesIO(b'some text for a\n'))
386
387
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
387
388
        self.assertRaises(NoSuchFile,
388
389
                          t.put_file, 'path/doesnt/exist/c',
389
 
                              BytesIO(b'contents'))
 
390
                          BytesIO(b'contents'))
390
391
 
391
392
    def test_put_file_non_atomic(self):
392
393
        t = self.get_transport()
393
394
 
394
395
        if t.is_readonly():
395
396
            self.assertRaises(TransportNotPossible,
396
 
                    t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
 
397
                              t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
397
398
            return
398
399
 
399
400
        self.assertFalse(t.has('a'))
412
413
        self.check_transport_contents(b'', t, 'a')
413
414
 
414
415
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
415
 
                                       BytesIO(b'contents\n'))
 
416
                          BytesIO(b'contents\n'))
416
417
        # Now test the create_parent flag
417
418
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
418
 
                                       BytesIO(b'contents\n'))
 
419
                          BytesIO(b'contents\n'))
419
420
        self.assertFalse(t.has('dir/a'))
420
421
        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
421
422
                              create_parent_dir=True)
423
424
 
424
425
        # But we still get NoSuchFile if we can't make the parent dir
425
426
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
426
 
                                       BytesIO(b'contents\n'),
427
 
                                       create_parent_dir=True)
 
427
                          BytesIO(b'contents\n'),
 
428
                          create_parent_dir=True)
428
429
 
429
430
    def test_put_file_permissions(self):
430
431
 
501
502
            # defined for the transport interface.
502
503
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
503
504
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
504
 
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
 
505
            self.assertRaises(TransportNotPossible,
 
506
                              t.mkdir, 'path/doesnt/exist')
505
507
            return
506
508
        # Test mkdir
507
509
        t.mkdir('dir_a')
512
514
        self.assertEqual(t.has('dir_b'), True)
513
515
 
514
516
        self.assertEqual([t.has(n) for n in
515
 
            ['dir_a', 'dir_b', 'dir_q', 'dir_b']],
516
 
            [True, True, False, True])
 
517
                          ['dir_a', 'dir_b', 'dir_q', 'dir_b']],
 
518
                         [True, True, False, True])
517
519
 
518
520
        # we were testing that a local mkdir followed by a transport
519
521
        # mkdir failed thusly, but given that we * in one process * do not
610
612
            t2 = t.clone('copy_to_simple')
611
613
            simple_copy_files(t, t2)
612
614
 
613
 
 
614
615
        # Test that copying into a missing directory raises
615
616
        # NoSuchFile
616
617
        if t.is_readonly():
653
654
 
654
655
        if t.is_readonly():
655
656
            self.assertRaises(TransportNotPossible,
656
 
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
657
                              t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
657
658
            return
658
659
        t.put_bytes('a', b'diff\ncontents for\na\n')
659
660
        t.put_bytes('b', b'contents\nfor b\n')
660
661
 
661
662
        self.assertEqual(20,
662
 
            t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
 
663
                         t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
663
664
 
664
665
        self.check_transport_contents(
665
666
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
671
672
 
672
673
        # And we can create new files, too
673
674
        self.assertEqual(0,
674
 
            t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
 
675
                         t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
675
676
        self.check_transport_contents(b'some text\nfor a missing file\n',
676
677
                                      t, 'c')
677
678
 
680
681
 
681
682
        if t.is_readonly():
682
683
            self.assertRaises(TransportNotPossible,
683
 
                    t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
 
684
                              t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
684
685
            return
685
686
 
686
687
        self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
687
688
        self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
688
689
 
689
690
        self.assertEqual(20,
690
 
            t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
 
691
                         t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
691
692
 
692
693
        self.check_transport_contents(
693
694
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
703
704
        t = self.get_transport()
704
705
        if t.is_readonly():
705
706
            self.assertRaises(TransportNotPossible,
706
 
                t.append_file, 'f', BytesIO(b'f'), mode=None)
 
707
                              t.append_file, 'f', BytesIO(b'f'), mode=None)
707
708
            return
708
709
        t.append_file('f', BytesIO(b'f'), mode=None)
709
710
 
712
713
        t = self.get_transport()
713
714
        if t.is_readonly():
714
715
            self.assertRaises(TransportNotPossible,
715
 
                t.append_bytes, 'f', b'f', mode=None)
 
716
                              t.append_bytes, 'f', b'f', mode=None)
716
717
            return
717
718
        t.append_bytes('f', b'f', mode=None)
718
719
 
736
737
        t.put_bytes('b', b'b text\n')
737
738
        t.put_bytes('c', b'c text\n')
738
739
        self.assertEqual([True, True, True],
739
 
                [t.has(n) for n in ['a', 'b', 'c']])
 
740
                         [t.has(n) for n in ['a', 'b', 'c']])
740
741
        t.delete('a')
741
742
        t.delete('c')
742
743
        self.assertEqual([False, True, False],
743
 
                [t.has(n) for n in ['a', 'b', 'c']])
 
744
                         [t.has(n) for n in ['a', 'b', 'c']])
744
745
        self.assertFalse(t.has('a'))
745
746
        self.assertTrue(t.has('b'))
746
747
        self.assertFalse(t.has('c'))
1016
1017
 
1017
1018
            st = t.stat(link_name)
1018
1019
            self.assertTrue(S_ISLNK(st.st_mode),
1019
 
                "expected symlink, got mode %o" % st.st_mode)
 
1020
                            "expected symlink, got mode %o" % st.st_mode)
1020
1021
        except TransportNotPossible:
1021
1022
            raise TestSkipped("Transport %s does not support symlinks." %
1022
1023
                              self._server.__class__)
1115
1116
 
1116
1117
            Only the parameters different from None will be changed.
1117
1118
            """
1118
 
            if scheme   is None: scheme   = t._parsed_url.scheme
1119
 
            if user     is None: user     = t._parsed_url.user
1120
 
            if password is None: password = t._parsed_url.password
1121
 
            if user     is None: user     = t._parsed_url.user
1122
 
            if host     is None: host     = t._parsed_url.host
1123
 
            if port     is None: port     = t._parsed_url.port
1124
 
            if path     is None: path     = t._parsed_url.path
 
1119
            if scheme is None:
 
1120
                scheme = t._parsed_url.scheme
 
1121
            if user is None:
 
1122
                user = t._parsed_url.user
 
1123
            if password is None:
 
1124
                password = t._parsed_url.password
 
1125
            if user is None:
 
1126
                user = t._parsed_url.user
 
1127
            if host is None:
 
1128
                host = t._parsed_url.host
 
1129
            if port is None:
 
1130
                port = t._parsed_url.port
 
1131
            if path is None:
 
1132
                path = t._parsed_url.path
1125
1133
            return str(urlutils.URL(scheme, user, password, host, port, path))
1126
1134
 
1127
1135
        if t._parsed_url.scheme == 'ftp':
1146
1154
        #   (they may be typed by the user when prompted for example)
1147
1155
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1148
1156
        # We will not connect, we can use a invalid host
1149
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1157
        self.assertIsNot(t, t._reuse_for(
 
1158
            new_url(host=t._parsed_url.host + 'bar')))
1150
1159
        if t._parsed_url.port == 1234:
1151
1160
            port = 4321
1152
1161
        else:
1162
1171
 
1163
1172
        c = t.clone('subdir')
1164
1173
        # Some transports will create the connection  only when needed
1165
 
        t.has('surely_not') # Force connection
 
1174
        t.has('surely_not')  # Force connection
1166
1175
        self.assertIs(t._get_connection(), c._get_connection())
1167
1176
 
1168
1177
        # Temporary failure, we need to create a new dummy connection
1177
1186
        if not isinstance(t, ConnectedTransport):
1178
1187
            raise TestSkipped("not a connected transport")
1179
1188
 
1180
 
        t.has('surely_not') # Force connection
 
1189
        t.has('surely_not')  # Force connection
1181
1190
        self.assertIsNot(None, t._get_connection())
1182
1191
 
1183
1192
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1229
1238
        new_transport = root_transport.clone("..")
1230
1239
        # as we are walking up directories, the path must be
1231
1240
        # growing less, except at the top
1232
 
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
1233
 
            or new_transport.base == root_transport.base)
 
1241
        self.assertTrue(len(new_transport.base) < len(root_transport.base) or
 
1242
                        new_transport.base == root_transport.base)
1234
1243
        while new_transport.base != root_transport.base:
1235
1244
            root_transport = new_transport
1236
1245
            new_transport = root_transport.clone("..")
1237
1246
            # as we are walking up directories, the path must be
1238
1247
            # growing less, except at the top
1239
 
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
1240
 
                or new_transport.base == root_transport.base)
 
1248
            self.assertTrue(len(new_transport.base) < len(root_transport.base) or
 
1249
                            new_transport.base == root_transport.base)
1241
1250
 
1242
1251
        # Cloning to "/" should take us to exactly the same location.
1243
1252
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1253
1262
        orig_transport = self.get_transport()
1254
1263
        root_transport = orig_transport.clone('/')
1255
1264
        self.assertEqual(root_transport.base + '.bzr/',
1256
 
            root_transport.clone('.bzr').base)
 
1265
                         root_transport.clone('.bzr').base)
1257
1266
 
1258
1267
    def test_base_url(self):
1259
1268
        t = self.get_transport()
1354
1363
                         'isolated/dir/',
1355
1364
                         'isolated/dir/foo',
1356
1365
                         'isolated/dir/bar',
1357
 
                         'isolated/dir/b%25z', # make sure quoting is correct
 
1366
                         'isolated/dir/b%25z',  # make sure quoting is correct
1358
1367
                         'isolated/bar'],
1359
1368
                        transport=transport)
1360
1369
        paths = set(transport.iter_files_recursive())
1361
1370
        # nb the directories are not converted
1362
1371
        self.assertEqual(paths,
1363
 
                    {'isolated/dir/foo',
1364
 
                         'isolated/dir/bar',
1365
 
                         'isolated/dir/b%2525z',
1366
 
                         'isolated/bar'})
 
1372
                         {'isolated/dir/foo',
 
1373
                          'isolated/dir/bar',
 
1374
                          'isolated/dir/b%2525z',
 
1375
                          'isolated/bar'})
1367
1376
        sub_transport = transport.clone('isolated')
1368
1377
        paths = set(sub_transport.iter_files_recursive())
1369
1378
        self.assertEqual(paths,
1370
 
            {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
 
1379
                         {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
1371
1380
 
1372
1381
    def test_copy_tree(self):
1373
1382
        # TODO: test file contents and permissions are preserved. This test was
1384
1393
                         'from/dir/',
1385
1394
                         'from/dir/foo',
1386
1395
                         'from/dir/bar',
1387
 
                         'from/dir/b%25z', # make sure quoting is correct
 
1396
                         'from/dir/b%25z',  # make sure quoting is correct
1388
1397
                         'from/bar'],
1389
1398
                        transport=transport)
1390
1399
        transport.copy_tree('from', 'to')
1391
1400
        paths = set(transport.iter_files_recursive())
1392
1401
        self.assertEqual(paths,
1393
 
                    {'from/dir/foo',
1394
 
                         'from/dir/bar',
1395
 
                         'from/dir/b%2525z',
1396
 
                         'from/bar',
1397
 
                         'to/dir/foo',
1398
 
                         'to/dir/bar',
1399
 
                         'to/dir/b%2525z',
1400
 
                         'to/bar',})
 
1402
                         {'from/dir/foo',
 
1403
                          'from/dir/bar',
 
1404
                          'from/dir/b%2525z',
 
1405
                          'from/bar',
 
1406
                          'to/dir/foo',
 
1407
                          'to/dir/bar',
 
1408
                          'to/dir/b%2525z',
 
1409
                          'to/bar', })
1401
1410
 
1402
1411
    def test_copy_tree_to_transport(self):
1403
1412
        transport = self.get_transport()
1411
1420
                         'from/dir/',
1412
1421
                         'from/dir/foo',
1413
1422
                         'from/dir/bar',
1414
 
                         'from/dir/b%25z', # make sure quoting is correct
 
1423
                         'from/dir/b%25z',  # make sure quoting is correct
1415
1424
                         'from/bar'],
1416
1425
                        transport=transport)
1417
1426
        from_transport = transport.clone('from')
1420
1429
        from_transport.copy_tree_to_transport(to_transport)
1421
1430
        paths = set(transport.iter_files_recursive())
1422
1431
        self.assertEqual(paths,
1423
 
                    {'from/dir/foo',
1424
 
                         'from/dir/bar',
1425
 
                         'from/dir/b%2525z',
1426
 
                         'from/bar',
1427
 
                         'to/dir/foo',
1428
 
                         'to/dir/bar',
1429
 
                         'to/dir/b%2525z',
1430
 
                         'to/bar',})
 
1432
                         {'from/dir/foo',
 
1433
                          'from/dir/bar',
 
1434
                          'from/dir/b%2525z',
 
1435
                          'from/bar',
 
1436
                          'to/dir/foo',
 
1437
                          'to/dir/bar',
 
1438
                          'to/dir/b%2525z',
 
1439
                          'to/bar', })
1431
1440
 
1432
1441
    def test_unicode_paths(self):
1433
1442
        """Test that we can read/write files with Unicode names."""
1437
1446
        # '\xe5' and '\xe4' actually map to the same file
1438
1447
        # adding a suffix kicks in the 'preserving but insensitive'
1439
1448
        # route, and maintains the right files
1440
 
        files = [u'\xe5.1', # a w/ circle iso-8859-1
1441
 
                 u'\xe4.2', # a w/ dots iso-8859-1
1442
 
                 u'\u017d', # Z with umlat iso-8859-2
1443
 
                 u'\u062c', # Arabic j
1444
 
                 u'\u0410', # Russian A
1445
 
                 u'\u65e5', # Kanji person
1446
 
                ]
 
1449
        files = [u'\xe5.1',  # a w/ circle iso-8859-1
 
1450
                 u'\xe4.2',  # a w/ dots iso-8859-1
 
1451
                 u'\u017d',  # Z with umlat iso-8859-2
 
1452
                 u'\u062c',  # Arabic j
 
1453
                 u'\u0410',  # Russian A
 
1454
                 u'\u65e5',  # Kanji person
 
1455
                 ]
1447
1456
 
1448
1457
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1449
1458
        if no_unicode_support:
1452
1461
        try:
1453
1462
            self.build_tree(files, transport=t, line_endings='binary')
1454
1463
        except UnicodeError:
1455
 
            raise TestSkipped("cannot handle unicode paths in current encoding")
 
1464
            raise TestSkipped(
 
1465
                "cannot handle unicode paths in current encoding")
1456
1466
 
1457
1467
        # A plain unicode string is not a valid url
1458
1468
        for fname in files:
1486
1496
        """
1487
1497
        transport = self.get_transport()
1488
1498
        if transport.is_readonly():
1489
 
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
 
1499
            self.assertRaises(TransportNotPossible,
 
1500
                              transport.lock_write, 'foo')
1490
1501
            return
1491
1502
        transport.put_bytes('lock', b'')
1492
1503
        try:
1518
1529
    def test_readv(self):
1519
1530
        transport = self.get_transport()
1520
1531
        if transport.is_readonly():
1521
 
            with open('a', 'w') as f: f.write('0123456789')
 
1532
            with open('a', 'w') as f:
 
1533
                f.write('0123456789')
1522
1534
        else:
1523
1535
            transport.put_bytes('a', b'0123456789')
1524
1536
 
1534
1546
    def test_readv_out_of_order(self):
1535
1547
        transport = self.get_transport()
1536
1548
        if transport.is_readonly():
1537
 
            with open('a', 'w') as f: f.write('0123456789')
 
1549
            with open('a', 'w') as f:
 
1550
                f.write('0123456789')
1538
1551
        else:
1539
1552
            transport.put_bytes('a', b'01234567890')
1540
1553
 
1553
1566
        # reference the returned data with the random data. To avoid doing
1554
1567
        # multiple large random byte look ups we do several tests on the same
1555
1568
        # backing data.
1556
 
        content = osutils.rand_bytes(200*1024)
 
1569
        content = osutils.rand_bytes(200 * 1024)
1557
1570
        content_size = len(content)
1558
1571
        if transport.is_readonly():
1559
1572
            self.build_tree_contents([('a', content)])
1560
1573
        else:
1561
1574
            transport.put_bytes('a', content)
 
1575
 
1562
1576
        def check_result_data(result_vector):
1563
1577
            for item in result_vector:
1564
1578
                data_len = len(item[1])
1566
1580
 
1567
1581
        # start corner case
1568
1582
        result = list(transport.readv('a', ((0, 30),),
1569
 
            adjust_for_latency=True, upper_limit=content_size))
 
1583
                                      adjust_for_latency=True, upper_limit=content_size))
1570
1584
        # we expect 1 result, from 0, to something > 30
1571
1585
        self.assertEqual(1, len(result))
1572
1586
        self.assertEqual(0, result[0][0])
1574
1588
        check_result_data(result)
1575
1589
        # end of file corner case
1576
1590
        result = list(transport.readv('a', ((204700, 100),),
1577
 
            adjust_for_latency=True, upper_limit=content_size))
 
1591
                                      adjust_for_latency=True, upper_limit=content_size))
1578
1592
        # we expect 1 result, from 204800- its length, to the end
1579
1593
        self.assertEqual(1, len(result))
1580
1594
        data_len = len(result[0][1])
1581
 
        self.assertEqual(204800-data_len, result[0][0])
 
1595
        self.assertEqual(204800 - data_len, result[0][0])
1582
1596
        self.assertTrue(data_len >= 100)
1583
1597
        check_result_data(result)
1584
1598
        # out of order ranges are made in order
1585
1599
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
1586
 
            adjust_for_latency=True, upper_limit=content_size))
 
1600
                                      adjust_for_latency=True, upper_limit=content_size))
1587
1601
        # we expect 2 results, in order, start and end.
1588
1602
        self.assertEqual(2, len(result))
1589
1603
        # start
1592
1606
        self.assertTrue(data_len >= 30)
1593
1607
        # end
1594
1608
        data_len = len(result[1][1])
1595
 
        self.assertEqual(204800-data_len, result[1][0])
 
1609
        self.assertEqual(204800 - data_len, result[1][0])
1596
1610
        self.assertTrue(data_len >= 100)
1597
1611
        check_result_data(result)
1598
1612
        # close ranges get combined (even if out of order)
1599
1613
        for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
1600
1614
            result = list(transport.readv('a', request_vector,
1601
 
                adjust_for_latency=True, upper_limit=content_size))
 
1615
                                          adjust_for_latency=True, upper_limit=content_size))
1602
1616
            self.assertEqual(1, len(result))
1603
1617
            data_len = len(result[0][1])
1604
1618
            # minimum length is from 400 to 1034 - 634
1612
1626
        transport = self.get_transport()
1613
1627
        # test from observed failure case.
1614
1628
        if transport.is_readonly():
1615
 
            with open('a', 'w') as f: f.write('a'*1024*1024)
 
1629
            with open('a', 'w') as f:
 
1630
                f.write('a' * 1024 * 1024)
1616
1631
        else:
1617
 
            transport.put_bytes('a', b'a'*1024*1024)
 
1632
            transport.put_bytes('a', b'a' * 1024 * 1024)
1618
1633
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1619
 
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
1620
 
            (465373, 800), (947422, 800)]
1621
 
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
1622
 
        found_items = [False]*9
 
1634
                         (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1635
                         (465373, 800), (947422, 800)]
 
1636
        results = list(transport.readv('a', broken_vector, True, 1024 * 1024))
 
1637
        found_items = [False] * 9
1623
1638
        for pos, (start, length) in enumerate(broken_vector):
1624
1639
            # check the range is covered by the result
1625
1640
            for offset, data in results:
1626
1641
                if offset <= start and start + length <= offset + len(data):
1627
1642
                    found_items[pos] = True
1628
 
        self.assertEqual([True]*9, found_items)
 
1643
        self.assertEqual([True] * 9, found_items)
1629
1644
 
1630
1645
    def test_get_with_open_write_stream_sees_all_content(self):
1631
1646
        t = self.get_transport()
1633
1648
            return
1634
1649
        with t.open_write_stream('foo') as handle:
1635
1650
            handle.write(b'bcd')
1636
 
            self.assertEqual([(0, b'b'), (2, b'd')], list(t.readv('foo', ((0, 1), (2, 1)))))
 
1651
            self.assertEqual([(0, b'b'), (2, b'd')], list(
 
1652
                t.readv('foo', ((0, 1), (2, 1)))))
1637
1653
 
1638
1654
    def test_get_smart_medium(self):
1639
1655
        """All transports must either give a smart medium, or know they can't.
1649
1665
    def test_readv_short_read(self):
1650
1666
        transport = self.get_transport()
1651
1667
        if transport.is_readonly():
1652
 
            with open('a', 'w') as f: f.write('0123456789')
 
1668
            with open('a', 'w') as f:
 
1669
                f.write('0123456789')
1653
1670
        else:
1654
1671
            transport.put_bytes('a', b'01234567890')
1655
1672
 
1706
1723
 
1707
1724
    def test_abspath_url_unquote_unreserved(self):
1708
1725
        """URLs from abspath should have unreserved characters unquoted
1709
 
        
 
1726
 
1710
1727
        Need consistent quoting notably for tildes, see lp:842223 for more.
1711
1728
        """
1712
1729
        t = self.get_transport()
1713
1730
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1714
1731
        self.assertEqual(t.base + "-.09AZ_az~",
1715
 
            t.abspath(needlessly_escaped_dir))
 
1732
                         t.abspath(needlessly_escaped_dir))
1716
1733
 
1717
1734
    def test_clone_url_unquote_unreserved(self):
1718
1735
        """Base URL of a cloned branch needs unreserved characters unquoted
1719
 
        
 
1736
 
1720
1737
        Cloned transports should be prefix comparable for things like the
1721
1738
        isolation checking of tests, see lp:842223 for more.
1722
1739
        """