Convert remote building state management to work in with statements

exception and state handling is a lot easier now, less class vars etc.
This commit is contained in:
Anders Ingemann 2015-01-25 13:31:34 +01:00
parent 5b48ce58c5
commit 17a4511ee1
3 changed files with 85 additions and 83 deletions

View file

@ -14,19 +14,19 @@ class CallbackServer(object):
self.daemon.register(self) self.daemon.register(self)
self.abort = False self.abort = False
def start(self): def __enter__(self):
def serve(): def serve():
self.daemon.requestLoop() self.daemon.requestLoop()
from threading import Thread from threading import Thread
self.thread = Thread(target=serve) self.thread = Thread(target=serve)
log.debug('Starting the callback server') log.debug('Starting callback server')
self.thread.start() self.thread.start()
return self
def stop(self): def __exit__(self, type, value, traceback):
if hasattr(self, 'daemon'): log.debug('Shutting down callback server')
self.daemon.shutdown() self.daemon.shutdown()
if hasattr(self, 'thread'): self.thread.join()
self.thread.join()
@Pyro4.expose @Pyro4.expose
def handle_log(self, pickled_record): def handle_log(self, pickled_record):

View file

@ -1,5 +1,6 @@
from build_server import BuildServer from build_server import BuildServer
from bootstrapvz.common.tools import log_check_call from bootstrapvz.common.tools import log_check_call
from contextlib import contextmanager
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -15,18 +16,27 @@ class RemoteBuildServer(BuildServer):
self.keyfile = settings['keyfile'] self.keyfile = settings['keyfile']
self.server_bin = settings['server_bin'] self.server_bin = settings['server_bin']
@contextmanager
def connect(self):
with self.spawn_server() as forwards:
with connect_pyro('localhost', forwards['local_server_port']) as connection:
from callback import CallbackServer
args = {'listen_port': forwards['local_callback_port'],
'remote_port': forwards['remote_callback_port']}
with CallbackServer(**args) as callback_server:
connection.set_callback_server(callback_server)
yield (connection, callback_server)
@contextmanager
def spawn_server(self):
from . import getNPorts from . import getNPorts
# We can't use :0 for the forwarding ports because # We can't use :0 for the forwarding ports because
# A: It's quite hard to retrieve the port on the remote after the daemon has started # A: It's quite hard to retrieve the port on the remote after the daemon has started
# B: SSH doesn't accept 0:localhost:0 as a port forwarding option # B: SSH doesn't accept 0:localhost:0 as a port forwarding option
[self.local_server_port, self.local_callback_port] = getNPorts(2) [local_server_port, local_callback_port] = getNPorts(2)
[self.remote_server_port, self.remote_callback_port] = getNPorts(2) [remote_server_port, remote_callback_port] = getNPorts(2)
def connect(self): server_cmd = ['sudo', self.server_bin, '--listen', str(remote_server_port)]
log.debug('Opening SSH connection to build server `{name}\''.format(name=self.name))
import subprocess
server_cmd = ['sudo', self.server_bin, '--listen', str(self.remote_server_port)]
def set_process_group(): def set_process_group():
# Changes the process group of a command so that any SIGINT # Changes the process group of a command so that any SIGINT
@ -38,46 +48,28 @@ class RemoteBuildServer(BuildServer):
addr_arg = '{user}@{host}'.format(user=self.username, host=self.address) addr_arg = '{user}@{host}'.format(user=self.username, host=self.address)
ssh_cmd = ['ssh', '-i', self.keyfile, ssh_cmd = ['ssh', '-i', self.keyfile,
'-p', str(self.port), '-p', str(self.port),
'-L' + str(self.local_server_port) + ':localhost:' + str(self.remote_server_port), '-L' + str(local_server_port) + ':localhost:' + str(remote_server_port),
'-R' + str(self.remote_callback_port) + ':localhost:' + str(self.local_callback_port), '-R' + str(remote_callback_port) + ':localhost:' + str(local_callback_port),
addr_arg] addr_arg]
full_cmd = ssh_cmd + ['--'] + server_cmd full_cmd = ssh_cmd + ['--'] + server_cmd
log.debug('Opening SSH connection to build server `{name}\''.format(name=self.name))
import sys import sys
self.ssh_process = subprocess.Popen(args=full_cmd, stdout=sys.stderr, stderr=sys.stderr, import subprocess
preexec_fn=set_process_group) ssh_process = subprocess.Popen(args=full_cmd, stdout=sys.stderr, stderr=sys.stderr,
preexec_fn=set_process_group)
# Check that we can connect to the server
try: try:
import Pyro4 yield {'local_server_port': local_server_port,
server_uri = 'PYRO:server@localhost:{server_port}'.format(server_port=self.local_server_port) 'local_callback_port': local_callback_port,
self.connection = Pyro4.Proxy(server_uri) 'remote_server_port': remote_server_port,
'remote_callback_port': remote_callback_port}
log.debug('Connecting to the RPC daemon on build server `{name}\''.format(name=self.name))
remaining_retries = 5
while True:
try:
self.connection.ping()
break
except (Pyro4.errors.ConnectionClosedError, Pyro4.errors.CommunicationError):
if remaining_retries > 0:
remaining_retries -= 1
from time import sleep
sleep(2)
else:
raise
except (Exception, KeyboardInterrupt): except (Exception, KeyboardInterrupt):
self.ssh_process.terminate() log.debug('Forcefully terminating SSH connection to the build server')
ssh_process.terminate()
raise raise
return self.connection else:
log.debug('Waiting for SSH connection to the build server to close')
def disconnect(self): ssh_process.wait()
if hasattr(self, 'connection'):
log.debug('Stopping the RPC daemon on build server `{name}\''.format(name=self.name))
self.connection.stop()
self.connection._pyroRelease()
if hasattr(self, 'ssh_process'):
log.debug('Waiting for SSH connection to build server `{name}\' to terminate'.format(name=self.name))
self.ssh_process.wait()
def download(self, src, dst): def download(self, src, dst):
log.debug('Downloading file `{src}\' from ' log.debug('Downloading file `{src}\' from '
@ -103,3 +95,31 @@ class RemoteBuildServer(BuildServer):
def run(self, manifest): def run(self, manifest):
from bootstrapvz.remote.main import run from bootstrapvz.remote.main import run
return run(manifest, self) return run(manifest, self)
@contextmanager
def connect_pyro(host, port):
import Pyro4
server_uri = 'PYRO:server@{host}:{port}'.format(host=host, port=port)
connection = Pyro4.Proxy(server_uri)
log.debug('Connecting to the RPC daemon')
remaining_retries = 5
while True:
try:
connection.ping()
break
except (Pyro4.errors.ConnectionClosedError, Pyro4.errors.CommunicationError):
if remaining_retries > 0:
remaining_retries -= 1
from time import sleep
sleep(2)
else:
raise
try:
yield connection
finally:
log.debug('Stopping the RPC daemon')
connection.stop()
connection._pyroRelease()

View file

@ -75,40 +75,22 @@ def run(manifest, build_server, debug=False, dry_run=False):
on the other side and initiates a remote bootstrapping procedure on the other side and initiates a remote bootstrapping procedure
""" """
bootstrap_info = None bootstrap_info = None
try: with build_server.connect() as (connection, callback_server):
# Connect to the build server # Replace the standard SIGINT handler with a remote call to the server
connection = build_server.connect() # so that it may abort the run.
# Start a callback server on this side, so that we may receive log entries def abort(signum, frame):
from callback import CallbackServer import logging
callback_server = CallbackServer(listen_port=build_server.local_callback_port, logging.getLogger(__name__).warn('SIGINT received, asking remote to abort.')
remote_port=build_server.remote_callback_port) callback_server.abort_run()
try: import signal
# Start the callback server (in a background thread) orig_sigint = signal.signal(signal.SIGINT, abort)
callback_server.start()
# Tell the RPC daemon about the callback server
connection.set_callback_server(callback_server)
# Replace the standard SIGINT handler with a remote call to the server # Everything has been set up, begin the bootstrapping process
# so that it may abort the run. bootstrap_info = connection.run(manifest,
def abort(signum, frame): debug=debug,
import logging # We can't pause the bootstrapping process remotely, yet...
logging.getLogger(__name__).warn('SIGINT received, asking remote to abort.') pause_on_error=False,
callback_server.abort_run() dry_run=dry_run)
import signal # Restore the old SIGINT handler
orig_sigint = signal.signal(signal.SIGINT, abort) signal.signal(signal.SIGINT, orig_sigint)
# Everything has been set up, begin the bootstrapping process
bootstrap_info = connection.run(manifest,
debug=debug,
# We can't pause the bootstrapping process remotely, yet...
pause_on_error=False,
dry_run=dry_run)
# Restore the old SIGINT handler
signal.signal(signal.SIGINT, orig_sigint)
finally:
# Stop the callback server
callback_server.stop()
finally:
# Stop the RPC daemon and close the SSH connection
build_server.disconnect()
return bootstrap_info return bootstrap_info