1
# Copyright (C) 2010, 2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22
import SocketServer as socketserver
34
from breezy.transport import (
38
from breezy.bzr.smart import (
45
# FIXME: There is a dependency loop between breezy.tests and
46
# breezy.tests.test_server that needs to be fixed. In the mean time
47
# defining this function is enough for our needs. -- vila 20100611
48
from breezy import tests
49
return 'threads' in tests.selftest_debug_flags
52
class TestServer(transport.Server):
53
"""A Transport Server dedicated to tests.
55
The TestServer interface provides a server for a given transport. We use
56
these servers as loopback testing tools. For any given transport the
57
Servers it provides must either allow writing, or serve the contents
58
of osutils.getcwd() at the time start_server is called.
60
Note that these are real servers - they must implement all the things
61
that we want bzr transports to take advantage of.
65
"""Return a url for this server.
67
If the transport does not represent a disk directory (i.e. it is
68
a database like svn, or a memory only transport, it should return
69
a connection to a newly established resource for this Server.
70
Otherwise it should return a url that will provide access to the path
71
that was osutils.getcwd() when start_server() was called.
73
Subsequent calls will return the same resource.
75
raise NotImplementedError
77
def get_bogus_url(self):
78
"""Return a url for this protocol, that will fail to connect.
80
This may raise NotImplementedError to indicate that this server cannot
83
raise NotImplementedError
86
class LocalURLServer(TestServer):
87
"""A pretend server for local transports, using file:// urls.
89
Of course no actual server is required to access the local filesystem, so
90
this just exists to tell the test code how to get to it.
93
def start_server(self):
97
"""See Transport.Server.get_url."""
98
return urlutils.local_path_to_url('')
101
class DecoratorServer(TestServer):
102
"""Server for the TransportDecorator for testing with.
104
To use this when subclassing TransportDecorator, override the
105
get_decorator_class method.
108
def start_server(self, server=None):
109
"""See breezy.transport.Server.start_server.
111
:server: decorate the urls given by server. If not provided a
112
LocalURLServer is created.
114
if server is not None:
115
self._made_server = False
116
self._server = server
118
self._made_server = True
119
self._server = LocalURLServer()
120
self._server.start_server()
122
def stop_server(self):
123
if self._made_server:
124
self._server.stop_server()
126
def get_decorator_class(self):
127
"""Return the class of the decorators we should be constructing."""
128
raise NotImplementedError(self.get_decorator_class)
130
def get_url_prefix(self):
131
"""What URL prefix does this decorator produce?"""
132
return self.get_decorator_class()._get_url_prefix()
134
def get_bogus_url(self):
135
"""See breezy.transport.Server.get_bogus_url."""
136
return self.get_url_prefix() + self._server.get_bogus_url()
139
"""See breezy.transport.Server.get_url."""
140
return self.get_url_prefix() + self._server.get_url()
143
class BrokenRenameServer(DecoratorServer):
    """Test server built around BrokenRenameTransportDecorator."""

    def get_decorator_class(self):
        """Return the decorator class used by this server.

        The decorator module is imported at call time rather than when
        this module is loaded.
        """
        from breezy.transport.brokenrename import (
            BrokenRenameTransportDecorator,
            )
        return BrokenRenameTransportDecorator
151
class FakeNFSServer(DecoratorServer):
    """Test server built around FakeNFSTransportDecorator."""

    def get_decorator_class(self):
        """Return the decorator class used by this server.

        The decorator module is imported at call time rather than when
        this module is loaded.
        """
        from breezy.transport.fakenfs import (
            FakeNFSTransportDecorator,
            )
        return FakeNFSTransportDecorator
159
class FakeVFATServer(DecoratorServer):
    """Test server suggesting connections through FakeVFATTransportDecorator.
    """

    def get_decorator_class(self):
        """Return the decorator class used by this server.

        The decorator module is imported at call time rather than when
        this module is loaded.
        """
        from breezy.transport.fakevfat import (
            FakeVFATTransportDecorator,
            )
        return FakeVFATTransportDecorator
170
class LogDecoratorServer(DecoratorServer):
    """Test server built around TransportLogDecorator."""

    def get_decorator_class(self):
        """Return the decorator class used by this server.

        The decorator module is imported at call time rather than when
        this module is loaded.
        """
        from breezy.transport.log import (
            TransportLogDecorator,
            )
        return TransportLogDecorator
178
class NoSmartTransportServer(DecoratorServer):
    """Test server built around NoSmartTransportDecorator."""

    def get_decorator_class(self):
        """Return the decorator class used by this server.

        The decorator module is imported at call time rather than when
        this module is loaded.
        """
        from breezy.transport.nosmart import (
            NoSmartTransportDecorator,
            )
        return NoSmartTransportDecorator
186
class ReadonlyServer(DecoratorServer):
    """Test server built around ReadonlyTransportDecorator."""

    def get_decorator_class(self):
        """Return the decorator class used by this server.

        The decorator module is imported at call time rather than when
        this module is loaded.
        """
        from breezy.transport.readonly import (
            ReadonlyTransportDecorator,
            )
        return ReadonlyTransportDecorator
194
class TraceServer(DecoratorServer):
    """Test server built around TransportTraceDecorator."""

    def get_decorator_class(self):
        """Return the decorator class used by this server.

        The decorator module is imported at call time rather than when
        this module is loaded.
        """
        from breezy.transport.trace import (
            TransportTraceDecorator,
            )
        return TransportTraceDecorator
202
class UnlistableServer(DecoratorServer):
    """Test server built around UnlistableTransportDecorator."""

    def get_decorator_class(self):
        """Return the decorator class used by this server.

        The decorator module is imported at call time rather than when
        this module is loaded.
        """
        from breezy.transport.unlistable import (
            UnlistableTransportDecorator,
            )
        return UnlistableTransportDecorator
210
class TestingPathFilteringServer(pathfilter.PathFilteringServer):
213
"""TestingPathFilteringServer is not usable until start_server
216
def start_server(self, backing_server=None):
217
"""Setup the Chroot on backing_server."""
218
if backing_server is not None:
219
self.backing_transport = transport.get_transport_from_url(
220
backing_server.get_url())
222
self.backing_transport = transport.get_transport_from_path('.')
223
self.backing_transport.clone('added-by-filter').ensure_base()
224
self.filter_func = lambda x: 'added-by-filter/' + x
225
super(TestingPathFilteringServer, self).start_server()
227
def get_bogus_url(self):
228
raise NotImplementedError
231
class TestingChrootServer(chroot.ChrootServer):
234
"""TestingChrootServer is not usable until start_server is called."""
235
super(TestingChrootServer, self).__init__(None)
237
def start_server(self, backing_server=None):
238
"""Setup the Chroot on backing_server."""
239
if backing_server is not None:
240
self.backing_transport = transport.get_transport_from_url(
241
backing_server.get_url())
243
self.backing_transport = transport.get_transport_from_path('.')
244
super(TestingChrootServer, self).start_server()
246
def get_bogus_url(self):
247
raise NotImplementedError
250
class TestThread(cethread.CatchingExceptionThread):
252
if not getattr(cethread.CatchingExceptionThread, 'is_alive', None):
254
return self.isAlive()
256
def join(self, timeout=5):
257
"""Overrides to use a default timeout.
259
The default timeout is set to 5 and should expire only when a thread
260
serving a client connection is hung.
262
super(TestThread, self).join(timeout)
263
if timeout and self.is_alive():
264
# The timeout expired without joining the thread, the thread is
265
# therefore stuck and that's a failure as far as the test is
266
# concerned. We used to hang here.
268
# FIXME: we need to kill the thread, but as far as the test is
269
# concerned, raising an assertion is too strong. On most of the
270
# platforms, this doesn't occur, so just mentioning the problem is
271
# enough for now -- vila 2010824
272
sys.stderr.write('thread %s hung\n' % (self.name,))
273
# raise AssertionError('thread %s hung' % (self.name,))
276
class TestingTCPServerMixin(object):
277
"""Mixin to support running socketserver.TCPServer in a thread.
279
Tests are connecting from the main thread, the server has to be run in a
284
self.started = threading.Event()
286
self.stopped = threading.Event()
287
# We collect the resources used by the clients so we can release them
290
self.ignored_exceptions = None
292
def server_bind(self):
293
self.socket.bind(self.server_address)
294
self.server_address = self.socket.getsockname()
298
# We are listening and ready to accept connections
302
# Really a connection but the python framework is generic and
304
self.handle_request()
305
# Let's close the listening socket
310
def handle_request(self):
311
"""Handle one request.
313
The python version swallows some socket exceptions and we don't use
314
timeout, so we override it to better control the server behavior.
316
request, client_address = self.get_request()
317
if self.verify_request(request, client_address):
319
self.process_request(request, client_address)
320
except BaseException:
321
self.handle_error(request, client_address)
323
self.close_request(request)
325
def get_request(self):
326
return self.socket.accept()
328
def verify_request(self, request, client_address):
329
"""Verify the request.
331
Return True if we should proceed with this request, False if we should
332
not even touch a single byte in the socket ! This is useful when we
333
stop the server with a dummy last connection.
337
def handle_error(self, request, client_address):
338
# Stop serving and re-raise the last exception seen
340
# The following can be used for debugging purposes, it will display the
341
# exception and the traceback just when it occurs instead of waiting
342
# for the thread to be joined.
343
# socketserver.BaseServer.handle_error(self, request, client_address)
345
# We call close_request manually, because we are going to raise an
346
# exception. The socketserver implementation calls:
349
# But because we raise the exception, close_request will never be
350
# triggered. This helps client not block waiting for a response when
351
# the server gets an exception.
352
self.close_request(request)
355
def ignored_exceptions_during_shutdown(self, e):
356
if sys.platform == 'win32':
357
accepted_errnos = [errno.EBADF,
365
accepted_errnos = [errno.EBADF,
370
if isinstance(e, socket.error) and e.errno in accepted_errnos:
374
# The following methods are called by the main thread
376
def stop_client_connections(self):
378
c = self.clients.pop()
379
self.shutdown_client(c)
381
def shutdown_socket(self, sock):
382
"""Properly shutdown a socket.
384
This should be called only when no other thread is trying to use the
388
sock.shutdown(socket.SHUT_RDWR)
390
except Exception as e:
391
if self.ignored_exceptions(e):
396
# The following methods are called by the main thread
398
def set_ignored_exceptions(self, thread, ignored_exceptions):
399
self.ignored_exceptions = ignored_exceptions
400
thread.set_ignored_exceptions(self.ignored_exceptions)
402
def _pending_exception(self, thread):
403
"""Raise server uncaught exception.
405
Daughter classes can override this if they use daughter threads.
407
thread.pending_exception()
410
class TestingTCPServer(TestingTCPServerMixin, socketserver.TCPServer):
412
def __init__(self, server_address, request_handler_class):
413
TestingTCPServerMixin.__init__(self)
414
socketserver.TCPServer.__init__(self, server_address,
415
request_handler_class)
417
def get_request(self):
418
"""Get the request and client address from the socket."""
419
sock, addr = TestingTCPServerMixin.get_request(self)
420
self.clients.append((sock, addr))
423
# The following methods are called by the main thread
425
def shutdown_client(self, client):
427
self.shutdown_socket(sock)
430
class TestingThreadingTCPServer(TestingTCPServerMixin,
431
socketserver.ThreadingTCPServer):
433
def __init__(self, server_address, request_handler_class):
434
TestingTCPServerMixin.__init__(self)
435
socketserver.ThreadingTCPServer.__init__(self, server_address,
436
request_handler_class)
438
def get_request(self):
439
"""Get the request and client address from the socket."""
440
sock, addr = TestingTCPServerMixin.get_request(self)
441
# The thread is not created yet, it will be updated in process_request
442
self.clients.append((sock, addr, None))
445
def process_request_thread(self, started, detached, stopped,
446
request, client_address):
448
# We will be on our own once the server tells us we're detached
450
socketserver.ThreadingTCPServer.process_request_thread(
451
self, request, client_address)
452
self.close_request(request)
455
def process_request(self, request, client_address):
456
"""Start a new thread to process the request."""
457
started = threading.Event()
458
detached = threading.Event()
459
stopped = threading.Event()
462
name='%s -> %s' % (client_address, self.server_address),
463
target=self.process_request_thread,
464
args=(started, detached, stopped, request, client_address))
465
# Update the client description
467
self.clients.append((request, client_address, t))
468
# Propagate the exception handler since we must use the same one as
469
# TestingTCPServer for connections running in their own threads.
470
t.set_ignored_exceptions(self.ignored_exceptions)
473
# If an exception occurred during the thread start, it will get raised.
474
t.pending_exception()
476
sys.stderr.write('Client thread %s started\n' % (t.name,))
477
# Tell the thread, it's now on its own for exception handling.
480
# The following methods are called by the main thread
482
def shutdown_client(self, client):
483
sock, addr, connection_thread = client
484
self.shutdown_socket(sock)
485
if connection_thread is not None:
486
# The thread has been created only if the request is processed but
487
# after the connection is inited. This could happen during server
488
# shutdown. If an exception occurred in the thread it will be
491
sys.stderr.write('Client thread %s will be joined\n'
492
% (connection_thread.name,))
493
connection_thread.join()
495
def set_ignored_exceptions(self, thread, ignored_exceptions):
496
TestingTCPServerMixin.set_ignored_exceptions(self, thread,
498
for sock, addr, connection_thread in self.clients:
499
if connection_thread is not None:
500
connection_thread.set_ignored_exceptions(
501
self.ignored_exceptions)
503
def _pending_exception(self, thread):
504
for sock, addr, connection_thread in self.clients:
505
if connection_thread is not None:
506
connection_thread.pending_exception()
507
TestingTCPServerMixin._pending_exception(self, thread)
510
class TestingTCPServerInAThread(transport.Server):
511
"""A server in a thread that re-raises thread exceptions."""
513
def __init__(self, server_address, server_class, request_handler_class):
514
self.server_class = server_class
515
self.request_handler_class = request_handler_class
516
self.host, self.port = server_address
518
self._server_thread = None
521
return "%s(%s:%s)" % (self.__class__.__name__, self.host, self.port)
523
def create_server(self):
524
return self.server_class((self.host, self.port),
525
self.request_handler_class)
527
def start_server(self):
528
self.server = self.create_server()
529
self._server_thread = TestThread(
530
sync_event=self.server.started,
531
target=self.run_server)
532
self._server_thread.start()
533
# Wait for the server thread to start (i.e. release the lock)
534
self.server.started.wait()
535
# Get the real address, especially the port
536
self.host, self.port = self.server.server_address
537
self._server_thread.name = self.server.server_address
539
sys.stderr.write('Server thread %s started\n'
540
% (self._server_thread.name,))
541
# If an exception occurred during the server start, it will get raised,
542
# otherwise, the server is blocked on its accept() call.
543
self._server_thread.pending_exception()
544
# From now on, we'll use a different event to ensure the server can set
546
self._server_thread.set_sync_event(self.server.stopped)
548
def run_server(self):
551
def stop_server(self):
552
if self.server is None:
555
# The server has been started successfully, shut it down now. As
556
# soon as we stop serving, no more connections are accepted except
557
# one to get out of the blocking listen.
558
self.set_ignored_exceptions(
559
self.server.ignored_exceptions_during_shutdown)
560
self.server.serving = False
562
sys.stderr.write('Server thread %s will be joined\n'
563
% (self._server_thread.name,))
564
# The server is listening for a last connection, let's give it:
567
last_conn = osutils.connect_socket((self.host, self.port))
569
# But ignore connection errors as the point is to unblock the
570
# server thread, it may happen that it's not blocked or even
573
# We start shutting down the clients while the server itself is
575
self.server.stop_client_connections()
576
# Now we wait for the thread running self.server.serve() to finish
577
self.server.stopped.wait()
578
if last_conn is not None:
579
# Close the last connection without trying to use it. The
580
# server will not process a single byte on that socket to avoid
581
# complications (SSL starts with a handshake for example).
583
# Check for any exception that could have occurred in the server
586
self._server_thread.join()
587
except Exception as e:
588
if self.server.ignored_exceptions(e):
593
# Make sure we can be called twice safely, note that this means
594
# that we will raise a single exception even if several occurred in
595
# the various threads involved.
598
def set_ignored_exceptions(self, ignored_exceptions):
599
"""Install an exception handler for the server."""
600
self.server.set_ignored_exceptions(self._server_thread,
603
def pending_exception(self):
604
"""Raise uncaught exception in the server."""
605
self.server._pending_exception(self._server_thread)
608
class TestingSmartConnectionHandler(socketserver.BaseRequestHandler,
609
medium.SmartServerSocketStreamMedium):
611
def __init__(self, request, client_address, server):
612
medium.SmartServerSocketStreamMedium.__init__(
613
self, request, server.backing_transport,
614
server.root_client_path,
615
timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
616
request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
617
socketserver.BaseRequestHandler.__init__(self, request, client_address,
622
while not self.finished:
623
server_protocol = self._build_protocol()
624
self._serve_one_request(server_protocol)
625
except errors.ConnectionTimeout:
626
# idle connections aren't considered a failure of the server
630
_DEFAULT_TESTING_CLIENT_TIMEOUT = 60.0
633
class TestingSmartServer(TestingThreadingTCPServer, server.SmartTCPServer):
635
def __init__(self, server_address, request_handler_class,
636
backing_transport, root_client_path):
637
TestingThreadingTCPServer.__init__(self, server_address,
638
request_handler_class)
639
server.SmartTCPServer.__init__(
640
self, backing_transport,
641
root_client_path, client_timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
644
self.run_server_started_hooks()
646
TestingThreadingTCPServer.serve(self)
648
self.run_server_stopped_hooks()
651
"""Return the url of the server"""
652
return "bzr://%s:%d/" % self.server_address
655
class SmartTCPServer_for_testing(TestingTCPServerInAThread):
656
"""Server suitable for use by transport tests.
658
This server is backed by the process's cwd.
661
def __init__(self, thread_name_suffix=''):
662
self.client_path_extra = None
663
self.thread_name_suffix = thread_name_suffix
664
self.host = '127.0.0.1'
666
super(SmartTCPServer_for_testing, self).__init__(
667
(self.host, self.port),
669
TestingSmartConnectionHandler)
671
def create_server(self):
672
return self.server_class((self.host, self.port),
673
self.request_handler_class,
674
self.backing_transport,
675
self.root_client_path)
677
def start_server(self, backing_transport_server=None,
678
client_path_extra='/extra/'):
679
"""Set up server for testing.
681
:param backing_transport_server: backing server to use. If not
682
specified, a LocalURLServer at the current working directory will
684
:param client_path_extra: a path segment starting with '/' to append to
685
the root URL for this server. For instance, a value of '/foo/bar/'
686
will mean the root of the backing transport will be published at a
687
URL like `bzr://127.0.0.1:nnnn/foo/bar/`, rather than
688
`bzr://127.0.0.1:nnnn/`. Default value is `/extra/`, so that tests
689
by default will fail unless they do the necessary path translation.
691
if not client_path_extra.startswith('/'):
692
raise ValueError(client_path_extra)
693
self.root_client_path = self.client_path_extra = client_path_extra
694
from breezy.transport.chroot import ChrootServer
695
if backing_transport_server is None:
696
backing_transport_server = LocalURLServer()
697
self.chroot_server = ChrootServer(
698
self.get_backing_transport(backing_transport_server))
699
self.chroot_server.start_server()
700
self.backing_transport = transport.get_transport_from_url(
701
self.chroot_server.get_url())
702
super(SmartTCPServer_for_testing, self).start_server()
704
def stop_server(self):
706
super(SmartTCPServer_for_testing, self).stop_server()
708
self.chroot_server.stop_server()
710
def get_backing_transport(self, backing_transport_server):
711
"""Get a backing transport from a server we are decorating."""
712
return transport.get_transport_from_url(
713
backing_transport_server.get_url())
716
url = self.server.get_url()
717
return url[:-1] + self.client_path_extra
719
def get_bogus_url(self):
720
"""Return a URL which will fail to connect"""
721
return 'bzr://127.0.0.1:1/'
724
class ReadonlySmartTCPServer_for_testing(SmartTCPServer_for_testing):
    """Smart TCP test server whose backing transport is readonly."""

    def get_backing_transport(self, backing_transport_server):
        """Return a transport for the decorated server's 'readonly+' url.

        :param backing_transport_server: the server being decorated; its
            get_url() result is prefixed with 'readonly+'.
        """
        backing_url = backing_transport_server.get_url()
        return transport.get_transport_from_url('readonly+' + backing_url)
733
class SmartTCPServer_for_testing_v2_only(SmartTCPServer_for_testing):
734
"""A variation of SmartTCPServer_for_testing that limits the client to
735
using RPCs in protocol v2 (i.e. bzr <= 1.5).
739
url = super(SmartTCPServer_for_testing_v2_only, self).get_url()
740
url = 'bzr-v2://' + url[len('bzr://'):]
744
class ReadonlySmartTCPServer_for_testing_v2_only(
        SmartTCPServer_for_testing_v2_only):
    """Protocol-v2-only smart test server with a readonly backing transport.
    """

    def get_backing_transport(self, backing_transport_server):
        """Return a transport for the decorated server's 'readonly+' url.

        :param backing_transport_server: the server being decorated; its
            get_url() result is prefixed with 'readonly+'.
        """
        backing_url = backing_transport_server.get_url()
        return transport.get_transport_from_url('readonly+' + backing_url)