1838
1838
def setUp(self):
1839
1839
super(TestConcurrency, self).setUp()
1840
self.saved_concurrency_override = osutils._local_concurrency_override
1841
self.addCleanup(self.restore_concurrency_override)
1843
def restore_concurrency_override(self):
1844
osutils._local_concurrency_override = self.saved_concurrency_override
1840
orig = osutils._cached_local_concurrency
1842
osutils._cached_local_concurrency = orig
1843
self.addCleanup(restore)
1846
1845
def test_local_concurrency(self):
1847
1846
concurrency = osutils.local_concurrency()
1848
1847
self.assertIsInstance(concurrency, int)
1850
def test_local_concurrency_option(self):
1851
osutils._local_concurrency_override = 2
1852
self.assertEqual(2, osutils.local_concurrency(False))
1853
osutils._local_concurrency_override = 3
1854
self.assertEqual(3, osutils.local_concurrency(False))
1855
#os.environ['BZR_CONCURRENCY'] = 'foo'
1856
#self.assertEqual(1, osutils.local_concurrency(False))
1849
def test_local_concurrency_environment_variable(self):
1850
os.environ['BZR_CONCURRENCY'] = '2'
1851
self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1852
os.environ['BZR_CONCURRENCY'] = '3'
1853
self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1854
os.environ['BZR_CONCURRENCY'] = 'foo'
1855
self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1857
def test_option_concurrency(self):
1858
os.environ['BZR_CONCURRENCY'] = '1'
1859
self.run_bzr('rocks --concurrency 42')
1860
# Command line overrides envrionment variable
1861
self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1862
self.assertEquals(42, osutils.local_concurrency(use_cache=False))
1859
1865
class TestFailedToLoadExtension(tests.TestCase):