183
169
def test_get(self):
184
170
t = self.get_transport()
186
files = ['a', 'b', 'e', 'g']
187
contents = ['contents of a\n',
192
self.build_tree(files, transport=t, line_endings='binary')
193
self.check_transport_contents('contents of a\n', t, 'a')
194
content_f = t.get_multi(files)
195
# Use itertools.izip() instead of use zip() or map(), since they fully
196
# evaluate their inputs, the transport requests should be issued and
197
# handled sequentially (we don't want to force transport to buffer).
198
for content, f in itertools.izip(contents, content_f):
199
self.assertEqual(content, f.read())
201
content_f = t.get_multi(iter(files))
202
# Use itertools.izip() for the same reason
203
for content, f in itertools.izip(contents, content_f):
204
self.assertEqual(content, f.read())
173
content = b'contents of a\n'
174
self.build_tree(['a'], transport=t, line_endings='binary')
175
self.check_transport_contents(b'contents of a\n', t, 'a')
177
self.assertEqual(content, f.read())
206
179
def test_get_unknown_file(self):
207
180
t = self.get_transport()
208
181
files = ['a', 'b']
209
contents = ['contents of a\n',
182
contents = [b'contents of a\n',
212
185
self.build_tree(files, transport=t, line_endings='binary')
213
186
self.assertRaises(NoSuchFile, t.get, 'c')
214
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
188
def iterate_and_close(func, *args):
189
for f in func(*args):
190
# We call f.read() here because things like paramiko actually
191
# spawn a thread to prefetch the content, which we want to
192
# consume before we close the handle.
217
196
def test_get_directory_read_gives_ReadError(self):
218
197
"""consistent errors for read() on a file returned by get()."""
238
217
t = self.get_transport()
240
219
files = ['a', 'b', 'e', 'g']
241
contents = ['contents of a\n',
220
contents = [b'contents of a\n',
246
225
self.build_tree(files, transport=t, line_endings='binary')
247
self.check_transport_contents('contents of a\n', t, 'a')
226
self.check_transport_contents(b'contents of a\n', t, 'a')
249
228
for content, fname in zip(contents, files):
250
229
self.assertEqual(content, t.get_bytes(fname))
252
231
def test_get_bytes_unknown_file(self):
253
232
t = self.get_transport()
255
233
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
257
235
def test_get_with_open_write_stream_sees_all_content(self):
258
236
t = self.get_transport()
259
237
if t.is_readonly():
261
handle = t.open_write_stream('foo')
264
self.assertEqual('b', t.get('foo').read())
239
with t.open_write_stream('foo') as handle:
241
self.assertEqual(b'b', t.get_bytes('foo'))
268
243
def test_get_bytes_with_open_write_stream_sees_all_content(self):
269
244
t = self.get_transport()
270
245
if t.is_readonly():
272
handle = t.open_write_stream('foo')
275
self.assertEqual('b', t.get_bytes('foo'))
276
self.assertEqual('b', t.get('foo').read())
247
with t.open_write_stream('foo') as handle:
249
self.assertEqual(b'b', t.get_bytes('foo'))
250
with t.get('foo') as f:
251
self.assertEqual(b'b', f.read())
280
253
def test_put_bytes(self):
281
254
t = self.get_transport()
283
256
if t.is_readonly():
284
257
self.assertRaises(TransportNotPossible,
285
t.put_bytes, 'a', 'some text for a\n')
258
t.put_bytes, 'a', b'some text for a\n')
288
t.put_bytes('a', 'some text for a\n')
289
self.failUnless(t.has('a'))
290
self.check_transport_contents('some text for a\n', t, 'a')
261
t.put_bytes('a', b'some text for a\n')
262
self.assertTrue(t.has('a'))
263
self.check_transport_contents(b'some text for a\n', t, 'a')
292
265
# The contents should be overwritten
293
t.put_bytes('a', 'new text for a\n')
294
self.check_transport_contents('new text for a\n', t, 'a')
266
t.put_bytes('a', b'new text for a\n')
267
self.check_transport_contents(b'new text for a\n', t, 'a')
296
269
self.assertRaises(NoSuchFile,
297
t.put_bytes, 'path/doesnt/exist/c', 'contents')
270
t.put_bytes, 'path/doesnt/exist/c', b'contents')
299
272
def test_put_bytes_non_atomic(self):
300
273
t = self.get_transport()
302
275
if t.is_readonly():
303
276
self.assertRaises(TransportNotPossible,
304
t.put_bytes_non_atomic, 'a', 'some text for a\n')
277
t.put_bytes_non_atomic, 'a', b'some text for a\n')
307
self.failIf(t.has('a'))
308
t.put_bytes_non_atomic('a', 'some text for a\n')
309
self.failUnless(t.has('a'))
310
self.check_transport_contents('some text for a\n', t, 'a')
280
self.assertFalse(t.has('a'))
281
t.put_bytes_non_atomic('a', b'some text for a\n')
282
self.assertTrue(t.has('a'))
283
self.check_transport_contents(b'some text for a\n', t, 'a')
311
284
# Put also replaces contents
312
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
313
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
285
t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
286
self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
315
288
# Make sure we can create another file
316
t.put_bytes_non_atomic('d', 'contents for\nd\n')
289
t.put_bytes_non_atomic('d', b'contents for\nd\n')
317
290
# And overwrite 'a' with empty contents
318
t.put_bytes_non_atomic('a', '')
319
self.check_transport_contents('contents for\nd\n', t, 'd')
320
self.check_transport_contents('', t, 'a')
291
t.put_bytes_non_atomic('a', b'')
292
self.check_transport_contents(b'contents for\nd\n', t, 'd')
293
self.check_transport_contents(b'', t, 'a')
322
295
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
324
297
# Now test the create_parent flag
325
298
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
327
self.failIf(t.has('dir/a'))
328
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
300
self.assertFalse(t.has('dir/a'))
301
t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
329
302
create_parent_dir=True)
330
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
303
self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
332
305
# But we still get NoSuchFile if we can't make the parent dir
333
306
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
335
create_parent_dir=True)
308
create_parent_dir=True)
337
310
def test_put_bytes_permissions(self):
338
311
t = self.get_transport()
342
315
if not t._can_roundtrip_unix_modebits():
343
316
# Can't roundtrip, so no need to run this test
345
t.put_bytes('mode644', 'test text\n', mode=0644)
346
self.assertTransportMode(t, 'mode644', 0644)
347
t.put_bytes('mode666', 'test text\n', mode=0666)
348
self.assertTransportMode(t, 'mode666', 0666)
349
t.put_bytes('mode600', 'test text\n', mode=0600)
350
self.assertTransportMode(t, 'mode600', 0600)
318
t.put_bytes('mode644', b'test text\n', mode=0o644)
319
self.assertTransportMode(t, 'mode644', 0o644)
320
t.put_bytes('mode666', b'test text\n', mode=0o666)
321
self.assertTransportMode(t, 'mode666', 0o666)
322
t.put_bytes('mode600', b'test text\n', mode=0o600)
323
self.assertTransportMode(t, 'mode600', 0o600)
351
324
# Yes, you can put_bytes a file such that it becomes readonly
352
t.put_bytes('mode400', 'test text\n', mode=0400)
353
self.assertTransportMode(t, 'mode400', 0400)
325
t.put_bytes('mode400', b'test text\n', mode=0o400)
326
self.assertTransportMode(t, 'mode400', 0o400)
355
328
# The default permissions should be based on the current umask
356
329
umask = osutils.get_umask()
357
t.put_bytes('nomode', 'test text\n', mode=None)
358
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
330
t.put_bytes('nomode', b'test text\n', mode=None)
331
self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
360
333
def test_put_bytes_non_atomic_permissions(self):
361
334
t = self.get_transport()
365
338
if not t._can_roundtrip_unix_modebits():
366
339
# Can't roundtrip, so no need to run this test
368
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
369
self.assertTransportMode(t, 'mode644', 0644)
370
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
371
self.assertTransportMode(t, 'mode666', 0666)
372
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
373
self.assertTransportMode(t, 'mode600', 0600)
374
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
375
self.assertTransportMode(t, 'mode400', 0400)
341
t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
342
self.assertTransportMode(t, 'mode644', 0o644)
343
t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
344
self.assertTransportMode(t, 'mode666', 0o666)
345
t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
346
self.assertTransportMode(t, 'mode600', 0o600)
347
t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
348
self.assertTransportMode(t, 'mode400', 0o400)
377
350
# The default permissions should be based on the current umask
378
351
umask = osutils.get_umask()
379
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
380
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
352
t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
353
self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
382
355
# We should also be able to set the mode for a parent directory
383
356
# when it is created
384
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
385
dir_mode=0700, create_parent_dir=True)
386
self.assertTransportMode(t, 'dir700', 0700)
387
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
388
dir_mode=0770, create_parent_dir=True)
389
self.assertTransportMode(t, 'dir770', 0770)
390
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
391
dir_mode=0777, create_parent_dir=True)
392
self.assertTransportMode(t, 'dir777', 0777)
357
t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
358
dir_mode=0o700, create_parent_dir=True)
359
self.assertTransportMode(t, 'dir700', 0o700)
360
t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
361
dir_mode=0o770, create_parent_dir=True)
362
self.assertTransportMode(t, 'dir770', 0o770)
363
t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
364
dir_mode=0o777, create_parent_dir=True)
365
self.assertTransportMode(t, 'dir777', 0o777)
394
367
def test_put_file(self):
395
368
t = self.get_transport()
397
370
if t.is_readonly():
398
371
self.assertRaises(TransportNotPossible,
399
t.put_file, 'a', StringIO('some text for a\n'))
372
t.put_file, 'a', BytesIO(b'some text for a\n'))
402
result = t.put_file('a', StringIO('some text for a\n'))
375
result = t.put_file('a', BytesIO(b'some text for a\n'))
403
376
# put_file returns the length of the data written
404
377
self.assertEqual(16, result)
405
self.failUnless(t.has('a'))
406
self.check_transport_contents('some text for a\n', t, 'a')
378
self.assertTrue(t.has('a'))
379
self.check_transport_contents(b'some text for a\n', t, 'a')
407
380
# Put also replaces contents
408
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
381
result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
409
382
self.assertEqual(19, result)
410
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
383
self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
411
384
self.assertRaises(NoSuchFile,
412
385
t.put_file, 'path/doesnt/exist/c',
413
StringIO('contents'))
386
BytesIO(b'contents'))
415
388
def test_put_file_non_atomic(self):
416
389
t = self.get_transport()
418
391
if t.is_readonly():
419
392
self.assertRaises(TransportNotPossible,
420
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
393
t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
423
self.failIf(t.has('a'))
424
t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
self.failUnless(t.has('a'))
426
self.check_transport_contents('some text for a\n', t, 'a')
396
self.assertFalse(t.has('a'))
397
t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
398
self.assertTrue(t.has('a'))
399
self.check_transport_contents(b'some text for a\n', t, 'a')
427
400
# Put also replaces contents
428
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
429
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
401
t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
402
self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
431
404
# Make sure we can create another file
432
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
405
t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
433
406
# And overwrite 'a' with empty contents
434
t.put_file_non_atomic('a', StringIO(''))
435
self.check_transport_contents('contents for\nd\n', t, 'd')
436
self.check_transport_contents('', t, 'a')
407
t.put_file_non_atomic('a', BytesIO(b''))
408
self.check_transport_contents(b'contents for\nd\n', t, 'd')
409
self.check_transport_contents(b'', t, 'a')
438
411
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
439
StringIO('contents\n'))
412
BytesIO(b'contents\n'))
440
413
# Now test the create_parent flag
441
414
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
StringIO('contents\n'))
443
self.failIf(t.has('dir/a'))
444
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
415
BytesIO(b'contents\n'))
416
self.assertFalse(t.has('dir/a'))
417
t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
445
418
create_parent_dir=True)
446
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
419
self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
448
421
# But we still get NoSuchFile if we can't make the parent dir
449
422
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
450
StringIO('contents\n'),
451
create_parent_dir=True)
423
BytesIO(b'contents\n'),
424
create_parent_dir=True)
453
426
def test_put_file_permissions(self):
459
432
if not t._can_roundtrip_unix_modebits():
460
433
# Can't roundtrip, so no need to run this test
462
t.put_file('mode644', StringIO('test text\n'), mode=0644)
463
self.assertTransportMode(t, 'mode644', 0644)
464
t.put_file('mode666', StringIO('test text\n'), mode=0666)
465
self.assertTransportMode(t, 'mode666', 0666)
466
t.put_file('mode600', StringIO('test text\n'), mode=0600)
467
self.assertTransportMode(t, 'mode600', 0600)
435
t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
436
self.assertTransportMode(t, 'mode644', 0o644)
437
t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
438
self.assertTransportMode(t, 'mode666', 0o666)
439
t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
440
self.assertTransportMode(t, 'mode600', 0o600)
468
441
# Yes, you can put a file such that it becomes readonly
469
t.put_file('mode400', StringIO('test text\n'), mode=0400)
470
self.assertTransportMode(t, 'mode400', 0400)
442
t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
443
self.assertTransportMode(t, 'mode400', 0o400)
471
444
# The default permissions should be based on the current umask
472
445
umask = osutils.get_umask()
473
t.put_file('nomode', StringIO('test text\n'), mode=None)
474
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
446
t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
447
self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
476
449
def test_put_file_non_atomic_permissions(self):
477
450
t = self.get_transport()
481
454
if not t._can_roundtrip_unix_modebits():
482
455
# Can't roundtrip, so no need to run this test
484
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
485
self.assertTransportMode(t, 'mode644', 0644)
486
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
487
self.assertTransportMode(t, 'mode666', 0666)
488
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
489
self.assertTransportMode(t, 'mode600', 0600)
457
t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
458
self.assertTransportMode(t, 'mode644', 0o644)
459
t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
460
self.assertTransportMode(t, 'mode666', 0o666)
461
t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
462
self.assertTransportMode(t, 'mode600', 0o600)
490
463
# Yes, you can put_file_non_atomic a file such that it becomes readonly
491
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
492
self.assertTransportMode(t, 'mode400', 0400)
464
t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
465
self.assertTransportMode(t, 'mode400', 0o400)
494
467
# The default permissions should be based on the current umask
495
468
umask = osutils.get_umask()
496
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
497
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
469
t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
470
self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
499
472
# We should also be able to set the mode for a parent directory
500
473
# when it is created
502
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
503
dir_mode=0700, create_parent_dir=True)
504
self.assertTransportMode(t, 'dir700', 0700)
505
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
506
dir_mode=0770, create_parent_dir=True)
507
self.assertTransportMode(t, 'dir770', 0770)
508
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
509
dir_mode=0777, create_parent_dir=True)
510
self.assertTransportMode(t, 'dir777', 0777)
475
t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
476
dir_mode=0o700, create_parent_dir=True)
477
self.assertTransportMode(t, 'dir700', 0o700)
478
t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
479
dir_mode=0o770, create_parent_dir=True)
480
self.assertTransportMode(t, 'dir770', 0o770)
481
t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
482
dir_mode=0o777, create_parent_dir=True)
483
self.assertTransportMode(t, 'dir777', 0o777)
512
485
def test_put_bytes_unicode(self):
513
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
515
# (we don't want to encode unicode here at all, callers should be
516
# strictly passing bytes to put_bytes), but we allow it for backwards
517
# compatibility. At some point we should use a specific exception.
518
# See https://bugs.launchpad.net/bzr/+bug/106898.
519
486
t = self.get_transport()
520
487
if t.is_readonly():
522
489
unicode_string = u'\u1234'
524
(AssertionError, UnicodeEncodeError),
525
t.put_bytes, 'foo', unicode_string)
527
def test_put_file_unicode(self):
528
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
# This situation can happen (and has) if code is careless about the type
530
# of "string" they initialise/write to a StringIO with. We cannot use
531
# cStringIO, because it never returns unicode from read.
532
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
# raise, but we raise it for hysterical raisins.
534
t = self.get_transport()
537
unicode_file = pyStringIO(u'\u1234')
538
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
490
self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
540
492
def test_mkdir(self):
541
493
t = self.get_transport()
591
538
# no sense testing on this transport
593
540
# Test mkdir with a mode
594
t.mkdir('dmode755', mode=0755)
595
self.assertTransportMode(t, 'dmode755', 0755)
596
t.mkdir('dmode555', mode=0555)
597
self.assertTransportMode(t, 'dmode555', 0555)
598
t.mkdir('dmode777', mode=0777)
599
self.assertTransportMode(t, 'dmode777', 0777)
600
t.mkdir('dmode700', mode=0700)
601
self.assertTransportMode(t, 'dmode700', 0700)
602
t.mkdir_multi(['mdmode755'], mode=0755)
603
self.assertTransportMode(t, 'mdmode755', 0755)
541
t.mkdir('dmode755', mode=0o755)
542
self.assertTransportMode(t, 'dmode755', 0o755)
543
t.mkdir('dmode555', mode=0o555)
544
self.assertTransportMode(t, 'dmode555', 0o555)
545
t.mkdir('dmode777', mode=0o777)
546
self.assertTransportMode(t, 'dmode777', 0o777)
547
t.mkdir('dmode700', mode=0o700)
548
self.assertTransportMode(t, 'dmode700', 0o700)
549
t.mkdir('mdmode755', mode=0o755)
550
self.assertTransportMode(t, 'mdmode755', 0o755)
605
552
# Default mode should be based on umask
606
553
umask = osutils.get_umask()
607
554
t.mkdir('dnomode', mode=None)
608
self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
555
self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
610
557
def test_opening_a_file_stream_creates_file(self):
611
558
t = self.get_transport()
614
561
handle = t.open_write_stream('foo')
616
self.assertEqual('', t.get_bytes('foo'))
563
self.assertEqual(b'', t.get_bytes('foo'))
620
567
def test_opening_a_file_stream_can_set_mode(self):
621
568
t = self.get_transport()
622
569
if t.is_readonly():
570
self.assertRaises((TransportNotPossible, NotImplementedError),
571
t.open_write_stream, 'foo')
624
573
if not t._can_roundtrip_unix_modebits():
625
574
# Can't roundtrip, so no need to run this test
627
577
def check_mode(name, mode, expected):
628
578
handle = t.open_write_stream(name, mode=mode)
630
580
self.assertTransportMode(t, name, expected)
631
check_mode('mode644', 0644, 0644)
632
check_mode('mode666', 0666, 0666)
633
check_mode('mode600', 0600, 0600)
581
check_mode('mode644', 0o644, 0o644)
582
check_mode('mode666', 0o666, 0o666)
583
check_mode('mode600', 0o600, 0o600)
634
584
# The default permissions should be based on the current umask
635
check_mode('nomode', None, 0666 & ~osutils.get_umask())
585
check_mode('nomode', None, 0o666 & ~osutils.get_umask())
637
587
def test_copy_to(self):
638
588
# FIXME: test: same server to same server (partly done)
700
651
if t.is_readonly():
701
652
self.assertRaises(TransportNotPossible,
702
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
653
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
704
t.put_bytes('a', 'diff\ncontents for\na\n')
705
t.put_bytes('b', 'contents\nfor b\n')
655
t.put_bytes('a', b'diff\ncontents for\na\n')
656
t.put_bytes('b', b'contents\nfor b\n')
707
658
self.assertEqual(20,
708
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
659
t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
710
661
self.check_transport_contents(
711
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
662
b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
714
665
# a file with no parent should fail..
715
666
self.assertRaises(NoSuchFile,
716
t.append_file, 'missing/path', StringIO('content'))
667
t.append_file, 'missing/path', BytesIO(b'content'))
718
669
# And we can create new files, too
719
670
self.assertEqual(0,
720
t.append_file('c', StringIO('some text\nfor a missing file\n')))
721
self.check_transport_contents('some text\nfor a missing file\n',
671
t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
672
self.check_transport_contents(b'some text\nfor a missing file\n',
724
675
def test_append_bytes(self):
727
678
if t.is_readonly():
728
679
self.assertRaises(TransportNotPossible,
729
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
680
t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
732
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
733
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
683
self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
684
self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
735
686
self.assertEqual(20,
736
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
687
t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
738
689
self.check_transport_contents(
739
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
690
b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
742
693
# a file with no parent should fail..
743
694
self.assertRaises(NoSuchFile,
744
t.append_bytes, 'missing/path', 'content')
746
def test_append_multi(self):
747
t = self.get_transport()
751
t.put_bytes('a', 'diff\ncontents for\na\n'
752
'add\nsome\nmore\ncontents\n')
753
t.put_bytes('b', 'contents\nfor b\n')
755
self.assertEqual((43, 15),
756
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
757
('b', StringIO('some\nmore\nfor\nb\n'))]))
759
self.check_transport_contents(
760
'diff\ncontents for\na\n'
761
'add\nsome\nmore\ncontents\n'
762
'and\nthen\nsome\nmore\n',
764
self.check_transport_contents(
766
'some\nmore\nfor\nb\n',
769
self.assertEqual((62, 31),
770
t.append_multi(iter([('a', StringIO('a little bit more\n')),
771
('b', StringIO('from an iterator\n'))])))
772
self.check_transport_contents(
773
'diff\ncontents for\na\n'
774
'add\nsome\nmore\ncontents\n'
775
'and\nthen\nsome\nmore\n'
776
'a little bit more\n',
778
self.check_transport_contents(
780
'some\nmore\nfor\nb\n'
781
'from an iterator\n',
784
self.assertEqual((80, 0),
785
t.append_multi([('a', StringIO('some text in a\n')),
786
('d', StringIO('missing file r\n'))]))
788
self.check_transport_contents(
789
'diff\ncontents for\na\n'
790
'add\nsome\nmore\ncontents\n'
791
'and\nthen\nsome\nmore\n'
792
'a little bit more\n'
795
self.check_transport_contents('missing file r\n', t, 'd')
695
t.append_bytes, 'missing/path', b'content')
797
697
def test_append_file_mode(self):
798
698
"""Check that append accepts a mode parameter"""
822
722
self.assertRaises(TransportNotPossible, t.delete, 'missing')
825
t.put_bytes('a', 'a little bit of text\n')
826
self.failUnless(t.has('a'))
725
t.put_bytes('a', b'a little bit of text\n')
726
self.assertTrue(t.has('a'))
828
self.failIf(t.has('a'))
728
self.assertFalse(t.has('a'))
830
730
self.assertRaises(NoSuchFile, t.delete, 'a')
832
t.put_bytes('a', 'a text\n')
833
t.put_bytes('b', 'b text\n')
834
t.put_bytes('c', 'c text\n')
732
t.put_bytes('a', b'a text\n')
733
t.put_bytes('b', b'b text\n')
734
t.put_bytes('c', b'c text\n')
835
735
self.assertEqual([True, True, True],
836
list(t.has_multi(['a', 'b', 'c'])))
837
t.delete_multi(['a', 'c'])
736
[t.has(n) for n in ['a', 'b', 'c']])
838
739
self.assertEqual([False, True, False],
839
list(t.has_multi(['a', 'b', 'c'])))
840
self.failIf(t.has('a'))
841
self.failUnless(t.has('b'))
842
self.failIf(t.has('c'))
844
self.assertRaises(NoSuchFile,
845
t.delete_multi, ['a', 'b', 'c'])
847
self.assertRaises(NoSuchFile,
848
t.delete_multi, iter(['a', 'b', 'c']))
850
t.put_bytes('a', 'another a text\n')
851
t.put_bytes('c', 'another c text\n')
852
t.delete_multi(iter(['a', 'b', 'c']))
740
[t.has(n) for n in ['a', 'b', 'c']])
741
self.assertFalse(t.has('a'))
742
self.assertTrue(t.has('b'))
743
self.assertFalse(t.has('c'))
745
for name in ['a', 'c', 'd']:
746
self.assertRaises(NoSuchFile, t.delete, name)
854
748
# We should have deleted everything
855
749
# SftpServer creates control files in the
991
889
# creates control files in the working directory
992
890
# perhaps all of this could be done in a subdirectory
994
t.put_bytes('a', 'a first file\n')
995
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
892
t.put_bytes('a', b'a first file\n')
893
self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
998
self.failUnless(t.has('b'))
999
self.failIf(t.has('a'))
896
self.assertTrue(t.has('b'))
897
self.assertFalse(t.has('a'))
1001
self.check_transport_contents('a first file\n', t, 'b')
1002
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
899
self.check_transport_contents(b'a first file\n', t, 'b')
900
self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
1004
902
# Overwrite a file
1005
t.put_bytes('c', 'c this file\n')
903
t.put_bytes('c', b'c this file\n')
1006
904
t.move('c', 'b')
1007
self.failIf(t.has('c'))
1008
self.check_transport_contents('c this file\n', t, 'b')
905
self.assertFalse(t.has('c'))
906
self.check_transport_contents(b'c this file\n', t, 'b')
1010
908
# TODO: Try to write a test for atomicity
1011
909
# TODO: Test moving into a non-existent subdirectory
1012
# TODO: Test Transport.move_multi
1014
911
def test_copy(self):
1015
912
t = self.get_transport()
1294
1200
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1296
self.failUnless(t1.has('a'))
1297
self.failUnless(t1.has('b/c'))
1298
self.failIf(t1.has('c'))
1202
self.assertTrue(t1.has('a'))
1203
self.assertTrue(t1.has('b/c'))
1204
self.assertFalse(t1.has('c'))
1300
1206
t2 = t1.clone('b')
1301
1207
self.assertEqual(t1.base + 'b/', t2.base)
1303
self.failUnless(t2.has('c'))
1304
self.failIf(t2.has('a'))
1209
self.assertTrue(t2.has('c'))
1210
self.assertFalse(t2.has('a'))
1306
1212
t3 = t2.clone('..')
1307
self.failUnless(t3.has('a'))
1308
self.failIf(t3.has('c'))
1213
self.assertTrue(t3.has('a'))
1214
self.assertFalse(t3.has('c'))
1310
self.failIf(t1.has('b/d'))
1311
self.failIf(t2.has('d'))
1312
self.failIf(t3.has('b/d'))
1216
self.assertFalse(t1.has('b/d'))
1217
self.assertFalse(t2.has('d'))
1218
self.assertFalse(t3.has('b/d'))
1314
1220
if t1.is_readonly():
1315
self.build_tree_contents([('b/d', 'newfile\n')])
1221
self.build_tree_contents([('b/d', b'newfile\n')])
1317
t2.put_bytes('d', 'newfile\n')
1223
t2.put_bytes('d', b'newfile\n')
1319
self.failUnless(t1.has('b/d'))
1320
self.failUnless(t2.has('d'))
1321
self.failUnless(t3.has('b/d'))
1225
self.assertTrue(t1.has('b/d'))
1226
self.assertTrue(t2.has('d'))
1227
self.assertTrue(t3.has('b/d'))
1323
1229
def test_clone_to_root(self):
1324
1230
orig_transport = self.get_transport()
1535
1442
# '\xe5' and '\xe4' actually map to the same file
1536
1443
# adding a suffix kicks in the 'preserving but insensitive'
1537
1444
# route, and maintains the right files
1538
files = [u'\xe5.1', # a w/ circle iso-8859-1
1539
u'\xe4.2', # a w/ dots iso-8859-1
1540
u'\u017d', # Z with umlat iso-8859-2
1541
u'\u062c', # Arabic j
1542
u'\u0410', # Russian A
1543
u'\u65e5', # Kanji person
1445
files = [u'\xe5.1', # a w/ circle iso-8859-1
1446
u'\xe4.2', # a w/ dots iso-8859-1
1447
u'\u017d', # Z with umlat iso-8859-2
1448
u'\u062c', # Arabic j
1449
u'\u0410', # Russian A
1450
u'\u65e5', # Kanji person
1546
1453
no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1547
1454
if no_unicode_support:
1548
raise tests.KnownFailure("test server cannot handle unicode paths")
1455
self.knownFailure("test server cannot handle unicode paths")
1551
1458
self.build_tree(files, transport=t, line_endings='binary')
1552
1459
except UnicodeError:
1553
raise TestSkipped("cannot handle unicode paths in current encoding")
1461
"cannot handle unicode paths in current encoding")
1555
1463
# A plain unicode string is not a valid url
1556
1464
for fname in files:
1557
self.assertRaises(InvalidURL, t.get, fname)
1465
self.assertRaises(urlutils.InvalidURL, t.get, fname)
1559
1467
for fname in files:
1560
1468
fname_utf8 = fname.encode('utf-8')
1561
contents = 'contents of %s\n' % (fname_utf8,)
1469
contents = b'contents of %s\n' % (fname_utf8,)
1562
1470
self.check_transport_contents(contents, t, urlutils.escape(fname))
1564
1472
def test_connect_twice_is_same_content(self):
1616
1525
def test_readv(self):
1617
1526
transport = self.get_transport()
1618
1527
if transport.is_readonly():
1619
file('a', 'w').write('0123456789')
1528
with open('a', 'w') as f:
1529
f.write('0123456789')
1621
transport.put_bytes('a', '0123456789')
1531
transport.put_bytes('a', b'0123456789')
1623
1533
d = list(transport.readv('a', ((0, 1),)))
1624
self.assertEqual(d[0], (0, '0'))
1534
self.assertEqual(d[0], (0, b'0'))
1626
1536
d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1627
self.assertEqual(d[0], (0, '0'))
1628
self.assertEqual(d[1], (1, '1'))
1629
self.assertEqual(d[2], (3, '34'))
1630
self.assertEqual(d[3], (9, '9'))
1537
self.assertEqual(d[0], (0, b'0'))
1538
self.assertEqual(d[1], (1, b'1'))
1539
self.assertEqual(d[2], (3, b'34'))
1540
self.assertEqual(d[3], (9, b'9'))
1632
1542
def test_readv_out_of_order(self):
1633
1543
transport = self.get_transport()
1634
1544
if transport.is_readonly():
1635
file('a', 'w').write('0123456789')
1545
with open('a', 'w') as f:
1546
f.write('0123456789')
1637
transport.put_bytes('a', '01234567890')
1548
transport.put_bytes('a', b'01234567890')
1639
1550
d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1640
self.assertEqual(d[0], (1, '1'))
1641
self.assertEqual(d[1], (9, '9'))
1642
self.assertEqual(d[2], (0, '0'))
1643
self.assertEqual(d[3], (3, '34'))
1551
self.assertEqual(d[0], (1, b'1'))
1552
self.assertEqual(d[1], (9, b'9'))
1553
self.assertEqual(d[2], (0, b'0'))
1554
self.assertEqual(d[3], (3, b'34'))
1645
1556
def test_readv_with_adjust_for_latency(self):
1646
1557
transport = self.get_transport()
1710
1622
transport = self.get_transport()
1711
1623
# test from observed failure case.
1712
1624
if transport.is_readonly():
1713
file('a', 'w').write('a'*1024*1024)
1625
with open('a', 'w') as f:
1626
f.write('a' * 1024 * 1024)
1715
transport.put_bytes('a', 'a'*1024*1024)
1628
transport.put_bytes('a', b'a' * 1024 * 1024)
1716
1629
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1717
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1718
(465373, 800), (947422, 800)]
1719
results = list(transport.readv('a', broken_vector, True, 1024*1024))
1720
found_items = [False]*9
1630
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1631
(465373, 800), (947422, 800)]
1632
results = list(transport.readv('a', broken_vector, True, 1024 * 1024))
1633
found_items = [False] * 9
1721
1634
for pos, (start, length) in enumerate(broken_vector):
1722
1635
# check the range is covered by the result
1723
1636
for offset, data in results:
1724
1637
if offset <= start and start + length <= offset + len(data):
1725
1638
found_items[pos] = True
1726
self.assertEqual([True]*9, found_items)
1639
self.assertEqual([True] * 9, found_items)
1728
1641
def test_get_with_open_write_stream_sees_all_content(self):
1729
1642
t = self.get_transport()
1730
1643
if t.is_readonly():
1732
handle = t.open_write_stream('foo')
1735
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1645
with t.open_write_stream('foo') as handle:
1646
handle.write(b'bcd')
1647
self.assertEqual([(0, b'b'), (2, b'd')], list(
1648
t.readv('foo', ((0, 1), (2, 1)))))
1739
1650
def test_get_smart_medium(self):
    """All transports must either give a smart medium, or know they can't.

    A transport passes by raising NoSmartMedium or by returning a
    SmartClientMedium instance.
    """
    transport = self.get_transport()
    try:
        client_medium = transport.get_smart_medium()
    except errors.NoSmartMedium:
        # as long as we got it we're fine
        pass
    else:
        # Imported here rather than at module level, as in the revised
        # lines of this test; only needed when a medium was returned.
        from ..bzr.smart import medium
        self.assertIsInstance(client_medium, medium.SmartClientMedium)
1750
1663
def test_readv_short_read(self):
    """readv() raises when a range runs off, or starts past, the file end.

    Both fixture variants are shorter than the ranges requested below.
    """
    transport = self.get_transport()
    if transport.is_readonly():
        # Read-only transports get the fixture written straight to the
        # local file 'a' rather than through put_bytes().
        with open('a', 'w') as f:
            f.write('0123456789')
    else:
        transport.put_bytes('a', b'01234567890')

    # This is intentionally reading off the end of the file
    # since we are sure that it cannot get there
    self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
                           # Can be raised by paramiko
                           AssertionError),
                          transport.readv, 'a', [(1, 1), (8, 10)])

    # This is trying to seek past the end of the file, it should
    # also raise a special error
    self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
                          transport.readv, 'a', [(12, 2)])
1683
def test_no_segment_parameters(self):
1684
"""Segment parameters should be stripped and stored in
1685
transport.segment_parameters."""
1686
transport = self.get_transport("foo")
1687
self.assertEqual({}, transport.get_segment_parameters())
1689
def test_segment_parameters(self):
    """Segment parameters should be stripped and stored in
    transport.get_segment_parameters().

    The parameters are joined onto the test server's URL, a transport is
    opened from that URL, and the same dict must come back out.
    """
    base_url = self._server.get_url()
    parameters = {"key1": "val1", "key2": "val2"}
    url = urlutils.join_segment_parameters(base_url, parameters)
    transport = _mod_transport.get_transport_from_url(url)
    self.assertEqual(parameters, transport.get_segment_parameters())
1698
def test_set_segment_parameters(self):
1699
"""Segment parameters can be set and show up in base."""
1700
transport = self.get_transport("foo")
1701
orig_base = transport.base
1702
transport.set_segment_parameter("arm", "board")
1703
self.assertEqual("%s,arm=board" % orig_base, transport.base)
1704
self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
1705
transport.set_segment_parameter("arm", None)
1706
transport.set_segment_parameter("nonexistant", None)
1707
self.assertEqual({}, transport.get_segment_parameters())
1708
self.assertEqual(orig_base, transport.base)
1710
def test_stat_symlink(self):
1711
# if a transport points directly to a symlink (and supports symlinks
1712
# at all) you can tell this. helps with bug 32669.
1713
t = self.get_transport()
1715
t.symlink('target', 'link')
1716
except TransportNotPossible:
1717
raise TestSkipped("symlinks not supported")
1718
t2 = t.clone('link')
1720
self.assertTrue(stat.S_ISLNK(st.st_mode))
1722
def test_abspath_url_unquote_unreserved(self):
1723
"""URLs from abspath should have unreserved characters unquoted
1725
Need consistent quoting notably for tildes, see lp:842223 for more.
1727
t = self.get_transport()
1728
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1729
self.assertEqual(t.base + "-.09AZ_az~",
1730
t.abspath(needlessly_escaped_dir))
1732
def test_clone_url_unquote_unreserved(self):
1733
"""Base URL of a cloned branch needs unreserved characters unquoted
1735
Cloned transports should be prefix comparable for things like the
1736
isolation checking of tests, see lp:842223 for more.
1738
t1 = self.get_transport()
1739
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1740
self.build_tree([needlessly_escaped_dir], transport=t1)
1741
t2 = t1.clone(needlessly_escaped_dir)
1742
self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1744
def test_hook_post_connection_one(self):
    """Fire post_connect hook after a ConnectedTransport is first used"""
    # The hook appends whatever it is called with to this list.
    log = []
    Transport.hooks.install_named_hook("post_connect", log.append, None)
    t = self.get_transport()
    # Merely obtaining the transport must not fire the hook.
    self.assertEqual([], log)
    t.has("non-existant")
    if isinstance(t, RemoteTransport):
        # For a RemoteTransport the logged object is its smart medium.
        self.assertEqual([t.get_smart_medium()], log)
    elif isinstance(t, ConnectedTransport):
        self.assertEqual([t], log)
    else:
        # Any other transport type is expected to log nothing.
        self.assertEqual([], log)
1758
def test_hook_post_connection_multi(self):
1759
"""Fire post_connect hook once per unshared underlying connection"""
1761
Transport.hooks.install_named_hook("post_connect", log.append, None)
1762
t1 = self.get_transport()
1764
t3 = self.get_transport()
1765
self.assertEqual([], log)
1769
if isinstance(t1, RemoteTransport):
1770
self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1771
elif isinstance(t1, ConnectedTransport):
1772
self.assertEqual([t1, t3], log)
1774
self.assertEqual([], log)