111
114
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
112
115
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
117
def test_get_bytes(self):
118
t = self.get_transport()
120
files = ['a', 'b', 'e', 'g']
121
contents = ['contents of a\n',
126
self.build_tree(files, transport=t, line_endings='binary')
127
self.check_transport_contents('contents of a\n', t, 'a')
129
for content, fname in zip(contents, files):
130
self.assertEqual(content, t.get_bytes(fname))
132
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
114
134
def test_put(self):
115
135
t = self.get_transport()
117
137
if t.is_readonly():
118
self.assertRaises(TransportNotPossible,
119
t.put, 'a', 'some text for a\n')
122
t.put('a', StringIO('some text for a\n'))
123
self.failUnless(t.has('a'))
124
self.check_transport_contents('some text for a\n', t, 'a')
125
# Make sure 'has' is updated
126
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
127
[True, False, False, False, False])
128
# Put also replaces contents
129
self.assertEqual(t.put_multi([('a', StringIO('new\ncontents for\na\n')),
130
('d', StringIO('contents\nfor d\n'))]),
132
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
133
[True, False, False, True, False])
140
self.applyDeprecated(zero_eleven, t.put, 'a', 'string\ncontents\n')
141
self.check_transport_contents('string\ncontents\n', t, 'a')
143
self.applyDeprecated(zero_eleven,
144
t.put, 'b', StringIO('file-like\ncontents\n'))
145
self.check_transport_contents('file-like\ncontents\n', t, 'b')
147
self.assertRaises(NoSuchFile,
148
t.put, 'path/doesnt/exist/c', StringIO('contents'))
150
def test_put_bytes(self):
151
t = self.get_transport()
154
self.assertRaises(TransportNotPossible,
155
t.put_bytes, 'a', 'some text for a\n')
158
t.put_bytes('a', 'some text for a\n')
159
self.failUnless(t.has('a'))
160
self.check_transport_contents('some text for a\n', t, 'a')
162
# The contents should be overwritten
163
t.put_bytes('a', 'new text for a\n')
164
self.check_transport_contents('new text for a\n', t, 'a')
166
self.assertRaises(NoSuchFile,
167
t.put_bytes, 'path/doesnt/exist/c', 'contents')
169
def test_put_bytes_non_atomic(self):
170
t = self.get_transport()
173
self.assertRaises(TransportNotPossible,
174
t.put_bytes_non_atomic, 'a', 'some text for a\n')
177
self.failIf(t.has('a'))
178
t.put_bytes_non_atomic('a', 'some text for a\n')
179
self.failUnless(t.has('a'))
180
self.check_transport_contents('some text for a\n', t, 'a')
181
# Put also replaces contents
182
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
183
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
185
# Make sure we can create another file
186
t.put_bytes_non_atomic('d', 'contents for\nd\n')
187
# And overwrite 'a' with empty contents
188
t.put_bytes_non_atomic('a', '')
189
self.check_transport_contents('contents for\nd\n', t, 'd')
190
self.check_transport_contents('', t, 'a')
192
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
194
# Now test the create_parent flag
195
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
197
self.failIf(t.has('dir/a'))
198
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
199
create_parent_dir=True)
200
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
202
# But we still get NoSuchFile if we can't make the parent dir
203
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
205
create_parent_dir=True)
207
def test_put_bytes_permissions(self):
208
t = self.get_transport()
212
if not t._can_roundtrip_unix_modebits():
213
# Can't roundtrip, so no need to run this test
215
t.put_bytes('mode644', 'test text\n', mode=0644)
216
self.assertTransportMode(t, 'mode644', 0644)
217
t.put_bytes('mode666', 'test text\n', mode=0666)
218
self.assertTransportMode(t, 'mode666', 0666)
219
t.put_bytes('mode600', 'test text\n', mode=0600)
220
self.assertTransportMode(t, 'mode600', 0600)
221
# Yes, you can put_bytes a file such that it becomes readonly
222
t.put_bytes('mode400', 'test text\n', mode=0400)
223
self.assertTransportMode(t, 'mode400', 0400)
225
# The default permissions should be based on the current umask
226
umask = osutils.get_umask()
227
t.put_bytes('nomode', 'test text\n', mode=None)
228
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
230
def test_put_bytes_non_atomic_permissions(self):
231
t = self.get_transport()
235
if not t._can_roundtrip_unix_modebits():
236
# Can't roundtrip, so no need to run this test
238
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
239
self.assertTransportMode(t, 'mode644', 0644)
240
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
241
self.assertTransportMode(t, 'mode666', 0666)
242
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
243
self.assertTransportMode(t, 'mode600', 0600)
244
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
245
self.assertTransportMode(t, 'mode400', 0400)
247
# The default permissions should be based on the current umask
248
umask = osutils.get_umask()
249
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
250
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
252
# We should also be able to set the mode for a parent directory
254
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
255
dir_mode=0700, create_parent_dir=True)
256
self.assertTransportMode(t, 'dir700', 0700)
257
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
258
dir_mode=0770, create_parent_dir=True)
259
self.assertTransportMode(t, 'dir770', 0770)
260
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
261
dir_mode=0777, create_parent_dir=True)
262
self.assertTransportMode(t, 'dir777', 0777)
264
def test_put_file(self):
265
t = self.get_transport()
268
self.assertRaises(TransportNotPossible,
269
t.put_file, 'a', StringIO('some text for a\n'))
272
t.put_file('a', StringIO('some text for a\n'))
273
self.failUnless(t.has('a'))
274
self.check_transport_contents('some text for a\n', t, 'a')
275
# Put also replaces contents
276
t.put_file('a', StringIO('new\ncontents for\na\n'))
277
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
278
self.assertRaises(NoSuchFile,
279
t.put_file, 'path/doesnt/exist/c',
280
StringIO('contents'))
282
def test_put_file_non_atomic(self):
283
t = self.get_transport()
286
self.assertRaises(TransportNotPossible,
287
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
290
self.failIf(t.has('a'))
291
t.put_file_non_atomic('a', StringIO('some text for a\n'))
292
self.failUnless(t.has('a'))
293
self.check_transport_contents('some text for a\n', t, 'a')
294
# Put also replaces contents
295
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
296
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
298
# Make sure we can create another file
299
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
300
# And overwrite 'a' with empty contents
301
t.put_file_non_atomic('a', StringIO(''))
302
self.check_transport_contents('contents for\nd\n', t, 'd')
303
self.check_transport_contents('', t, 'a')
305
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
306
StringIO('contents\n'))
307
# Now test the create_parent flag
308
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
309
StringIO('contents\n'))
310
self.failIf(t.has('dir/a'))
311
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
312
create_parent_dir=True)
313
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
315
# But we still get NoSuchFile if we can't make the parent dir
316
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
317
StringIO('contents\n'),
318
create_parent_dir=True)
320
def test_put_file_permissions(self):
322
t = self.get_transport()
326
if not t._can_roundtrip_unix_modebits():
327
# Can't roundtrip, so no need to run this test
329
t.put_file('mode644', StringIO('test text\n'), mode=0644)
330
self.assertTransportMode(t, 'mode644', 0644)
331
t.put_file('mode666', StringIO('test text\n'), mode=0666)
332
self.assertTransportMode(t, 'mode666', 0666)
333
t.put_file('mode600', StringIO('test text\n'), mode=0600)
334
self.assertTransportMode(t, 'mode600', 0600)
335
# Yes, you can put a file such that it becomes readonly
336
t.put_file('mode400', StringIO('test text\n'), mode=0400)
337
self.assertTransportMode(t, 'mode400', 0400)
339
# XXX: put_multi is deprecated, so do we really care anymore?
340
self.applyDeprecated(zero_eleven, t.put_multi,
341
[('mmode644', StringIO('text\n'))], mode=0644)
342
self.assertTransportMode(t, 'mmode644', 0644)
344
# The default permissions should be based on the current umask
345
umask = osutils.get_umask()
346
t.put_file('nomode', StringIO('test text\n'), mode=None)
347
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
349
def test_put_file_non_atomic_permissions(self):
350
t = self.get_transport()
354
if not t._can_roundtrip_unix_modebits():
355
# Can't roundtrip, so no need to run this test
357
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
358
self.assertTransportMode(t, 'mode644', 0644)
359
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
360
self.assertTransportMode(t, 'mode666', 0666)
361
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
362
self.assertTransportMode(t, 'mode600', 0600)
363
# Yes, you can put_file_non_atomic a file such that it becomes readonly
364
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
365
self.assertTransportMode(t, 'mode400', 0400)
367
# The default permissions should be based on the current umask
368
umask = osutils.get_umask()
369
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
370
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
372
# We should also be able to set the mode for a parent directory
375
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
376
dir_mode=0700, create_parent_dir=True)
377
self.assertTransportMode(t, 'dir700', 0700)
378
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
379
dir_mode=0770, create_parent_dir=True)
380
self.assertTransportMode(t, 'dir770', 0770)
381
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
382
dir_mode=0777, create_parent_dir=True)
383
self.assertTransportMode(t, 'dir777', 0777)
385
def test_put_multi(self):
386
t = self.get_transport()
390
self.assertEqual(2, self.applyDeprecated(zero_eleven,
391
t.put_multi, [('a', StringIO('new\ncontents for\na\n')),
392
('d', StringIO('contents\nfor d\n'))]
394
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd'])),
395
[True, False, False, True])
134
396
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
135
397
self.check_transport_contents('contents\nfor d\n', t, 'd')
138
t.put_multi(iter([('a', StringIO('diff\ncontents for\na\n')),
139
('d', StringIO('another contents\nfor d\n'))])),
399
self.assertEqual(2, self.applyDeprecated(zero_eleven,
400
t.put_multi, iter([('a', StringIO('diff\ncontents for\na\n')),
401
('d', StringIO('another contents\nfor d\n'))])
141
403
self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
142
404
self.check_transport_contents('another contents\nfor d\n', t, 'd')
144
self.assertRaises(NoSuchFile,
145
t.put, 'path/doesnt/exist/c', StringIO('contents'))
147
406
def test_put_permissions(self):
148
407
t = self.get_transport()
295
559
t = self.get_transport()
297
561
if t.is_readonly():
298
open('a', 'wb').write('diff\ncontents for\na\n')
299
open('b', 'wb').write('contents\nfor b\n')
302
('a', StringIO('diff\ncontents for\na\n')),
303
('b', StringIO('contents\nfor b\n'))
307
self.assertRaises(TransportNotPossible,
308
t.append, 'a', 'add\nsome\nmore\ncontents\n')
309
_append('a', StringIO('add\nsome\nmore\ncontents\n'))
312
t.append('a', StringIO('add\nsome\nmore\ncontents\n')))
314
self.check_transport_contents(
315
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
319
self.assertRaises(TransportNotPossible,
321
[('a', 'and\nthen\nsome\nmore\n'),
322
('b', 'some\nmore\nfor\nb\n')])
323
_append('a', StringIO('and\nthen\nsome\nmore\n'))
324
_append('b', StringIO('some\nmore\nfor\nb\n'))
326
self.assertEqual((43, 15),
327
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
328
('b', StringIO('some\nmore\nfor\nb\n'))]))
563
t.put_bytes('a', 'diff\ncontents for\na\n')
564
t.put_bytes('b', 'contents\nfor b\n')
566
self.assertEqual(20, self.applyDeprecated(zero_eleven,
567
t.append, 'a', StringIO('add\nsome\nmore\ncontents\n')))
569
self.check_transport_contents(
570
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
573
# And we can create new files, too
574
self.assertEqual(0, self.applyDeprecated(zero_eleven,
575
t.append, 'c', StringIO('some text\nfor a missing file\n')))
576
self.check_transport_contents('some text\nfor a missing file\n',
578
def test_append_file(self):
579
t = self.get_transport()
582
self.assertRaises(TransportNotPossible,
583
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
585
t.put_bytes('a', 'diff\ncontents for\na\n')
586
t.put_bytes('b', 'contents\nfor b\n')
589
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
591
self.check_transport_contents(
592
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
595
# a file with no parent should fail..
596
self.assertRaises(NoSuchFile,
597
t.append_file, 'missing/path', StringIO('content'))
599
# And we can create new files, too
601
t.append_file('c', StringIO('some text\nfor a missing file\n')))
602
self.check_transport_contents('some text\nfor a missing file\n',
605
def test_append_bytes(self):
606
t = self.get_transport()
609
self.assertRaises(TransportNotPossible,
610
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
613
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
614
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
617
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
619
self.check_transport_contents(
620
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
623
# a file with no parent should fail..
624
self.assertRaises(NoSuchFile,
625
t.append_bytes, 'missing/path', 'content')
627
def test_append_multi(self):
628
t = self.get_transport()
632
t.put_bytes('a', 'diff\ncontents for\na\n'
633
'add\nsome\nmore\ncontents\n')
634
t.put_bytes('b', 'contents\nfor b\n')
636
self.assertEqual((43, 15),
637
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
638
('b', StringIO('some\nmore\nfor\nb\n'))]))
329
640
self.check_transport_contents(
330
641
'diff\ncontents for\na\n'
331
642
'add\nsome\nmore\ncontents\n'
372
673
'a little bit more\n'
373
674
'some text in a\n',
375
self.check_transport_contents('some text\nfor a missing file\n',
377
676
self.check_transport_contents('missing file r\n', t, 'd')
379
# a file with no parent should fail..
380
if not t.is_readonly():
381
self.assertRaises(NoSuchFile,
382
t.append, 'missing/path',
385
def test_append_file(self):
386
t = self.get_transport()
389
('f1', StringIO('this is a string\nand some more stuff\n')),
390
('f2', StringIO('here is some text\nand a bit more\n')),
391
('f3', StringIO('some text for the\nthird file created\n')),
392
('f4', StringIO('this is a string\nand some more stuff\n')),
393
('f5', StringIO('here is some text\nand a bit more\n')),
394
('f6', StringIO('some text for the\nthird file created\n'))
398
for f, val in contents:
399
open(f, 'wb').write(val.read())
401
t.put_multi(contents)
403
a1 = StringIO('appending to\none\n')
411
self.check_transport_contents(
412
'this is a string\nand some more stuff\n'
413
'appending to\none\n',
416
a2 = StringIO('adding more\ntext to two\n')
417
a3 = StringIO('some garbage\nto put in three\n')
423
t.append_multi([('f2', a2), ('f3', a3)])
427
self.check_transport_contents(
428
'here is some text\nand a bit more\n'
429
'adding more\ntext to two\n',
431
self.check_transport_contents(
432
'some text for the\nthird file created\n'
433
'some garbage\nto put in three\n',
436
# Test that an actual file object can be used with put
445
self.check_transport_contents(
446
'this is a string\nand some more stuff\n'
447
'this is a string\nand some more stuff\n'
448
'appending to\none\n',
457
t.append_multi([('f5', a5), ('f6', a6)])
461
self.check_transport_contents(
462
'here is some text\nand a bit more\n'
463
'here is some text\nand a bit more\n'
464
'adding more\ntext to two\n',
466
self.check_transport_contents(
467
'some text for the\nthird file created\n'
468
'some text for the\nthird file created\n'
469
'some garbage\nto put in three\n',
481
t.append_multi([('a', a6), ('d', a7)])
483
self.check_transport_contents(t.get('f2').read(), t, 'c')
484
self.check_transport_contents(t.get('f3').read(), t, 'd')
486
def test_append_mode(self):
678
def test_append_file_mode(self):
679
"""Check that append accepts a mode parameter"""
487
680
# check append accepts a mode
488
681
t = self.get_transport()
489
682
if t.is_readonly():
491
t.append('f', StringIO('f'), mode=None)
683
self.assertRaises(TransportNotPossible,
684
t.append_file, 'f', StringIO('f'), mode=None)
686
t.append_file('f', StringIO('f'), mode=None)
688
def test_append_bytes_mode(self):
689
# check append_bytes accepts a mode
690
t = self.get_transport()
692
self.assertRaises(TransportNotPossible,
693
t.append_bytes, 'f', 'f', mode=None)
695
t.append_bytes('f', 'f', mode=None)
493
697
def test_delete(self):
494
698
# TODO: Test Transport.delete