1
# Copyright (C) 2010, 2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
29
from bzrlib.transport import (
33
from bzrlib.smart import (
40
# FIXME: There is a dependency loop between bzrlib.tests and
41
# bzrlib.tests.test_server that needs to be fixed. In the meantime
42
# defining this function is enough for our needs. -- vila 20100611
43
from bzrlib import tests
44
return 'threads' in tests.selftest_debug_flags
47
class TestServer(transport.Server):
    """Base class for the transport servers used by the tests.

    A TestServer provides a server for a given transport; these servers are
    used as loopback testing tools. For any given transport, the servers it
    provides must either allow writing, or serve the contents of
    os.getcwdu() at the time start_server is called.

    These are real servers: they must implement all the things that bzr
    transports are expected to take advantage of.
    """

    def get_bogus_url(self):
        """Return a url for this protocol that will fail to connect.

        Subclasses may raise NotImplementedError to indicate that they cannot
        provide such a url.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def get_url(self):
        """Return a url for this server.

        If the transport does not represent a disk directory (i.e. it is a
        database like svn, or a memory only transport), the url should give
        access to a newly established resource for this server. Otherwise it
        should give access to the path that was os.getcwdu() when
        start_server() was called.

        Subsequent calls return the same resource.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
81
class LocalURLServer(TestServer):
    """A pretend server for local transports, using file:// urls.

    No actual server is required to access the local filesystem, this class
    only tells the test code how to get to it.
    """

    def start_server(self):
        """Start nothing: the local filesystem needs no server."""
        # NOTE(review): assumed to be a no-op, as the class docstring
        # implies -- confirm.

    def get_url(self):
        """See Transport.Server.get_url."""
        local_url = urlutils.local_path_to_url('')
        return local_url
96
class DecoratorServer(TestServer):
    """Server for the TransportDecorator for testing with.

    To use this when subclassing TransportDecorator, override the
    get_decorator_class method.
    """

    def start_server(self, server=None):
        """See bzrlib.transport.Server.start_server.

        :param server: decorate the urls given by server. If not provided, a
            LocalURLServer is created and started.
        """
        # Remember whether the decorated server is ours, stop_server only
        # stops a server that was created here.
        self._made_server = server is None
        if not self._made_server:
            self._server = server
            return
        self._server = LocalURLServer()
        self._server.start_server()

    def stop_server(self):
        """Stop the decorated server if start_server created it."""
        if not self._made_server:
            return
        self._server.stop_server()

    def get_decorator_class(self):
        """Return the class of the decorators we should be constructing.

        :raises NotImplementedError: always, subclasses must override this.
        """
        raise NotImplementedError(self.get_decorator_class)

    def get_url_prefix(self):
        """What URL prefix does this decorator produce?"""
        decorator_class = self.get_decorator_class()
        return decorator_class._get_url_prefix()

    def get_bogus_url(self):
        """See bzrlib.transport.Server.get_bogus_url."""
        prefix = self.get_url_prefix()
        return prefix + self._server.get_bogus_url()

    def get_url(self):
        """See bzrlib.transport.Server.get_url."""
        prefix = self.get_url_prefix()
        return prefix + self._server.get_url()
138
class BrokenRenameServer(DecoratorServer):
    """Server for the BrokenRenameTransportDecorator for testing with."""

    def get_decorator_class(self):
        """Return BrokenRenameTransportDecorator, imported on demand."""
        from bzrlib.transport.brokenrename import (
            BrokenRenameTransportDecorator,
            )
        return BrokenRenameTransportDecorator
146
class FakeNFSServer(DecoratorServer):
    """Server for the FakeNFSTransportDecorator for testing with."""

    def get_decorator_class(self):
        """Return FakeNFSTransportDecorator, imported on demand."""
        from bzrlib.transport.fakenfs import FakeNFSTransportDecorator
        return FakeNFSTransportDecorator
154
class FakeVFATServer(DecoratorServer):
    """A server that suggests connections through FakeVFATTransportDecorator.
    """

    def get_decorator_class(self):
        """Return FakeVFATTransportDecorator, imported on demand."""
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        return FakeVFATTransportDecorator
165
class LogDecoratorServer(DecoratorServer):
    """Server for the TransportLogDecorator for testing with."""

    def get_decorator_class(self):
        """Return TransportLogDecorator, imported on demand."""
        from bzrlib.transport.log import TransportLogDecorator
        return TransportLogDecorator
173
class NoSmartTransportServer(DecoratorServer):
    """Server for the NoSmartTransportDecorator for testing with."""

    def get_decorator_class(self):
        """Return NoSmartTransportDecorator, imported on demand."""
        from bzrlib.transport.nosmart import NoSmartTransportDecorator
        return NoSmartTransportDecorator
181
class ReadonlyServer(DecoratorServer):
    """Server for the ReadonlyTransportDecorator for testing with."""

    def get_decorator_class(self):
        """Return ReadonlyTransportDecorator, imported on demand."""
        from bzrlib.transport.readonly import ReadonlyTransportDecorator
        return ReadonlyTransportDecorator
189
class TraceServer(DecoratorServer):
    """Server for the TransportTraceDecorator for testing with."""

    def get_decorator_class(self):
        """Return TransportTraceDecorator, imported on demand."""
        from bzrlib.transport.trace import TransportTraceDecorator
        return TransportTraceDecorator
197
class UnlistableServer(DecoratorServer):
    """Server for the UnlistableTransportDecorator for testing with."""

    def get_decorator_class(self):
        """Return UnlistableTransportDecorator, imported on demand."""
        from bzrlib.transport.unlistable import UnlistableTransportDecorator
        return UnlistableTransportDecorator
205
class TestingPathFilteringServer(pathfilter.PathFilteringServer):

    def __init__(self):
        """TestingPathFilteringServer is not usable until start_server
        is called."""

    def start_server(self, backing_server=None):
        """Setup the path filter on backing_server.

        :param backing_server: the server whose url provides the backing
            transport. When None, the backing transport is built from '.'.
        """
        if backing_server is None:
            backing_url = '.'
        else:
            backing_url = backing_server.get_url()
        self.backing_transport = transport.get_transport(backing_url)
        # Every filtered path is relocated below 'added-by-filter', so that
        # directory has to exist on the backing transport.
        filtered_root = self.backing_transport.clone('added-by-filter')
        filtered_root.ensure_base()
        self.filter_func = lambda x: 'added-by-filter/' + x
        super(TestingPathFilteringServer, self).start_server()

    def get_bogus_url(self):
        """Not provided by this server.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError
226
class TestingChrootServer(chroot.ChrootServer):

    def __init__(self):
        """TestingChrootServer is not usable until start_server is called."""
        # No backing transport is known yet, start_server provides it.
        super(TestingChrootServer, self).__init__(None)

    def start_server(self, backing_server=None):
        """Setup the Chroot on backing_server.

        :param backing_server: the server whose url provides the backing
            transport. When None, the backing transport is built from '.'.
        """
        if backing_server is None:
            backing_url = '.'
        else:
            backing_url = backing_server.get_url()
        self.backing_transport = transport.get_transport(backing_url)
        super(TestingChrootServer, self).start_server()

    def get_bogus_url(self):
        """Not provided by this server.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError
245
class ThreadWithException(threading.Thread):
246
"""A catching exception thread.
248
If an exception occurs during the thread execution, it's caught and
249
re-raised when the thread is joined().
252
def __init__(self, *args, **kwargs):
    """Create the thread.

    :keyword event: optional event used to synchronize with the calling
        thread. It is removed from ``kwargs`` before they are handed to
        Thread.__init__; a new threading.Event is created when the caller
        does not provide one.
    """
    # There are cases where the calling thread must wait, yet, if an
    # exception occurs, the event should be set so the caller is not
    # blocked. The main example is a calling thread that wants to wait for
    # the called thread to be in a given state before continuing.
    if 'event' in kwargs:
        ready_event = kwargs.pop('event')
    else:
        # The caller didn't pass a specific event, create our own
        ready_event = threading.Event()
    super(ThreadWithException, self).__init__(*args, **kwargs)
    self.set_ready_event(ready_event)
    self.exception = None
    self.ignored_exceptions = None # see set_ignored_exceptions
267
# compatibility thunk for python-2.4 and python-2.5...
268
if sys.version_info < (2, 6):
269
name = property(threading.Thread.getName, threading.Thread.setName)
271
def set_ready_event(self, event):
272
"""Set the ``ready`` event used to synchronize exception catching.
274
When the thread uses an event to synchronize itself with another thread
275
(setting it when the other thread can wake up from a ``wait`` call),
276
the event must be set after catching an exception or the other thread
279
Some threads require multiple events and should set the relevant one
284
def set_ignored_exceptions(self, ignored):
    """Declare which exceptions will be ignored.

    :param ignored: Can be either:
       - None: all exceptions will be raised,
       - an exception class: the instances of this class will be ignored,
       - a tuple of exception classes: the instances of any class of the
         tuple will be ignored,
       - a callable: that will be passed the exception object
         and should return True if the exception should be ignored

    ``self.ignored_exceptions`` ends up being either None or a callable
    taking the exception object.
    """
    # Only needed here, so imported locally.
    import inspect
    if ignored is None:
        self.ignored_exceptions = None
    elif isinstance(ignored, tuple) or inspect.isclass(ignored):
        # An exception *class* is not an *instance* of Exception, so the
        # class itself has to be recognized here. Otherwise it would be
        # used as the predicate: calling it builds an exception instance,
        # which is true, and every exception would be ignored.
        self.ignored_exceptions = lambda e: isinstance(e, ignored)
    else:
        self.ignored_exceptions = ignored
303
"""Overrides Thread.run to capture any exception."""
307
super(ThreadWithException, self).run()
309
self.exception = sys.exc_info()
311
# Make sure the calling thread is released
315
def join(self, timeout=5):
316
"""Overrides Thread.join to raise any exception caught.
319
Calling join(timeout=0) will raise the caught exception or return None
320
if the thread is still alive.
322
The default timeout is set to 5 and should expire only when a thread
323
serving a client connection is hung.
325
super(ThreadWithException, self).join(timeout)
326
if self.exception is not None:
327
exc_class, exc_value, exc_tb = self.exception
328
self.exception = None # The exception should be raised only once
329
if (self.ignored_exceptions is None
330
or not self.ignored_exceptions(exc_value)):
331
# Raise non ignored exceptions
332
raise exc_class, exc_value, exc_tb
333
if timeout and self.isAlive():
334
# The timeout expired without joining the thread, the thread is
335
# therefore stucked and that's a failure as far as the test is
336
# concerned. We used to hang here.
338
# FIXME: we need to kill the thread, but as far as the test is
339
# concerned, raising an assertion is too strong. On most of the
340
# platforms, this doesn't occur, so just mentioning the problem is
341
# enough for now -- vila 2010824
342
sys.stderr.write('thread %s hung\n' % (self.name,))
343
#raise AssertionError('thread %s hung' % (self.name,))
345
def pending_exception(self):
346
"""Raise the caught exception.
348
This does nothing if no exception occurred.
353
class TestingTCPServerMixin:
354
"""Mixin to support running SocketServer.TCPServer in a thread.
356
Tests are connecting from the main thread, the server has to be run in a
361
self.started = threading.Event()
363
self.stopped = threading.Event()
364
# We collect the resources used by the clients so we can release them
367
self.ignored_exceptions = None
369
def server_bind(self):
    """Bind the socket and record the address it actually got.

    ``self.server_address`` is replaced by what the bound socket reports.
    """
    listening_socket = self.socket
    listening_socket.bind(self.server_address)
    self.server_address = listening_socket.getsockname()
376
# We are listening and ready to accept connections
380
# Really a connection but the python framework is generic and
382
self.handle_request()
383
# Let's close the listening socket
388
def handle_request(self):
389
"""Handle one request.
391
The python version swallows some socket exceptions and we don't use
392
timeout, so we override it to better control the server behavior.
394
request, client_address = self.get_request()
395
if self.verify_request(request, client_address):
397
self.process_request(request, client_address)
399
self.handle_error(request, client_address)
400
self.close_request(request)
402
def get_request(self):
    """Accept a connection on the server socket.

    :return: whatever ``self.socket.accept()`` returns.
    """
    accepted = self.socket.accept()
    return accepted
405
def verify_request(self, request, client_address):
406
"""Verify the request.
408
Return True if we should proceed with this request, False if we should
409
not even touch a single byte in the socket ! This is useful when we
410
stop the server with a dummy last connection.
414
def handle_error(self, request, client_address):
415
# Stop serving and re-raise the last exception seen
417
# The following can be used for debugging purposes, it will display the
418
# exception and the traceback just when it occurs instead of waiting
419
# for the thread to be joined.
421
# SocketServer.BaseServer.handle_error(self, request, client_address)
424
def ignored_exceptions_during_shutdown(self, e):
425
if sys.platform == 'win32':
426
accepted_errnos = [errno.EBADF,
434
accepted_errnos = [errno.EBADF,
439
if isinstance(e, socket.error) and e[0] in accepted_errnos:
443
# The following methods are called by the main thread
445
def stop_client_connections(self):
447
c = self.clients.pop()
448
self.shutdown_client(c)
450
def shutdown_socket(self, sock):
451
"""Properly shutdown a socket.
453
This should be called only when no other thread is trying to use the
457
sock.shutdown(socket.SHUT_RDWR)
460
if self.ignored_exceptions(e):
465
# The following methods are called by the main thread
467
def set_ignored_exceptions(self, thread, ignored_exceptions):
    """Record the exception handler and install it on ``thread``.

    :param thread: the thread receiving the handler through its own
        set_ignored_exceptions method.
    :param ignored_exceptions: the handler, also kept in
        ``self.ignored_exceptions``.
    """
    self.ignored_exceptions = ignored_exceptions
    handler = self.ignored_exceptions
    thread.set_ignored_exceptions(handler)
471
def _pending_exception(self, thread):
472
"""Raise server uncaught exception.
474
Daughter classes can override this if they use daughter threads.
476
thread.pending_exception()
479
class TestingTCPServer(TestingTCPServerMixin, SocketServer.TCPServer):
481
def __init__(self, server_address, request_handler_class):
    """Initialize the mixin, then the SocketServer.TCPServer part.

    :param server_address: passed to SocketServer.TCPServer.
    :param request_handler_class: passed to SocketServer.TCPServer.
    """
    TestingTCPServerMixin.__init__(self)
    tcp_server_init = SocketServer.TCPServer.__init__
    tcp_server_init(self, server_address, request_handler_class)
486
def get_request(self):
487
"""Get the request and client address from the socket."""
488
sock, addr = TestingTCPServerMixin.get_request(self)
489
self.clients.append((sock, addr))
492
# The following methods are called by the main thread
494
def shutdown_client(self, client):
496
self.shutdown_socket(sock)
499
class TestingThreadingTCPServer(TestingTCPServerMixin,
500
SocketServer.ThreadingTCPServer):
502
def __init__(self, server_address, request_handler_class):
    """Initialize the mixin, then the SocketServer.ThreadingTCPServer part.

    :param server_address: passed to SocketServer.ThreadingTCPServer.
    :param request_handler_class: passed to
        SocketServer.ThreadingTCPServer.
    """
    TestingTCPServerMixin.__init__(self)
    threading_server_init = SocketServer.ThreadingTCPServer.__init__
    threading_server_init(self, server_address, request_handler_class)
507
def get_request (self):
508
"""Get the request and client address from the socket."""
509
sock, addr = TestingTCPServerMixin.get_request(self)
510
# The thread is not created yet, it will be updated in process_request
511
self.clients.append((sock, addr, None))
514
def process_request_thread(self, started, stopped, request, client_address):
516
SocketServer.ThreadingTCPServer.process_request_thread(
517
self, request, client_address)
518
self.close_request(request)
521
def process_request(self, request, client_address):
522
"""Start a new thread to process the request."""
523
started = threading.Event()
524
stopped = threading.Event()
525
t = ThreadWithException(
527
name='%s -> %s' % (client_address, self.server_address),
528
target = self.process_request_thread,
529
args = (started, stopped, request, client_address))
530
# Update the client description
532
self.clients.append((request, client_address, t))
533
# Propagate the exception handler since we must use the same one as
534
# TestingTCPServer for connections running in their own threads.
535
t.set_ignored_exceptions(self.ignored_exceptions)
539
sys.stderr.write('Client thread %s started\n' % (t.name,))
540
# If an exception occurred during the thread start, it will get raised.
541
t.pending_exception()
543
# The following methods are called by the main thread
545
def shutdown_client(self, client):
546
sock, addr, connection_thread = client
547
self.shutdown_socket(sock)
548
if connection_thread is not None:
549
# The thread has been created only if the request is processed but
550
# after the connection is inited. This could happen during server
551
# shutdown. If an exception occurred in the thread it will be
554
sys.stderr.write('Client thread %s will be joined\n'
555
% (connection_thread.name,))
556
connection_thread.join()
558
def set_ignored_exceptions(self, thread, ignored_exceptions):
    """Install the exception handler on the server and client threads.

    :param thread: the server thread, handled by the mixin.
    :param ignored_exceptions: the handler to install.
    """
    TestingTCPServerMixin.set_ignored_exceptions(
        self, thread, ignored_exceptions)
    for client in self.clients:
        sock, addr, connection_thread = client
        if connection_thread is None:
            # No thread exists for this connection
            continue
        connection_thread.set_ignored_exceptions(self.ignored_exceptions)
566
def _pending_exception(self, thread):
    """Raise the exception pending in a client thread or in ``thread``.

    The client connection threads are checked first, then the mixin
    implementation is called for ``thread``.
    """
    for client in self.clients:
        sock, addr, connection_thread = client
        if connection_thread is None:
            continue
        connection_thread.pending_exception()
    TestingTCPServerMixin._pending_exception(self, thread)
573
class TestingTCPServerInAThread(transport.Server):
574
"""A server in a thread that re-raise thread exceptions."""
576
def __init__(self, server_address, server_class, request_handler_class):
577
self.server_class = server_class
578
self.request_handler_class = request_handler_class
579
self.host, self.port = server_address
581
self._server_thread = None
584
return "%s(%s:%s)" % (self.__class__.__name__, self.host, self.port)
586
def create_server(self):
    """Build the server from ``server_class``.

    :return: ``self.server_class`` called with the ``(host, port)`` address
        and the request handler class.
    """
    address = (self.host, self.port)
    return self.server_class(address, self.request_handler_class)
590
def start_server(self):
591
self.server = self.create_server()
592
self._server_thread = ThreadWithException(
593
event=self.server.started,
594
target=self.run_server)
595
self._server_thread.start()
596
# Wait for the server thread to start (i.e. release the lock)
597
self.server.started.wait()
598
# Get the real address, especially the port
599
self.host, self.port = self.server.server_address
600
self._server_thread.name = self.server.server_address
602
sys.stderr.write('Server thread %s started\n'
603
% (self._server_thread.name,))
604
# If an exception occurred during the server start, it will get raised,
605
# otherwise, the server is blocked on its accept() call.
606
self._server_thread.pending_exception()
607
# From now on, we'll use a different event to ensure the server can set
609
self._server_thread.set_ready_event(self.server.stopped)
611
def run_server(self):
614
def stop_server(self):
615
if self.server is None:
618
# The server has been started successfully, shut it down now. As
619
# soon as we stop serving, no more connection are accepted except
620
# one to get out of the blocking listen.
621
self.set_ignored_exceptions(
622
self.server.ignored_exceptions_during_shutdown)
623
self.server.serving = False
625
sys.stderr.write('Server thread %s will be joined\n'
626
% (self._server_thread.name,))
627
# The server is listening for a last connection, let's give it:
630
last_conn = osutils.connect_socket((self.host, self.port))
631
except socket.error, e:
632
# But ignore connection errors as the point is to unblock the
633
# server thread, it may happen that it's not blocked or even
636
# We start shutting down the clients while the server itself is
638
self.server.stop_client_connections()
639
# Now we wait for the thread running self.server.serve() to finish
640
self.server.stopped.wait()
641
if last_conn is not None:
642
# Close the last connection without trying to use it. The
643
# server will not process a single byte on that socket to avoid
644
# complications (SSL starts with a handshake for example).
646
# Check for any exception that could have occurred in the server
649
self._server_thread.join()
651
if self.server.ignored_exceptions(e):
656
# Make sure we can be called twice safely, note that this means
657
# that we will raise a single exception even if several occurred in
658
# the various threads involved.
661
def set_ignored_exceptions(self, ignored_exceptions):
    """Install an exception handler for the server.

    :param ignored_exceptions: handed to the server together with the
        server thread.
    """
    server_thread = self._server_thread
    self.server.set_ignored_exceptions(server_thread, ignored_exceptions)
666
def pending_exception(self):
    """Raise uncaught exception in the server.

    This delegates to the server's ``_pending_exception`` for the server
    thread.
    """
    server_thread = self._server_thread
    self.server._pending_exception(server_thread)
671
class TestingSmartConnectionHandler(SocketServer.BaseRequestHandler,
672
medium.SmartServerSocketStreamMedium):
674
def __init__(self, request, client_address, server):
    """Set up the smart medium, then the request handler.

    :param request: the client socket; TCP_NODELAY is set on it.
    :param client_address: passed to SocketServer.BaseRequestHandler.
    :param server: the server owning the connection; it provides
        ``backing_transport`` and ``root_client_path``.
    """
    backing_transport = server.backing_transport
    root_client_path = server.root_client_path
    medium.SmartServerSocketStreamMedium.__init__(
        self, request, backing_transport, root_client_path)
    request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    SocketServer.BaseRequestHandler.__init__(
        self, request, client_address, server)
683
while not self.finished:
684
server_protocol = self._build_protocol()
685
self._serve_one_request(server_protocol)
688
class TestingSmartServer(TestingThreadingTCPServer, server.SmartTCPServer):
690
def __init__(self, server_address, request_handler_class,
691
backing_transport, root_client_path):
692
TestingThreadingTCPServer.__init__(self, server_address,
693
request_handler_class)
694
server.SmartTCPServer.__init__(self, backing_transport,
697
# FIXME: No tests are exercising the hooks for the test server
699
self.run_server_started_hooks()
701
TestingThreadingTCPServer.serve(self)
703
self.run_server_stopped_hooks()
706
"""Return the url of the server"""
707
return "bzr://%s:%d/" % self.server_address
710
class SmartTCPServer_for_testing(TestingTCPServerInAThread):
711
"""Server suitable for use by transport tests.
713
This server is backed by the process's cwd.
715
def __init__(self, thread_name_suffix=''):
716
self.client_path_extra = None
717
self.thread_name_suffix = thread_name_suffix
718
self.host = '127.0.0.1'
720
super(SmartTCPServer_for_testing, self).__init__(
721
(self.host, self.port),
723
TestingSmartConnectionHandler)
725
def create_server(self):
    """Build the server from ``server_class``.

    :return: ``self.server_class`` called with the ``(host, port)`` address,
        the request handler class, the backing transport and the root
        client path.
    """
    address = (self.host, self.port)
    return self.server_class(
        address, self.request_handler_class,
        self.backing_transport, self.root_client_path)
732
def start_server(self, backing_transport_server=None,
                 client_path_extra='/extra/'):
    """Set up server for testing.

    :param backing_transport_server: backing server to use. If not
        specified, a LocalURLServer is created.
    :param client_path_extra: a path segment starting with '/' to append to
        the root URL for this server. For instance, a value of '/foo/bar/'
        will mean the root of the backing transport will be published at a
        URL like `bzr://127.0.0.1:nnnn/foo/bar/`, rather than
        `bzr://127.0.0.1:nnnn/`. Default value is `/extra/`, so that tests
        by default will fail unless they do the necessary path translation.
    :raises ValueError: if client_path_extra does not start with '/'.
    """
    if not client_path_extra.startswith('/'):
        raise ValueError(client_path_extra)
    self.root_client_path = client_path_extra
    self.client_path_extra = client_path_extra
    from bzrlib.transport.chroot import ChrootServer
    if backing_transport_server is None:
        backing_transport_server = LocalURLServer()
    # The smart server is backed by a chroot of the backing transport
    chrooted = self.get_backing_transport(backing_transport_server)
    self.chroot_server = ChrootServer(chrooted)
    self.chroot_server.start_server()
    chroot_url = self.chroot_server.get_url()
    self.backing_transport = transport.get_transport(chroot_url)
    super(SmartTCPServer_for_testing, self).start_server()
759
def stop_server(self):
761
super(SmartTCPServer_for_testing, self).stop_server()
763
self.chroot_server.stop_server()
765
def get_backing_transport(self, backing_transport_server):
    """Get a backing transport from a server we are decorating.

    :param backing_transport_server: the server whose url is used.
    :return: the transport obtained for that url.
    """
    backing_url = backing_transport_server.get_url()
    return transport.get_transport(backing_url)
770
url = self.server.get_url()
771
return url[:-1] + self.client_path_extra
773
def get_bogus_url(self):
    """Return a URL which will fail to connect"""
    bogus_url = 'bzr://127.0.0.1:1/'
    return bogus_url
778
class ReadonlySmartTCPServer_for_testing(SmartTCPServer_for_testing):
    """Get a readonly server for testing."""

    def get_backing_transport(self, backing_transport_server):
        """Get a readonly backing transport from a server we are decorating.

        The url of the backing server is prefixed with 'readonly+'.
        """
        backing_url = backing_transport_server.get_url()
        return transport.get_transport('readonly+' + backing_url)
787
class SmartTCPServer_for_testing_v2_only(SmartTCPServer_for_testing):
788
"""A variation of SmartTCPServer_for_testing that limits the client to
789
using RPCs in protocol v2 (i.e. bzr <= 1.5).
793
url = super(SmartTCPServer_for_testing_v2_only, self).get_url()
794
url = 'bzr-v2://' + url[len('bzr://'):]
798
class ReadonlySmartTCPServer_for_testing_v2_only(
    SmartTCPServer_for_testing_v2_only):
    """Get a readonly server for testing."""

    def get_backing_transport(self, backing_transport_server):
        """Get a readonly backing transport from a server we are decorating.

        The url of the backing server is prefixed with 'readonly+'.
        """
        backing_url = backing_transport_server.get_url()
        return transport.get_transport('readonly+' + backing_url)