213
234
def start_server(self, backing_server=None):
214
235
"""Setup the Chroot on backing_server."""
215
236
if backing_server is not None:
216
self.backing_transport = transport.get_transport(
237
self.backing_transport = transport.get_transport_from_url(
217
238
backing_server.get_url())
219
self.backing_transport = transport.get_transport('.')
240
self.backing_transport = transport.get_transport_from_path('.')
220
241
super(TestingChrootServer, self).start_server()
222
243
def get_bogus_url(self):
    """Refuse to provide a bogus url.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError
226
class SmartTCPServer_for_testing(server.SmartTCPServer):
247
class TestThread(cethread.CatchingExceptionThread):
    """Thread whose ``join`` has a default timeout and reports hung threads."""

    def join(self, timeout=5):
        """Overrides to use a default timeout.

        The default timeout is set to 5 and should expire only when a thread
        serving a client connection is hung.

        :param timeout: forwarded to the base class ``join``. When it is
            falsy, no check for a hung thread is made afterwards.
        """
        super(TestThread, self).join(timeout)
        if timeout and self.is_alive():
            # The timeout expired without joining the thread, the thread is
            # therefore stuck and that's a failure as far as the test is
            # concerned. We used to hang here.

            # FIXME: we need to kill the thread, but as far as the test is
            # concerned, raising an assertion is too strong. On most of the
            # platforms, this doesn't occur, so just mentioning the problem is
            # enough for now -- vila 2010824
            sys.stderr.write('thread %s hung\n' % (self.name,))
            # raise AssertionError('thread %s hung' % (self.name,))
269
class TestingTCPServerMixin(object):
270
"""Mixin to support running socketserver.TCPServer in a thread.
272
Tests are connecting from the main thread, the server has to be run in a
277
self.started = threading.Event()
279
self.stopped = threading.Event()
280
# We collect the resources used by the clients so we can release them
283
self.ignored_exceptions = None
285
def server_bind(self):
    """Bind the listening socket and record the address it reports.

    ``self.server_address`` is used for the bind and then overwritten with
    the value returned by ``self.socket.getsockname()``.
    """
    self.socket.bind(self.server_address)
    # Store what the socket itself reports so that readers of
    # server_address see the address after the bind, not the requested one.
    self.server_address = self.socket.getsockname()
291
# We are listening and ready to accept connections
295
# Really a connection but the python framework is generic and
297
self.handle_request()
298
# Let's close the listening socket
303
def handle_request(self):
304
"""Handle one request.
306
The python version swallows some socket exceptions and we don't use
307
timeout, so we override it to better control the server behavior.
309
request, client_address = self.get_request()
310
if self.verify_request(request, client_address):
312
self.process_request(request, client_address)
313
except BaseException:
314
self.handle_error(request, client_address)
316
self.close_request(request)
318
def get_request(self):
    """Accept one connection on the listening socket.

    :return: the result of ``self.socket.accept()``, unchanged.
    """
    return self.socket.accept()
321
def verify_request(self, request, client_address):
322
"""Verify the request.
324
Return True if we should proceed with this request, False if we should
325
not even touch a single byte in the socket ! This is useful when we
326
stop the server with a dummy last connection.
330
def handle_error(self, request, client_address):
331
# Stop serving and re-raise the last exception seen
333
# The following can be used for debugging purposes, it will display the
334
# exception and the traceback just when it occurs instead of waiting
335
# for the thread to be joined.
336
# socketserver.BaseServer.handle_error(self, request, client_address)
338
# We call close_request manually, because we are going to raise an
339
# exception. The socketserver implementation calls:
342
# But because we raise the exception, close_request will never be
343
# triggered. This helps client not block waiting for a response when
344
# the server gets an exception.
345
self.close_request(request)
348
def ignored_exceptions_during_shutdown(self, e):
349
if sys.platform == 'win32':
350
accepted_errnos = [errno.EBADF,
358
accepted_errnos = [errno.EBADF,
363
if isinstance(e, socket.error) and e.errno in accepted_errnos:
367
# The following methods are called by the main thread
369
def stop_client_connections(self):
371
c = self.clients.pop()
372
self.shutdown_client(c)
374
def shutdown_socket(self, sock):
375
"""Properly shutdown a socket.
377
This should be called only when no other thread is trying to use the
381
sock.shutdown(socket.SHUT_RDWR)
383
except Exception as e:
384
if self.ignored_exceptions(e):
389
# The following methods are called by the main thread
391
def set_ignored_exceptions(self, thread, ignored_exceptions):
    """Remember ``ignored_exceptions`` and install it on ``thread``.

    :param thread: object exposing ``set_ignored_exceptions``.
    :param ignored_exceptions: value stored on ``self.ignored_exceptions``
        and handed to ``thread``.
    """
    self.ignored_exceptions = ignored_exceptions
    thread.set_ignored_exceptions(self.ignored_exceptions)
395
def _pending_exception(self, thread):
396
"""Raise server uncaught exception.
398
Daughter classes can override this if they use daughter threads.
400
thread.pending_exception()
403
class TestingTCPServer(TestingTCPServerMixin, socketserver.TCPServer):
405
def __init__(self, server_address, request_handler_class):
    """Initialise the mixin first, then ``socketserver.TCPServer``.

    :param server_address: forwarded to ``socketserver.TCPServer``.
    :param request_handler_class: forwarded to ``socketserver.TCPServer``.
    """
    TestingTCPServerMixin.__init__(self)
    socketserver.TCPServer.__init__(self, server_address,
                                    request_handler_class)
410
def get_request(self):
411
"""Get the request and client address from the socket."""
412
sock, addr = TestingTCPServerMixin.get_request(self)
413
self.clients.append((sock, addr))
416
# The following methods are called by the main thread
418
def shutdown_client(self, client):
420
self.shutdown_socket(sock)
423
class TestingThreadingTCPServer(TestingTCPServerMixin,
424
socketserver.ThreadingTCPServer):
426
def __init__(self, server_address, request_handler_class):
    """Initialise the mixin first, then ``socketserver.ThreadingTCPServer``.

    :param server_address: forwarded to ``socketserver.ThreadingTCPServer``.
    :param request_handler_class: forwarded to
        ``socketserver.ThreadingTCPServer``.
    """
    TestingTCPServerMixin.__init__(self)
    socketserver.ThreadingTCPServer.__init__(self, server_address,
                                             request_handler_class)
431
def get_request(self):
432
"""Get the request and client address from the socket."""
433
sock, addr = TestingTCPServerMixin.get_request(self)
434
# The thread is not created yet, it will be updated in process_request
435
self.clients.append((sock, addr, None))
438
def process_request_thread(self, started, detached, stopped,
439
request, client_address):
441
# We will be on our own once the server tells us we're detached
443
socketserver.ThreadingTCPServer.process_request_thread(
444
self, request, client_address)
445
self.close_request(request)
448
def process_request(self, request, client_address):
449
"""Start a new thread to process the request."""
450
started = threading.Event()
451
detached = threading.Event()
452
stopped = threading.Event()
455
name='%s -> %s' % (client_address, self.server_address),
456
target=self.process_request_thread,
457
args=(started, detached, stopped, request, client_address))
458
# Update the client description
460
self.clients.append((request, client_address, t))
461
# Propagate the exception handler since we must use the same one as
462
# TestingTCPServer for connections running in their own threads.
463
t.set_ignored_exceptions(self.ignored_exceptions)
466
# If an exception occurred during the thread start, it will get raised.
467
t.pending_exception()
469
sys.stderr.write('Client thread %s started\n' % (t.name,))
470
# Tell the thread, it's now on its own for exception handling.
473
# The following methods are called by the main thread
475
def shutdown_client(self, client):
476
sock, addr, connection_thread = client
477
self.shutdown_socket(sock)
478
if connection_thread is not None:
479
# The thread has been created only if the request is processed but
480
# after the connection is inited. This could happen during server
481
# shutdown. If an exception occurred in the thread it will be
484
sys.stderr.write('Client thread %s will be joined\n'
485
% (connection_thread.name,))
486
connection_thread.join()
488
def set_ignored_exceptions(self, thread, ignored_exceptions):
489
TestingTCPServerMixin.set_ignored_exceptions(self, thread,
491
for sock, addr, connection_thread in self.clients:
492
if connection_thread is not None:
493
connection_thread.set_ignored_exceptions(
494
self.ignored_exceptions)
496
def _pending_exception(self, thread):
    """Raise pending exceptions from client threads, then from ``thread``.

    Each entry of ``self.clients`` is unpacked as
    ``(sock, addr, connection_thread)``; entries whose thread is ``None``
    are skipped.

    :param thread: passed on to ``TestingTCPServerMixin._pending_exception``.
    """
    for sock, addr, connection_thread in self.clients:
        if connection_thread is not None:
            connection_thread.pending_exception()
    TestingTCPServerMixin._pending_exception(self, thread)
503
class TestingTCPServerInAThread(transport.Server):
504
"""A server in a thread that re-raise thread exceptions."""
506
def __init__(self, server_address, server_class, request_handler_class):
507
self.server_class = server_class
508
self.request_handler_class = request_handler_class
509
self.host, self.port = server_address
511
self._server_thread = None
514
return "%s(%s:%s)" % (self.__class__.__name__, self.host, self.port)
516
def create_server(self):
    """Instantiate ``self.server_class`` for the configured address.

    :return: ``self.server_class((self.host, self.port),
        self.request_handler_class)``.
    """
    return self.server_class((self.host, self.port),
                             self.request_handler_class)
520
def start_server(self):
521
self.server = self.create_server()
522
self._server_thread = TestThread(
523
sync_event=self.server.started,
524
target=self.run_server)
525
self._server_thread.start()
526
# Wait for the server thread to start (i.e. release the lock)
527
self.server.started.wait()
528
# Get the real address, especially the port
529
self.host, self.port = self.server.server_address
530
self._server_thread.name = self.server.server_address
532
sys.stderr.write('Server thread %s started\n'
533
% (self._server_thread.name,))
534
# If an exception occurred during the server start, it will get raised,
535
# otherwise, the server is blocked on its accept() call.
536
self._server_thread.pending_exception()
537
# From now on, we'll use a different event to ensure the server can set
539
self._server_thread.set_sync_event(self.server.stopped)
541
def run_server(self):
544
def stop_server(self):
545
if self.server is None:
548
# The server has been started successfully, shut it down now. As
549
# soon as we stop serving, no more connection are accepted except
550
# one to get out of the blocking listen.
551
self.set_ignored_exceptions(
552
self.server.ignored_exceptions_during_shutdown)
553
self.server.serving = False
555
sys.stderr.write('Server thread %s will be joined\n'
556
% (self._server_thread.name,))
557
# The server is listening for a last connection, let's give it:
560
last_conn = osutils.connect_socket((self.host, self.port))
562
# But ignore connection errors as the point is to unblock the
563
# server thread, it may happen that it's not blocked or even
566
# We start shutting down the clients while the server itself is
568
self.server.stop_client_connections()
569
# Now we wait for the thread running self.server.serve() to finish
570
self.server.stopped.wait()
571
if last_conn is not None:
572
# Close the last connection without trying to use it. The
573
# server will not process a single byte on that socket to avoid
574
# complications (SSL starts with a handshake for example).
576
# Check for any exception that could have occurred in the server
579
self._server_thread.join()
580
except Exception as e:
581
if self.server.ignored_exceptions(e):
586
# Make sure we can be called twice safely, note that this means
587
# that we will raise a single exception even if several occurred in
588
# the various threads involved.
591
def set_ignored_exceptions(self, ignored_exceptions):
592
"""Install an exception handler for the server."""
593
self.server.set_ignored_exceptions(self._server_thread,
596
def pending_exception(self):
    """Raise uncaught exception in the server.

    Delegates to ``self.server._pending_exception`` with the server thread.
    """
    self.server._pending_exception(self._server_thread)
601
class TestingSmartConnectionHandler(socketserver.BaseRequestHandler,
602
medium.SmartServerSocketStreamMedium):
604
def __init__(self, request, client_address, server):
605
medium.SmartServerSocketStreamMedium.__init__(
606
self, request, server.backing_transport,
607
server.root_client_path,
608
timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
609
request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
610
socketserver.BaseRequestHandler.__init__(self, request, client_address,
615
while not self.finished:
616
server_protocol = self._build_protocol()
617
self._serve_one_request(server_protocol)
618
except errors.ConnectionTimeout:
619
# idle connections aren't considered a failure of the server
623
_DEFAULT_TESTING_CLIENT_TIMEOUT = 60.0
626
class TestingSmartServer(TestingThreadingTCPServer, server.SmartTCPServer):
628
def __init__(self, server_address, request_handler_class,
629
backing_transport, root_client_path):
630
TestingThreadingTCPServer.__init__(self, server_address,
631
request_handler_class)
632
server.SmartTCPServer.__init__(
633
self, backing_transport,
634
root_client_path, client_timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
637
self.run_server_started_hooks()
639
TestingThreadingTCPServer.serve(self)
641
self.run_server_stopped_hooks()
644
"""Return the url of the server"""
645
return "bzr://%s:%d/" % self.server_address
648
class SmartTCPServer_for_testing(TestingTCPServerInAThread):
227
649
"""Server suitable for use by transport tests.
229
651
This server is backed by the process's cwd.
232
654
def __init__(self, thread_name_suffix=''):
233
super(SmartTCPServer_for_testing, self).__init__(None)
234
655
self.client_path_extra = None
235
656
self.thread_name_suffix = thread_name_suffix
657
self.host = '127.0.0.1'
659
super(SmartTCPServer_for_testing, self).__init__(
660
(self.host, self.port),
662
TestingSmartConnectionHandler)
237
def get_backing_transport(self, backing_transport_server):
    """Get a backing transport from a server we are decorating.

    :param backing_transport_server: object whose ``get_url()`` result is
        handed to ``transport.get_transport``.
    :return: the result of ``transport.get_transport``.
    """
    return transport.get_transport(backing_transport_server.get_url())
664
def create_server(self):
    """Instantiate ``self.server_class`` with the backing transport.

    :return: ``self.server_class`` called with the ``(host, port)`` pair,
        the request handler class, ``self.backing_transport`` and
        ``self.root_client_path``, in that order.
    """
    return self.server_class((self.host, self.port),
                             self.request_handler_class,
                             self.backing_transport,
                             self.root_client_path)
241
670
def start_server(self, backing_transport_server=None,
242
client_path_extra='/extra/'):
671
client_path_extra='/extra/'):
243
672
"""Set up server for testing.
245
674
:param backing_transport_server: backing server to use. If not