213
237
def start_server(self, backing_server=None):
214
238
"""Setup the Chroot on backing_server."""
215
239
if backing_server is not None:
216
self.backing_transport = transport.get_transport(
240
self.backing_transport = transport.get_transport_from_url(
217
241
backing_server.get_url())
219
self.backing_transport = transport.get_transport('.')
243
self.backing_transport = transport.get_transport_from_path('.')
220
244
super(TestingChrootServer, self).start_server()
222
246
def get_bogus_url(self):
223
247
raise NotImplementedError
226
class SmartTCPServer_for_testing(server.SmartTCPServer):
250
class TestThread(cethread.CatchingExceptionThread):
252
def join(self, timeout=5):
253
"""Overrides to use a default timeout.
255
The default timeout is set to 5 and should expire only when a thread
256
serving a client connection is hung.
258
super(TestThread, self).join(timeout)
259
if timeout and self.isAlive():
260
# The timeout expired without joining the thread, the thread is
261
# therefore stucked and that's a failure as far as the test is
262
# concerned. We used to hang here.
264
# FIXME: we need to kill the thread, but as far as the test is
265
# concerned, raising an assertion is too strong. On most of the
266
# platforms, this doesn't occur, so just mentioning the problem is
267
# enough for now -- vila 2010824
268
sys.stderr.write('thread %s hung\n' % (self.name,))
269
# raise AssertionError('thread %s hung' % (self.name,))
272
class TestingTCPServerMixin(object):
273
"""Mixin to support running socketserver.TCPServer in a thread.
275
Tests are connecting from the main thread, the server has to be run in a
280
self.started = threading.Event()
282
self.stopped = threading.Event()
283
# We collect the resources used by the clients so we can release them
286
self.ignored_exceptions = None
288
def server_bind(self):
289
self.socket.bind(self.server_address)
290
self.server_address = self.socket.getsockname()
294
# We are listening and ready to accept connections
298
# Really a connection but the python framework is generic and
300
self.handle_request()
301
# Let's close the listening socket
306
def handle_request(self):
307
"""Handle one request.
309
The python version swallows some socket exceptions and we don't use
310
timeout, so we override it to better control the server behavior.
312
request, client_address = self.get_request()
313
if self.verify_request(request, client_address):
315
self.process_request(request, client_address)
316
except BaseException:
317
self.handle_error(request, client_address)
319
self.close_request(request)
321
def get_request(self):
322
return self.socket.accept()
324
def verify_request(self, request, client_address):
325
"""Verify the request.
327
Return True if we should proceed with this request, False if we should
328
not even touch a single byte in the socket ! This is useful when we
329
stop the server with a dummy last connection.
333
def handle_error(self, request, client_address):
334
# Stop serving and re-raise the last exception seen
336
# The following can be used for debugging purposes, it will display the
337
# exception and the traceback just when it occurs instead of waiting
338
# for the thread to be joined.
339
# socketserver.BaseServer.handle_error(self, request, client_address)
341
# We call close_request manually, because we are going to raise an
342
# exception. The socketserver implementation calls:
345
# But because we raise the exception, close_request will never be
346
# triggered. This helps client not block waiting for a response when
347
# the server gets an exception.
348
self.close_request(request)
351
def ignored_exceptions_during_shutdown(self, e):
352
if sys.platform == 'win32':
353
accepted_errnos = [errno.EBADF,
361
accepted_errnos = [errno.EBADF,
366
if isinstance(e, socket.error) and e.errno in accepted_errnos:
370
# The following methods are called by the main thread
372
def stop_client_connections(self):
374
c = self.clients.pop()
375
self.shutdown_client(c)
377
def shutdown_socket(self, sock):
378
"""Properly shutdown a socket.
380
This should be called only when no other thread is trying to use the
384
sock.shutdown(socket.SHUT_RDWR)
386
except Exception as e:
387
if self.ignored_exceptions(e):
392
# The following methods are called by the main thread
394
def set_ignored_exceptions(self, thread, ignored_exceptions):
395
self.ignored_exceptions = ignored_exceptions
396
thread.set_ignored_exceptions(self.ignored_exceptions)
398
def _pending_exception(self, thread):
399
"""Raise server uncaught exception.
401
Daughter classes can override this if they use daughter threads.
403
thread.pending_exception()
406
class TestingTCPServer(TestingTCPServerMixin, socketserver.TCPServer):
408
def __init__(self, server_address, request_handler_class):
409
TestingTCPServerMixin.__init__(self)
410
socketserver.TCPServer.__init__(self, server_address,
411
request_handler_class)
413
def get_request(self):
414
"""Get the request and client address from the socket."""
415
sock, addr = TestingTCPServerMixin.get_request(self)
416
self.clients.append((sock, addr))
419
# The following methods are called by the main thread
421
def shutdown_client(self, client):
423
self.shutdown_socket(sock)
426
class TestingThreadingTCPServer(TestingTCPServerMixin,
427
socketserver.ThreadingTCPServer):
429
def __init__(self, server_address, request_handler_class):
430
TestingTCPServerMixin.__init__(self)
431
socketserver.ThreadingTCPServer.__init__(self, server_address,
432
request_handler_class)
434
def get_request(self):
435
"""Get the request and client address from the socket."""
436
sock, addr = TestingTCPServerMixin.get_request(self)
437
# The thread is not created yet, it will be updated in process_request
438
self.clients.append((sock, addr, None))
441
def process_request_thread(self, started, detached, stopped,
442
request, client_address):
444
# We will be on our own once the server tells us we're detached
446
socketserver.ThreadingTCPServer.process_request_thread(
447
self, request, client_address)
448
self.close_request(request)
451
def process_request(self, request, client_address):
452
"""Start a new thread to process the request."""
453
started = threading.Event()
454
detached = threading.Event()
455
stopped = threading.Event()
458
name='%s -> %s' % (client_address, self.server_address),
459
target=self.process_request_thread,
460
args=(started, detached, stopped, request, client_address))
461
# Update the client description
463
self.clients.append((request, client_address, t))
464
# Propagate the exception handler since we must use the same one as
465
# TestingTCPServer for connections running in their own threads.
466
t.set_ignored_exceptions(self.ignored_exceptions)
469
# If an exception occured during the thread start, it will get raised.
470
t.pending_exception()
472
sys.stderr.write('Client thread %s started\n' % (t.name,))
473
# Tell the thread, it's now on its own for exception handling.
476
# The following methods are called by the main thread
478
def shutdown_client(self, client):
479
sock, addr, connection_thread = client
480
self.shutdown_socket(sock)
481
if connection_thread is not None:
482
# The thread has been created only if the request is processed but
483
# after the connection is inited. This could happen during server
484
# shutdown. If an exception occurred in the thread it will be
487
sys.stderr.write('Client thread %s will be joined\n'
488
% (connection_thread.name,))
489
connection_thread.join()
491
def set_ignored_exceptions(self, thread, ignored_exceptions):
492
TestingTCPServerMixin.set_ignored_exceptions(self, thread,
494
for sock, addr, connection_thread in self.clients:
495
if connection_thread is not None:
496
connection_thread.set_ignored_exceptions(
497
self.ignored_exceptions)
499
def _pending_exception(self, thread):
500
for sock, addr, connection_thread in self.clients:
501
if connection_thread is not None:
502
connection_thread.pending_exception()
503
TestingTCPServerMixin._pending_exception(self, thread)
506
class TestingTCPServerInAThread(transport.Server):
507
"""A server in a thread that re-raise thread exceptions."""
509
def __init__(self, server_address, server_class, request_handler_class):
510
self.server_class = server_class
511
self.request_handler_class = request_handler_class
512
self.host, self.port = server_address
514
self._server_thread = None
517
return "%s(%s:%s)" % (self.__class__.__name__, self.host, self.port)
519
def create_server(self):
520
return self.server_class((self.host, self.port),
521
self.request_handler_class)
523
def start_server(self):
524
self.server = self.create_server()
525
self._server_thread = TestThread(
526
sync_event=self.server.started,
527
target=self.run_server)
528
self._server_thread.start()
529
# Wait for the server thread to start (i.e. release the lock)
530
self.server.started.wait()
531
# Get the real address, especially the port
532
self.host, self.port = self.server.server_address
533
self._server_thread.name = self.server.server_address
535
sys.stderr.write('Server thread %s started\n'
536
% (self._server_thread.name,))
537
# If an exception occured during the server start, it will get raised,
538
# otherwise, the server is blocked on its accept() call.
539
self._server_thread.pending_exception()
540
# From now on, we'll use a different event to ensure the server can set
542
self._server_thread.set_sync_event(self.server.stopped)
544
def run_server(self):
547
def stop_server(self):
548
if self.server is None:
551
# The server has been started successfully, shut it down now. As
552
# soon as we stop serving, no more connection are accepted except
553
# one to get out of the blocking listen.
554
self.set_ignored_exceptions(
555
self.server.ignored_exceptions_during_shutdown)
556
self.server.serving = False
558
sys.stderr.write('Server thread %s will be joined\n'
559
% (self._server_thread.name,))
560
# The server is listening for a last connection, let's give it:
563
last_conn = osutils.connect_socket((self.host, self.port))
565
# But ignore connection errors as the point is to unblock the
566
# server thread, it may happen that it's not blocked or even
569
# We start shutting down the clients while the server itself is
571
self.server.stop_client_connections()
572
# Now we wait for the thread running self.server.serve() to finish
573
self.server.stopped.wait()
574
if last_conn is not None:
575
# Close the last connection without trying to use it. The
576
# server will not process a single byte on that socket to avoid
577
# complications (SSL starts with a handshake for example).
579
# Check for any exception that could have occurred in the server
582
self._server_thread.join()
583
except Exception as e:
584
if self.server.ignored_exceptions(e):
589
# Make sure we can be called twice safely, note that this means
590
# that we will raise a single exception even if several occurred in
591
# the various threads involved.
594
def set_ignored_exceptions(self, ignored_exceptions):
595
"""Install an exception handler for the server."""
596
self.server.set_ignored_exceptions(self._server_thread,
599
def pending_exception(self):
600
"""Raise uncaught exception in the server."""
601
self.server._pending_exception(self._server_thread)
604
class TestingSmartConnectionHandler(socketserver.BaseRequestHandler,
605
medium.SmartServerSocketStreamMedium):
607
def __init__(self, request, client_address, server):
608
medium.SmartServerSocketStreamMedium.__init__(
609
self, request, server.backing_transport,
610
server.root_client_path,
611
timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
612
request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
613
socketserver.BaseRequestHandler.__init__(self, request, client_address,
618
while not self.finished:
619
server_protocol = self._build_protocol()
620
self._serve_one_request(server_protocol)
621
except errors.ConnectionTimeout:
622
# idle connections aren't considered a failure of the server
626
_DEFAULT_TESTING_CLIENT_TIMEOUT = 60.0
629
class TestingSmartServer(TestingThreadingTCPServer, server.SmartTCPServer):
631
def __init__(self, server_address, request_handler_class,
632
backing_transport, root_client_path):
633
TestingThreadingTCPServer.__init__(self, server_address,
634
request_handler_class)
635
server.SmartTCPServer.__init__(
636
self, backing_transport,
637
root_client_path, client_timeout=_DEFAULT_TESTING_CLIENT_TIMEOUT)
640
self.run_server_started_hooks()
642
TestingThreadingTCPServer.serve(self)
644
self.run_server_stopped_hooks()
647
"""Return the url of the server"""
648
return "bzr://%s:%d/" % self.server_address
651
class SmartTCPServer_for_testing(TestingTCPServerInAThread):
227
652
"""Server suitable for use by transport tests.
229
654
This server is backed by the process's cwd.
232
657
def __init__(self, thread_name_suffix=''):
233
super(SmartTCPServer_for_testing, self).__init__(None)
234
658
self.client_path_extra = None
235
659
self.thread_name_suffix = thread_name_suffix
660
self.host = '127.0.0.1'
662
super(SmartTCPServer_for_testing, self).__init__(
663
(self.host, self.port),
665
TestingSmartConnectionHandler)
237
def get_backing_transport(self, backing_transport_server):
238
"""Get a backing transport from a server we are decorating."""
239
return transport.get_transport(backing_transport_server.get_url())
667
def create_server(self):
668
return self.server_class((self.host, self.port),
669
self.request_handler_class,
670
self.backing_transport,
671
self.root_client_path)
241
673
def start_server(self, backing_transport_server=None,
242
client_path_extra='/extra/'):
674
client_path_extra='/extra/'):
243
675
"""Set up server for testing.
245
677
:param backing_transport_server: backing server to use. If not