1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
"""Tests of the bzr serve command."""
25
from bzrlib import errors
26
from bzrlib.branch import Branch
27
from bzrlib.bzrdir import BzrDir
28
from bzrlib.errors import ParamikoNotPresent
29
from bzrlib.tests import TestCaseWithTransport, TestSkipped
30
from bzrlib.transport import get_transport, remote
31
from bzrlib.smart import medium
34
class TestBzrServe(TestCaseWithTransport):
36
def assertInetServerShutsdownCleanly(self, process):
37
"""Shutdown the server process looking for errors."""
38
# Shutdown the server: the server should shut down when it cannot read
41
# Hide stdin from the subprocess module, so it won't fail to close it.
43
result = self.finish_bzr_subprocess(process, retcode=0)
44
self.assertEqual('', result[0])
45
self.assertEqual('', result[1])
47
def assertServerFinishesCleanly(self, process):
48
"""Shutdown the bzr serve instance process looking for errors."""
50
result = self.finish_bzr_subprocess(process, retcode=3,
51
send_signal=signal.SIGINT)
52
self.assertEqual('', result[0])
53
self.assertEqual('bzr: interrupted\n', result[1])
55
def start_server_inet(self, extra_options=()):
56
"""Start a bzr server subprocess using the --inet option.
58
:param extra_options: extra options to give the server.
59
:return: a tuple with the bzr process handle for passing to
60
finish_bzr_subprocess, a client for the server, and a transport.
62
# Serve from the current directory
63
process = self.start_bzr_subprocess(['serve', '--inet'])
65
# Connect to the server
66
# We use this url because while this is no valid URL to connect to this
67
# server instance, the transport needs a URL.
68
client_medium = medium.SmartSimplePipesClientMedium(
69
process.stdout, process.stdin)
70
transport = remote.RemoteTransport(
71
'bzr://localhost/', medium=client_medium)
72
return process, transport
74
def start_server_port(self, extra_options=()):
75
"""Start a bzr server subprocess.
77
:param extra_options: extra options to give the server.
78
:return: a tuple with the bzr process handle for passing to
79
finish_bzr_subprocess, and the base url for the server.
81
# Serve from the current directory
82
args = ['serve', '--port', 'localhost:0']
83
args.extend(extra_options)
84
process = self.start_bzr_subprocess(args, skip_if_plan_to_signal=True)
85
port_line = process.stdout.readline()
86
prefix = 'listening on port: '
87
self.assertStartsWith(port_line, prefix)
88
port = int(port_line[len(prefix):])
89
return process,'bzr://localhost:%d/' % port
91
def test_bzr_serve_inet_readonly(self):
92
"""bzr server should provide a read only filesystem by default."""
93
process, transport = self.start_server_inet()
94
self.assertRaises(errors.TransportNotPossible, transport.mkdir, 'adir')
95
self.assertInetServerShutsdownCleanly(process)
97
def test_bzr_serve_inet_readwrite(self):
101
process, transport = self.start_server_inet(['--allow-writes'])
103
# We get a working branch
104
branch = BzrDir.open_from_transport(transport).open_branch()
105
branch.repository.get_revision_graph()
106
self.assertEqual(None, branch.last_revision())
107
self.assertInetServerShutsdownCleanly(process)
109
def test_bzr_serve_port_readonly(self):
110
"""bzr server should provide a read only filesystem by default."""
111
process, url = self.start_server_port()
112
transport = get_transport(url)
113
self.assertRaises(errors.TransportNotPossible, transport.mkdir, 'adir')
114
self.assertServerFinishesCleanly(process)
116
def test_bzr_serve_port_readwrite(self):
118
self.make_branch('.')
120
process, url = self.start_server_port(['--allow-writes'])
122
# Connect to the server
123
branch = Branch.open(url)
125
# We get a working branch
126
branch.repository.get_revision_graph()
127
self.assertEqual(None, branch.last_revision())
129
self.assertServerFinishesCleanly(process)
131
def test_bzr_connect_to_bzr_ssh(self):
132
"""User acceptance that get_transport of a bzr+ssh:// behaves correctly.
134
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
137
from bzrlib.transport.sftp import SFTPServer
138
except ParamikoNotPresent:
139
raise TestSkipped('Paramiko not installed')
140
from bzrlib.tests.stub_sftp import StubServer
143
self.make_branch('a_branch')
145
# Start an SSH server
146
self.command_executed = []
147
# XXX: This is horrible -- we define a really dumb SSH server that
148
# executes commands, and manage the hooking up of stdin/out/err to the
149
# SSH channel ourselves. Surely this has already been implemented
151
class StubSSHServer(StubServer):
155
def check_channel_exec_request(self, channel, command):
156
self.test.command_executed.append(command)
157
proc = subprocess.Popen(
158
command, shell=True, stdin=subprocess.PIPE,
159
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
161
# XXX: horribly inefficient, not to mention ugly.
162
# Start a thread for each of stdin/out/err, and relay bytes from
163
# the subprocess to channel and vice versa.
164
def ferry_bytes(read, write, close):
173
(channel.recv, proc.stdin.write, proc.stdin.close),
174
(proc.stdout.read, channel.sendall, channel.close),
175
(proc.stderr.read, channel.sendall_stderr, channel.close)]
176
for read, write, close in file_functions:
177
t = threading.Thread(
178
target=ferry_bytes, args=(read, write, close))
183
ssh_server = SFTPServer(StubSSHServer)
184
# XXX: We *don't* want to override the default SSH vendor, so we set
185
# _vendor to what _get_ssh_vendor returns.
187
self.addCleanup(ssh_server.tearDown)
188
port = ssh_server._listener.port
190
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
191
# variable is used to tell bzr what command to run on the remote end.
192
path_to_branch = os.path.abspath('a_branch')
194
orig_bzr_remote_path = os.environ.get('BZR_REMOTE_PATH')
195
os.environ['BZR_REMOTE_PATH'] = self.get_bzr_path()
197
branch = Branch.open(
198
'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch))
200
branch.repository.get_revision_graph()
201
self.assertEqual(None, branch.last_revision())
202
# Check we can perform write operations
203
branch.bzrdir.root_transport.mkdir('foo')
205
# Restore the BZR_REMOTE_PATH environment variable back to its
207
if orig_bzr_remote_path is None:
208
del os.environ['BZR_REMOTE_PATH']
210
os.environ['BZR_REMOTE_PATH'] = orig_bzr_remote_path
213
['%s serve --inet --directory=/ --allow-writes'
214
% self.get_bzr_path()],
215
self.command_executed)