42
43
class TestBzrServe(TestCaseWithTransport):
44
def test_bzr_serve_inet(self):
48
# Serve that branch from the current directory
49
process = self.start_bzr_subprocess(['serve', '--inet'])
51
# Connect to the server
52
# We use this url because while this is no valid URL to connect to this
53
# server instance, the transport needs a URL.
54
client = DoesNotCloseStdOutClient(
55
lambda: (process.stdout, process.stdin))
56
transport = smart.SmartTransport('bzr://localhost/', client=client)
58
# We get a working branch
59
branch = BzrDir.open_from_transport(transport).open_branch()
60
branch.repository.get_revision_graph()
61
self.assertEqual(None, branch.last_revision())
63
# finish with the transport
45
def assertInetServerShutsdownCleanly(self, client, process):
46
"""Shutdown the server process looking for errors."""
65
47
# Disconnect the client forcefully JUST IN CASE because of __del__'s use
66
48
# in the smart module.
67
49
client.disconnect()
75
57
self.assertEqual('', result[0])
76
58
self.assertEqual('', result[1])
78
def test_bzr_serve_port(self):
82
# Serve that branch from the current directory
83
process = self.start_bzr_subprocess(['serve', '--port', 'localhost:0'],
84
skip_if_plan_to_signal=True)
60
def assertServerFinishesCleanly(self, process):
    """Interrupt a running bzr serve process and check how it exited.

    The process is finished by sending it SIGINT; it is expected to exit
    with return code 3.

    :param process: the bzr process handle to finish.
    """
    outcome = self.finish_bzr_subprocess(
        process, retcode=3, send_signal=signal.SIGINT)
    # NOTE(review): the two elements are presumably the captured stdout and
    # stderr of the subprocess -- confirm against finish_bzr_subprocess.
    first_stream = outcome[0]
    second_stream = outcome[1]
    self.assertEqual('', first_stream)
    self.assertEqual('bzr: interrupted\n', second_stream)
68
def start_server_inet(self, extra_options=()):
    """Start a bzr server subprocess using the --inet option.

    :param extra_options: extra options to give the server.
    :return: a tuple with the bzr process handle for passing to
        finish_bzr_subprocess, a client for the server, and a transport.
    """
    # Serve from the current directory.  The caller's extra options are
    # appended to the command line; without this, flags such as
    # --allow-writes would never reach the server.
    args = ['serve', '--inet']
    args.extend(extra_options)
    process = self.start_bzr_subprocess(args)

    # Connect to the server by handing the client the subprocess's stdout
    # and stdin.
    # The URL below cannot actually be used to reach this server instance;
    # it is supplied only because the transport needs a URL.
    client = DoesNotCloseStdOutClient(
        lambda: (process.stdout, process.stdin))
    transport = smart.SmartTransport('bzr://localhost/', client=client)
    return process, client, transport
86
def start_server_port(self, extra_options=()):
    """Launch ``bzr serve --port localhost:0`` and report where it listens.

    :param extra_options: additional command-line options appended to the
        serve command.
    :return: a ``(process, url)`` pair: the bzr process handle for passing
        to finish_bzr_subprocess, and the base url for the server.
    """
    # Serve from the current directory.
    command = ['serve', '--port', 'localhost:0'] + list(extra_options)
    process = self.start_bzr_subprocess(command, skip_if_plan_to_signal=True)

    # The first line read from the server's stdout must announce the port.
    banner = 'listening on port: '
    first_line = process.stdout.readline()
    self.assertStartsWith(first_line, banner)
    port_number = int(first_line[len(banner):])

    url = 'bzr://localhost:%d/' % port_number
    return process, url
103
def test_bzr_serve_inet_readonly(self):
    """bzr server should provide a read only filesystem by default.

    Starts an --inet server with no extra options and checks that mkdir
    through the transport raises TransportNotPossible.
    """
    server = self.start_server_inet()
    inet_process, inet_client, inet_transport = server
    self.assertRaises(
        errors.TransportNotPossible, inet_transport.mkdir, 'adir')
    # Done with the transport: shut the server down and check for errors.
    self.assertInetServerShutsdownCleanly(inet_client, inet_process)
111
def test_bzr_serve_inet_readwrite(self):
    # Make a branch at '.' for the server to serve.
    self.make_branch('.')

    server = self.start_server_inet(['--allow-writes'])
    inet_process, inet_client, inet_transport = server

    # The branch opened through the smart transport must be usable and
    # must report no last revision.
    bzrdir = BzrDir.open_from_transport(inet_transport)
    served_branch = bzrdir.open_branch()
    served_branch.repository.get_revision_graph()
    self.assertEqual(None, served_branch.last_revision())

    # Done with the transport: shut the server down and check for errors.
    self.assertInetServerShutsdownCleanly(inet_client, inet_process)
127
def test_bzr_serve_port_readonly(self):
    """bzr server should provide a read only filesystem by default.

    Starts a --port server with no extra options and checks that mkdir
    through a transport for its url raises TransportNotPossible.
    """
    started = self.start_server_port()
    server_process = started[0]
    server_url = started[1]
    remote = get_transport(server_url)
    self.assertRaises(errors.TransportNotPossible, remote.mkdir, 'adir')
    self.assertServerFinishesCleanly(server_process)
134
def test_bzr_serve_port_readwrite(self):
    # Make a branch at '.' for the server to serve.
    self.make_branch('.')

    process, url = self.start_server_port(['--allow-writes'])

    # Connect to the server using the url reported by start_server_port.
    # (No local 'port' variable exists in this test, so the url must not be
    # rebuilt by hand here.)
    branch = Branch.open(url)

    # We get a working branch
    branch.repository.get_revision_graph()
    self.assertEqual(None, branch.last_revision())

    # Shut the server down exactly once; the helper sends the interrupt and
    # performs the exit checks.
    self.assertServerFinishesCleanly(process)
103
149
def test_bzr_serve_no_args(self):
104
150
"""'bzr serve' with no arguments or options should not traceback."""