diff --git a/bin/cbuildbot.py b/bin/cbuildbot.py index 6bc398a665..561f01bc30 100755 --- a/bin/cbuildbot.py +++ b/bin/cbuildbot.py @@ -7,13 +7,13 @@ """CBuildbot is wrapper around the build process used by the pre-flight queue""" import errno +import re import optparse import os -import re -import shutil import subprocess import sys +import cbuildbot_comm from cbuildbot_config import config _DEFAULT_RETRIES = 3 @@ -376,12 +376,27 @@ def main(): _UprevPackages(buildroot, revisionfile, board=buildconfig['board']) _Build(buildroot) if buildconfig['uprev']: - _UprevPush(buildroot) - _UprevCleanup(buildroot) + if buildconfig['master']: + # Master bot needs to check if the other slaves completed. + if cbuildbot_comm.HaveSlavesCompleted(config): + _UprevPush(buildroot) + _UprevCleanup(buildroot) + else: + # At least one of the slaves failed or we timed out. + _UprevCleanup(buildroot) + sys.stderr('CBUILDBOT - One of the slaves has failed!!!') + sys.exit(1) + else: + # Publish my status to the master. + cbuildbot_comm.PublishStatus(cbuildbot_comm.STATUS_BUILD_COMPLETE) + _UprevCleanup(buildroot) except: - # something went wrong, cleanup (being paranoid) for next build + # Something went wrong, cleanup (being paranoid) for next build. if clobber: RunCommand(['sudo', 'rm', '-rf', buildroot], print_cmd=False) + # Send failure to master bot. + if not buildconfig['master']: + cbuildbot_comm.PublishStatus(cbuildbot_comm.STATUS_BUILD_FAILED) raise diff --git a/bin/cbuildbot_comm.py b/bin/cbuildbot_comm.py new file mode 100755 index 0000000000..7dfc266c4c --- /dev/null +++ b/bin/cbuildbot_comm.py @@ -0,0 +1,195 @@ +# Copyright (c) 2010 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Module contains communication methods between cbuildbot instances.""" + +import Queue +import SocketServer +import socket +import sys +import time + +from cbuildbot import RunCommand + +# Communication port for master to slave communication. +_COMM_PORT = 32890 +# TCP Buffer Size. +_BUFFER = 4096 +# Timeout between checks for new status by either end. +_HEARTBEAT_TIMEOUT = 60 # in sec. +# Max Timeout to wait before assuming failure. +_MAX_TIMEOUT = 30 * 60 # in sec. + +# Commands - sent to slave from master. + +# Report whether you have completed or failed building. +_COMMAND_CHECK_STATUS = 'check-status' + +# Return status - response to commands from slaves (self.explanatory) +_STATUS_COMMAND_REJECTED = 'rejected' +_STATUS_TIMEOUT = 'timeout' +# Public for cbuildbot. +STATUS_BUILD_COMPLETE = 'complete' +STATUS_BUILD_FAILED = 'failure' + +# Global queues to communicate with server. +_status_queue = Queue.Queue(1) +_receive_queue = Queue.Queue(1) +_command_queue = Queue.Queue(1) + +class _TCPServerWithReuse(SocketServer.TCPServer): + """TCPServer that allows re-use of socket and timed out sockets.""" + SocketServer.TCPServer.allow_reuse_address = True + + def __init__(self, address, handler, timeout): + SocketServer.TCPServer.__init__(self, address, handler) + self.socket.settimeout(timeout) + + +class _SlaveCommandHandler(SocketServer.BaseRequestHandler): + """Handles requests from a master pre-flight-queue bot.""" + + def _HandleCommand(self, command, args): + """Handles command and returns status for master.""" + print >> sys.stderr, ('(Slave) - Received command %s with args %s' % + (command, args)) + command_to_expect = _command_queue.get() + # Check status also adds an entry on the status queue. + if command_to_expect == _COMMAND_CHECK_STATUS: + slave_status = _status_queue.get() + # Safety check to make sure the server is in a good state. + if command_to_expect != command: + print >> sys.stderr, ( + '(Slave) - Rejecting command %s. Was expecting %s.' % (command, + command_to_expect)) + return _STATUS_COMMAND_REJECTED + # Give slave command with optional args. + _receive_queue.put(args) + if command == _COMMAND_CHECK_STATUS: + # Returns status to send. + return slave_status + + def handle(self): + """Overriden. Handles commands sent from master.""" + data = self.request.recv(_BUFFER).strip() + (command, args) = data.split('\n') + response = self._HandleCommand(command, args) + self.request.send(response) + + +def _GetSlaveNames(configuration): + """Returns an array of slave hostnames that are important.""" + slaves = [] + for slave_config in configuration.items(): + if (not slave_config[1]['master'] and + slave_config[1]['important']): + slaves.append(slave_config[1]['hostname']) + return slaves + + +def _SendCommand(hostname, command, args): + """Returns response from host or _STATUS_TIMEOUT on error.""" + data = '%s\n%s\n' % (command, args) + print '(Master) - Sending %s %s to %s' % (command, args, hostname) + + # Create a socket (SOCK_STREAM means a TCP socket). + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + try: + # Connect to server and send data + sock.connect((hostname, _COMM_PORT)) + sock.send(data) + + # Receive data from the server and shut down. + received = sock.recv(_BUFFER) + except: + received = _STATUS_TIMEOUT + finally: + sock.close() + return received + + +def _CheckSlavesLeftStatus(slaves_to_check): + """Returns True if remaining slaves have completed. + + Once a slave reports STATUS_BUILD_COMPLETE, removes slave from list. Returns + True as long as no slave reports STATUS_BUILD_FAILED. + + Keyword arguments: + slaves_to_check -- Array of hostnames to check. + + """ + slaves_to_remove = [] + for slave in slaves_to_check: + status = _SendCommand(slave, _COMMAND_CHECK_STATUS, 'empty') + if status == STATUS_BUILD_FAILED: + print >> sys.stderr, '(Master) - Slave %s failed' % slave + return False + elif status == STATUS_BUILD_COMPLETE: + print >> sys.stderr, '(Master) - Slave %s completed' % slave + slaves_to_remove.append(slave) + for slave in slaves_to_remove: + slaves_to_check.remove(slave) + return True + + +def HaveSlavesCompleted(configuration): + """Returns True if all other slaves have succeeded. + + Checks other slaves status until either '_MAX_TIMEOUT' has passed, + at least one slaves reports a failure, or all slaves report success. + + Keyword arguments: + configuration -- configuration dictionary for slaves. + + """ + not_failed = True + slaves_to_check = _GetSlaveNames(configuration) + timeout = 0 + while slaves_to_check and not_failed and timeout < _MAX_TIMEOUT: + not_failed = _CheckSlavesLeftStatus(slaves_to_check) + if slaves_to_check and not_failed: + time.sleep(_HEARTBEAT_TIMEOUT) + timeout += _HEARTBEAT_TIMEOUT + return len(slaves_to_check) == 0 + + +def PublishStatus(status): + """Publishes status and Returns True if master received it. + + This call is blocking until either the master pre-flight-queue bot picks + up the status, or a '_MAX_TIMEOUT' has passed. + + Keyword arguments: + status -- should be a string and one of STATUS_BUILD_.*. + + """ + # Clean up queues. + try: + _command_queue.get_nowait() + except Queue.Empty: pass + try: + _status_queue.get_nowait() + except Queue.Empty: pass + + _command_queue.put(_COMMAND_CHECK_STATUS) + _status_queue.put(status) + server = _TCPServerWithReuse(('localhost', _COMM_PORT), + _SlaveCommandHandler, _HEARTBEAT_TIMEOUT) + timeout = 0 + response = None + try: + while not response and timeout < _MAX_TIMEOUT: + server.handle_request() + try: + response = _receive_queue.get_nowait() + except Queue.Empty: + print >> sys.stderr, ('(Slave) - Waiting for master to accept %s' % ( + status)) + timeout += _HEARTBEAT_TIMEOUT + response = None + except Exception, e: + print >> sys.stderr, '%s' % e + server.server_close() + return response != None diff --git a/bin/cbuildbot_comm_unittest.py b/bin/cbuildbot_comm_unittest.py new file mode 100755 index 0000000000..4c95fb6ed0 --- /dev/null +++ b/bin/cbuildbot_comm_unittest.py @@ -0,0 +1,101 @@ +#!/usr/bin/python + +# Copyright (c) 2010 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Units tests for cbuildbot_comm commands.""" + +import cbuildbot_comm +import sys +import threading +import time +import unittest + +_TEST_CONFIG = {'test_slave' : + {'master' : False, + 'hostname' : 'localhost', + 'important' : True}, + 'test_master' : + {'master' : True, + 'important' : False + } + } + +# Reduce timeouts. +cbuildbot_comm._HEARTBEAT_TIMEOUT = 2 +cbuildbot_comm._MAX_TIMEOUT = 6 + +class _MasterSendBadStatus(threading.Thread): + + def __init__(self, test_class): + threading.Thread.__init__(self) + self.test_class = test_class + + def run(self): + # Sleep for heartbeat timeout to let slave start up. + time.sleep(2) + return_value = cbuildbot_comm._SendCommand('localhost', 'bad-command', + 'args') + self.test_class.assertEqual(return_value, + cbuildbot_comm._STATUS_COMMAND_REJECTED) + +class _MasterCheckStatusThread(threading.Thread): + + def __init__(self, config, expected_return, test_class): + threading.Thread.__init__(self) + self.config = config + self.expected_return = expected_return + self.test_class = test_class + + def run(self): + return_value = cbuildbot_comm.HaveSlavesCompleted(self.config) + self.test_class.assertEqual(return_value, self.expected_return) + + +class CBuildBotCommTest(unittest.TestCase): + + def testSlaveComplete(self): + print >> sys.stderr, '\n>>> Running testSlaveComplete\n' + # Master should check statuses in another thread. + master_thread = _MasterCheckStatusThread(_TEST_CONFIG, True, self) + master_thread.start() + + return_value = cbuildbot_comm.PublishStatus( + cbuildbot_comm.STATUS_BUILD_COMPLETE) + self.assertEqual(return_value, True) + + def testMasterTimeout(self): + print >> sys.stderr, '\n>>> Running testMasterTimeout\n' + return_value = cbuildbot_comm.HaveSlavesCompleted(_TEST_CONFIG) + self.assertEqual(return_value, False) + + def testSlaveTimeout(self): + print >> sys.stderr, '\n>>> Running testSlaveTimeout\n' + return_value = cbuildbot_comm.PublishStatus( + cbuildbot_comm.STATUS_BUILD_COMPLETE) + self.assertEqual(return_value, False) + + def testSlaveFail(self): + print >> sys.stderr, '\n>>> Running testSlaveFail\n' + # Master should check statuses in another thread. + master_thread = _MasterCheckStatusThread(_TEST_CONFIG, False, self) + master_thread.start() + + return_value = cbuildbot_comm.PublishStatus( + cbuildbot_comm.STATUS_BUILD_FAILED) + self.assertEqual(return_value, True) + + def testBadCommand(self): + print >> sys.stderr, '\n>>> Running testSendBadCommand\n' + # Master should check statuses in another thread. + master_thread = _MasterSendBadStatus(self) + master_thread.start() + + return_value = cbuildbot_comm.PublishStatus( + cbuildbot_comm.STATUS_BUILD_COMPLETE) + self.assertEqual(return_value, False) + + +if __name__ == '__main__': + unittest.main() diff --git a/bin/cbuildbot_config.py b/bin/cbuildbot_config.py index 2528a73dff..c68c225c8b 100644 --- a/bin/cbuildbot_config.py +++ b/bin/cbuildbot_config.py @@ -1,15 +1,26 @@ -#!/usr/bin/python - # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. +# Dictionary values that aren't self-explanatory: +# 'master' - only one allowed to be True. This bot controls the uprev process. +# 'important' - master bot uses important bots to determine overall status. +# i.e. if master bot succeeds and other important slaves succeed +# then the master will uprev packages. This should align +# with info vs. closer except for the master. +# 'hostname' - Needed for 'important' slaves. The hostname of the bot. Should +# match hostname in slaves.cfg in buildbot checkout. + config = {} config['default'] = { 'board' : 'x86-generic', 'uprev' : False, + 'master' : False, + 'important' : False, } config['x86-generic-pre-flight-queue'] = { 'board' : 'x86-generic', 'uprev' : True, + 'master' : True, + 'important' : False, } diff --git a/bin/cbuildbot_unittest.py b/bin/cbuildbot_unittest.py index d3c0500fc2..203f2a8c76 100755 --- a/bin/cbuildbot_unittest.py +++ b/bin/cbuildbot_unittest.py @@ -10,6 +10,8 @@ import __builtin__ import mox import unittest +# Fixes circular dependency error. +import cbuildbot_comm import cbuildbot class CBuildBotTest(mox.MoxTestBase):