# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# Copyright 2005 Dan Williams and Red Hat, Inc.

import os
import os.path
import sys
import commands
import threading
import time
import rpmUtils
import exceptions
import shutil
import copy
import string
import EmailUtils
import SimpleXMLRPCServer
import xmlrpclib
import socket
import BuilderManager
import ArchJob
from plague import ArchUtils
from plague import DebugUtils


CVS_CMD = "/usr/bin/cvs"
MAKE_CMD = "/usr/bin/make"

DEBUG = False


def debugprint(stuff=''):
    """ Print 'stuff' only when the module-level DEBUG flag is set. """
    if DEBUG:
        print(stuff)


def log(stuff=''):
    """ Unconditionally print 'stuff'. """
    print(stuff)


def _shell_quote(arg):
    """ Quote 'arg' so /bin/sh treats it as one literal word.

    The value is wrapped in single quotes; every embedded single quote is
    closed, backslash-escaped and reopened (' -> '\\'').  Used for values
    that reach the shell from job submissions (CVS tag, package name).
    """
    return "'" + arg.replace("'", "'\\''") + "'"


class PrepError(exceptions.Exception):
    """ Raised when a job fails before any arch job is requested
    (checkout, SRPM creation, or prep).  The message is in 'msg'. """

    def __init__(self, args=None):
        exceptions.Exception.__init__(self, args)
        # Keep the message in its own attribute: on Python 2.5+ assigning a
        # plain string to self.args converts it into a tuple of characters
        # (and assigning None raises TypeError).
        self.msg = args

    def __str__(self):
        return str(self.msg)


class BuildError(exceptions.Exception):
    """ Raised when an arch job fails; 'arch' names the failing arch. """

    def __init__(self, msg, arch):
        exceptions.Exception.__init__(self, msg)
        self.msg = msg
        self.arch = arch

    def __str__(self):
        return self.msg


class PackageJobController(threading.Thread):
    """ A class that controls PackageJob objects through specific phases.

    Python seems to have issues with threads, so to reduce the total
    running threadcount, we split the PackageJob runs up into two threads,
    one for everything up to the "build" stage, and then the actual "build"
    stage.  This class provides the actual running thread.
    """

    def __init__(self, pkg_job, start_stage, end_stage):
        """ Set 'pkg_job' to 'start_stage'; the thread stops once the job is
        done or reaches 'end_stage' (None means: run until done). """
        self._pkg_job = pkg_job
        if not end_stage:
            # Sentinel that matches no real stage, so only is_done() stops us
            end_stage = 'aaaaa'
        self._end_stage = end_stage
        self._pkg_job._set_cur_stage(start_stage)
        threading.Thread.__init__(self)
        self.setName("PackageJob: %s/%s" % (pkg_job.uid, pkg_job.package))

    def run(self):
        DebugUtils.registerThreadName(self)
        while not self._pkg_job.is_done() and not self._pkg_job.get_cur_stage() == self._end_stage:
            self._pkg_job.process()


def is_package_job_stage_valid(stage):
    """ Validate a job stage """
    stages = ['initialize', 'checkout_wait', 'checkout_wait_done', 'checkout', 'make_srpm',
              'prep', 'waiting', 'building', 'build_done', 'add_to_repo', 'repodone',
              'needsign', 'failed', 'finished']
    return stage in stages


def is_package_job_result_valid(result):
    """ Validate a job result """
    results = ['failed', 'success', 'killed', 'in-progress']
    return result in results


def make_job_log_url(base_url, target_str, uid, name, ver, release):
    """ Build the URL of a job's log directory:
    <base_url>/<target>/<uid>-<name>-<ver>-<release>/

    Returns None if any of target_str/uid/name/ver/release is empty.
    """
    if target_str and uid and name and ver and release:
        if base_url.endswith('/'):
            slash = ''
        else:
            slash = '/'
        return "%s%s%s/%s-%s-%s-%s/" % (base_url, slash, target_str, uid, name, ver, release)
    return None


class PackageJob:
    """ Controller object for building 1 SRPM on multiple arches """

    def __init__(self, uid, username, package, source, repo, buildmaster):
        self.curstage = ''
        self.result = 'in-progress'
        self.bm = buildmaster
        self.uid = uid
        self.package = package
        self.name = None
        self.epoch = None
        self.ver = None
        self.release = None

        self._target_cfg = repo.target_cfg()
        self._server_cfg = self._target_cfg.parent_cfg()
        self.repo = repo
        self._target_str = self._target_cfg.target_string()
        self._target_dict = self._target_cfg.target_dict()

        self.username = username
        self.starttime = time.time()
        self.endtime = 0
        self.use_cvs = self._server_cfg.get_bool("CVS", "use_cvs")
        self._source = source
        self.result_dir = None
        self.srpm_path = None
        self.srpm_http_path = None
        # Set by _stage_checkout(); initialized here so error-path cleanup
        # can test it instead of raising AttributeError.
        self.checkout_tmpdir = None
        self.repofiles = {}
        self.archjobs = {}
        self._archjobs_lock = threading.Lock()
        self._event = threading.Event()
        self._killer = None
        self._die = False

        self.http_dir = os.path.join(self._server_cfg.get_str("Directories", "server_work_dir"), "srpm_http_dir")

        # Everything must be initialized before the controller thread starts.
        first_stage = 'initialize'
        if self.use_cvs == False:
            first_stage = 'prep'
        pjc = PackageJobController(self, first_stage, 'waiting')
        pjc.start()

    def get_cur_stage(self):
        return self.curstage

    def get_result(self):
        return self.result

    def _set_cur_stage(self, stage, result_msg=None):
        """ Update our internal job stage, and notify the BuildMaster that
        we've changed as well.
        """
        oldstage = self.curstage
        self.curstage = stage
        if oldstage != stage:
            attrdict = {}
            attrdict['status'] = copy.copy(stage)
            attrdict['result'] = copy.copy(self.result)
            if self.name and self.epoch and self.ver and self.release:
                attrdict['epoch'] = self.epoch
                attrdict['version'] = self.ver
                attrdict['release'] = self.release
            if result_msg:
                attrdict['result_msg'] = result_msg
            self.bm.queue_job_status_update(self.uid, attrdict)

    def get_uid(self):
        return self.uid

    def arch_handling(self, hdr):
        """ Decide which arches to build the SRPM header 'hdr' on.

        Returns (build_arches, pkg_arches, allowed_arches): build_arches is a
        dict whose keys are the arches to build (values are None);
        pkg_arches are the arches the package wants; allowed_arches are the
        arches this buildsys allows.  For noarch packages the result is
        ({'noarch': None}, None, None).
        """
        # Grab list of base arches (like i386, ppc, x86_64) that this
        # buildsys supports.  NOT subarch variants like i686, i586, ppc32, etc.
        base_arches = self._target_cfg.get_list("Arches", "base_arches")

        # Grab additional allowed arches for this package, using
        # wildcard matching if needed, and drop the ones the buildsys
        # doesn't support.  ArchUtils.sub_arches is only used to determine
        # which arches to build on by default, so that if we have an
        # Additional Package Arches that specifies 'sparcv9' for a package
        # we don't try to build sparcv9 for that package unless 'sparc' is
        # also listed in our 'targets' config option.
        # A new list is built (rather than removing while iterating, which
        # skips elements and may modify the config object's list).
        addl_arches = []
        for arch in self._target_cfg.addl_arches_for_pkg(self.name):
            if arch in ArchUtils.sub_arches:
                master_addl_arch = ArchUtils.sub_arches[arch]
                if master_addl_arch not in base_arches:
                    continue
            addl_arches.append(arch)

        # Grab arches the SRPM does/does not want to build for.  If the package
        # doesn't use one of these tags in the specfile, hdr[''] will return
        # an empty list, and we'll use defaults instead
        ba = hdr['buildarchs']
        exclusive = hdr['exclusivearch']
        exclude = hdr['excludearch']

        build_arches = {}

        # If the SRPM is noarch, there's nothing left to do, since
        # it can build on any architecture the builders support
        if ba == ['noarch']:
            build_arches['noarch'] = None
            return (build_arches, None, None)

        # default to building all base arches the target
        # supports, and any additional arches from the
        # Additional Package Arches file whose "master" arch
        # is enabled for this target
        pkg_arches = []         # Arches the package wants to build for
        allowed_arches = []     # Arches the buildsys allows

        # Add all base architectures the buildsys supports
        for arch in base_arches:
            pkg_arches.append(arch)
            allowed_arches.append(arch)

        # Add additional per-package architectures from Additional
        # Package Architectures configuration option
        for arch in addl_arches:
            pkg_arches.append(arch)
            allowed_arches.append(arch)

        # Allow building on any optional architectures the buildsystem
        # supports, but don't build for them unless the package
        # requests it
        opt_arches = self._target_cfg.get_list("Arches", "optional_arches")
        for arch in opt_arches:
            allowed_arches.append(arch)

        if ba:
            # If the package specifies a set of arches with
            # BuildArch, use those rather than the default set
            pkg_arches = list(ba)
        elif exclusive:
            # If the package specifies a set of arches with
            # ExclusiveArch, use those rather than the default set
            pkg_arches = list(exclusive)

        # Filter out any arches the package explicitly excludes
        if exclude:
            for arch in exclude:
                if arch in pkg_arches:
                    pkg_arches.remove(arch)

        # Filter the final list of arches the package will build on
        # through the list of arches this buildsys allows
        for thisarch in pkg_arches:
            if thisarch in allowed_arches:
                build_arches[thisarch] = None

        return (build_arches, pkg_arches, allowed_arches)

    # Stage handlers are dispatched by process() as "_stage_<curstage>".
    # Returning True makes process() wait on self._event before continuing;
    # returning False (or None) continues immediately.

    def _stage_initialize(self):
        self._set_cur_stage('checkout_wait')
        self.bm.queue_checkout_wait(self)
        return True

    def _stage_checkout_wait(self):
        return True

    def checkout_wait_done_callback(self):
        self._set_cur_stage('checkout_wait_done')
        self.wake()

    def _stage_checkout_wait_done(self):
        self._set_cur_stage('checkout')
        return False

    def _stage_checkout(self):
        """ Check the package out of CVS (plus 'common' if missing).

        Raises PrepError if a CVS command fails.
        """
        err_msg = None

        # Create the temporary checkout directory.  Path separators in the
        # submitted source tag are replaced so the directory (which is later
        # rmtree'd) always stays inside tmpdir.
        safe_source = self._source.replace(os.sep, '_')
        dirname = "%s-%s-%d" % (self.uid, safe_source, time.time())
        tmpdir = self._server_cfg.get_str("Directories", "tmpdir")
        self.checkout_tmpdir = os.path.join(tmpdir, dirname)
        if os.path.exists(self.checkout_tmpdir):
            shutil.rmtree(self.checkout_tmpdir, ignore_errors=True)
        os.makedirs(self.checkout_tmpdir)

        # Set up CVS environment
        env_args = "CVSROOT='%s'" % self._target_cfg.get_str("CVS", "cvs_root")
        cvs_rsh = self._target_cfg.get_str("CVS", "cvs_rsh")
        if len(cvs_rsh) > 0:
            env_args = "%s CVS_RSH='%s'" % (env_args, cvs_rsh)

        # Checkout the module.  Submitted values are shell-quoted.
        cmd = 'cd %s; %s %s co -r %s %s' % (_shell_quote(self.checkout_tmpdir), env_args, CVS_CMD,
                _shell_quote(self._source), _shell_quote(self.package))
        debugprint("%d: Running %s" % (self.uid, cmd))
        s, o = commands.getstatusoutput(cmd)
        if s != 0:
            err_msg = "Error: could not check out %s from %s - output was:\n\n" \
                            "%s" % (self._source, self._target_str, o)
        else:
            # Just in case the 'common' directory didn't come along for the ride,
            # get it from CVS
            pkg_path = os.path.join(self.checkout_tmpdir, self.package)
            if not os.path.exists(os.path.join(pkg_path, "common")):
                cmd = 'cd %s; %s %s co common' % (_shell_quote(pkg_path), env_args, CVS_CMD)
                debugprint("%d: Running %s" % (self.uid, cmd))
                s, o = commands.getstatusoutput(cmd)
                if s != 0:
                    # The format string has a single placeholder: the output
                    err_msg = "Error: could not check out common directory - " \
                            "output was:\n\n%s" % o

        # Always tell the BuildMaster the checkout slot is free again
        self.bm.notify_checkout_done(self)

        if err_msg:
            raise PrepError(err_msg)

        self._set_cur_stage('make_srpm')
        return False

    def _stage_make_srpm(self):
        """ Run 'make srpm' in the checkout and record the SRPM path.

        Raises PrepError if the target directory is missing, make fails,
        or no "Wrote:" line is found in make's output.
        """
        # Map our target to the CVS target alias, since CVS may have
        # different target names than we expose
        cvs_target = self._target_dict['target']
        cvs_alias = self._target_cfg.get_str("Aliases", "cvs_alias")
        if len(cvs_alias) > 0:
            cvs_target = cvs_alias

        self.srpm_path = None
        srpm_dir = os.path.join(self.checkout_tmpdir, self.package, cvs_target)
        if not os.path.exists(srpm_dir):
            msg = "Error: could not find checkout directory %s for %s.  Sources probably don't exist for this target." % (srpm_dir, self._source)
            raise PrepError(msg)

        cmd = 'cd %s; %s srpm' % (_shell_quote(srpm_dir), MAKE_CMD)
        debugprint("%d: Running %s in %s" % (self.uid, cmd, srpm_dir))
        s, o = commands.getstatusoutput(cmd)
        if s != 0:
            # Don't include download progress lines in output
            output_lines = [line for line in o.split('\n')
                            if line.find('..........') == -1 and len(line) > 0]
            o = '\n'.join(output_lines)
            msg = "Error: could not make srpm for %s - output was:\n\n%s" % (self._source, o)
            raise PrepError(msg)

        srpmpath = None
        for line in o.split("\n"):
            if line.startswith("Wrote:"):
                # Split only on the first ':' so a path containing ':' survives
                srpmpath = line.split(':', 1)[1].strip()
                break
        if not srpmpath:
            msg = "Error: could not find srpm for %s - output was:\n\n%s" % (self._source, o)
            raise PrepError(msg)
        self.srpm_path = srpmpath

        self._set_cur_stage('prep')
        return False

    def _make_stage_dir(self, rootdir):
        """ (Re)create and return <rootdir>/<target>/<uid>-<name>-<ver>-<release>. """
        # The dir will look like this:
        # <rootdir>/devel/95-foo-1.1.0-23
        pkgsubdir = '%d-%s-%s-%s' % (self.uid, self.name, self.ver, self.release)
        stage_dir = os.path.join(rootdir, self._target_str, pkgsubdir)
        if os.path.exists(stage_dir):
            shutil.rmtree(stage_dir, ignore_errors=True)
        os.makedirs(stage_dir)
        return stage_dir

    def _stage_prep(self):
        """ Read the SRPM header, pick arches, set up result dirs, publish
        the SRPM over HTTP and request arch jobs.

        Raises PrepError if the SRPM is unreadable or no arch is buildable.
        """
        # In SRPM-only mode, cvs_tag is path to the SRPM to build
        if self.use_cvs == False:
            self.srpm_path = self._source

            # fail the job if we can't access the SRPM.  Can happen during
            # requeue of jobs when restarting the server.
            if not os.path.exists(self.srpm_path) or not os.access(self.srpm_path, os.R_OK):
                msg = "Could not access SRPM located at %s during prep stage." % self.srpm_path
                raise PrepError(msg)

        ts = rpmUtils.transaction.initReadOnlyTransaction()
        hdr = rpmUtils.miscutils.hdrFromPackage(ts, self.srpm_path)
        self.name = hdr['name']
        self.epoch = hdr['epoch']
        if not self.epoch:
            self.epoch = '0'
        self.ver = hdr['version']
        self.release = hdr['release']
        (self.archjobs, pkg_arches, allowed_arches) = self.arch_handling(hdr)
        del hdr
        del ts

        if len(self.archjobs) == 0:
            msg = """Package %s does not build on any architectures this build system supports.
    Package: %s
    Build System: %s
""" % (self._source, pkg_arches, allowed_arches)
            raise PrepError(msg)

        work_dir = self._server_cfg.get_str("Directories", "server_work_dir")
        self.result_dir = self._make_stage_dir(work_dir)
        for arch in self.archjobs.keys():
            thisdir = os.path.join(self.result_dir, arch)
            if not os.path.exists(thisdir):
                os.makedirs(thisdir)

        # Copy SRPM to where the builder can access it
        http_pkg_path = self._make_stage_dir(self.http_dir)
        self.srpm_http_path = os.path.join(http_pkg_path, os.path.basename(self.srpm_path))
        shutil.copy(self.srpm_path, self.srpm_http_path)
        self.srpm_path = None

        # Remove CVS checkout and make_srpm dirs
        if self.use_cvs == True and self.checkout_tmpdir:
            shutil.rmtree(self.checkout_tmpdir, ignore_errors=True)

        self._request_arch_jobs()
        self._set_cur_stage('waiting')
        return False

    def _request_one_arch_job(self, arch, orphaned):
        """ Ask the builder manager for an arch job for 'arch'. """
        # Construct SRPM URL
        srpm_http_base = self.srpm_http_path[len(self.http_dir):]
        use_ssl = self._server_cfg.get_bool("Builders", "use_ssl")
        if use_ssl == True:
            method = "https://"
        else:
            method = "http://"
        hostname = self._server_cfg.get_str("General", "hostname")
        srpm_url = method + hostname + ":8886/" + srpm_http_base
        target_dict = {}
        target_dict['distro'] = self._target_cfg.get_str("General", "distro")
        target_dict['target'] = self._target_cfg.get_str("General", "target")
        target_dict['arch'] = arch
        target_dict['repo'] = self._target_cfg.get_str("General", "repo")
        self.bm.builder_manager.request_arch_job(self, target_dict, srpm_url, orphaned)

    def _request_arch_jobs(self):
        """ Queue requests for every arch that has no arch job yet. """
        self._archjobs_lock.acquire()
        try:
            for arch in self.archjobs.keys():
                if self.archjobs[arch]:
                    continue
                self._request_one_arch_job(arch, False)
        finally:
            # Never leave the lock held if a request raises
            self._archjobs_lock.release()

    def add_arch_job(self, job):
        """ Called by the BuilderManager when it's started a new arch job for us """
        self._archjobs_lock.acquire()
        try:
            jobarch = job.arch()
            # .get(): archjobs may have been emptied by _kill_all_archjobs()
            existing = self.archjobs.get(jobarch)
            if existing is not None:
                log("%s (%s/%s): Already have archjob for this arch (%s).  New job UID is %s." % (self.uid, \
                        self.package, jobarch, existing.jobid, job.jobid))
            self.archjobs[jobarch] = job

            # If this is the first archjob, that means we are now building.
            # So we start up the second PackageJobController thread.
            # (Its constructor moves us to 'building' while we hold the lock,
            # so later archjobs don't start another controller.)
            if self.curstage == 'waiting':
                t = PackageJobController(self, 'building', None)
                t.start()
        finally:
            self._archjobs_lock.release()

        log("%s (%s/%s): %s - UID is %s" % (self.uid, self.package, jobarch, job.builder.address(), job.jobid))

    def remove_arch_job(self, job):
        """ Removes an arch job when its builder is no longer responding """
        self._archjobs_lock.acquire()
        try:
            jobarch = job.arch()
            log("%s (%s/%s): Builder disappeared.  Requeuing arch..." % (self.uid, self.package, jobarch))
            self.archjobs[jobarch] = None
            self._request_one_arch_job(jobarch, True)
        finally:
            self._archjobs_lock.release()

    def is_done(self):
        return self.curstage in ('needsign', 'failed', 'finished')

    def die(self, username):
        """ Request that this job be killed on behalf of 'username'. """
        self._killer = username
        self._die = True

        log("%s (%s): Job kill request from %s" % (self.uid, self.package, username))
        self._archjobs_lock.acquire()
        try:
            if self.curstage == 'waiting':
                # In 'waiting' stage, we have no controller thread.  So to get
                # the job killed immediately, we have to start one
                t = PackageJobController(self, 'killed', None)
                t.start()
            else:
                # Otherwise, wake up the existing controller thread
                self.wake()
        finally:
            self._archjobs_lock.release()

    def _handle_death(self):
        resultstring = "%s (%s): Build on target %s was killed by %s." % (self.uid, self.name, self._target_str, self._killer)
        self.result = 'killed'
        self._set_cur_stage('finished', resultstring)
        self.email_result(self.username, resultstring)
        log(resultstring)

        # Kill any building jobs
        self._kill_all_archjobs(True)

        # Wake us up if the Controller thread is still running
        if not self._event.isSet():
            self._event.set()

        self.endtime = time.time()
        self.bm.notify_job_done(self)

    def _kill_all_archjobs(self, user_requested=False):
        """ Tell every arch job to die and forget them all. """
        self._archjobs_lock.acquire()
        try:
            for job in self.archjobs.values():
                if job:
                    job.die(user_requested)
            self.archjobs = {}
        finally:
            self._archjobs_lock.release()

    def wake(self):
        self._event.set()

    def process(self):
        """ Run one step: the handler for the current stage (or the death
        handler), then wait on self._event if the handler returned True. """
        if self.is_done():
            return

        if self._die:
            self._handle_death()
            return

        try:
            func = getattr(self, "_stage_%s" % self.curstage)
            if func():
                # Wait to be woken up when long-running operations complete
                while not self._event.isSet():
                    self._event.wait()
                self._event.clear()
        except PrepError:
            # sys.exc_info() keeps this valid on every Python 2 release
            e = sys.exc_info()[1]
            if self.use_cvs == True and self.checkout_tmpdir:
                shutil.rmtree(self.checkout_tmpdir, ignore_errors=True)
            subj = 'Prep Error (Job %s): %s on %s' % (self.uid, self._source, self._target_str)
            self.email_result(self.username, resultstring=e.msg, subject=subj)
            self._stage_failed(e.msg)
        except BuildError:
            e = sys.exc_info()[1]
            subj = 'Build Error (Job %s): %s on %s' % (self.uid, self._source, self._target_str)
            base_url = self._server_cfg.get_str("UI", "log_url")
            log_url = make_job_log_url(base_url, self._target_str, self.uid, self.name, self.ver, self.release)
            msg = "%s\n\n         Build logs may be found at %s\n\n" % (e.msg, log_url)
            logtail = self._get_log_tail(e.arch)
            msg = "%s\n-------------------------------------------------\n\n%s\n" % (msg, logtail)
            self.email_result(self.username, resultstring=msg, subject=subj)

            # kbs - disabled, since @ CentOS everything is a testing build
            # Only non-testing targets kill remaining jobs and fail the entire
            # build when one archjob fails
            #if self._target_cfg.testing() == False:
            #    # Kill remaining jobs on other arches
            #    self._kill_all_archjobs(False)
            #    self._stage_failed(e.msg)

    def _stage_building(self):
        """ Check arch job progress.

        Raises BuildError (once per failed arch job) so the user is emailed.
        Returns False when every arch job is done (stage advanced), True to
        keep waiting.
        """
        # Count failed and completed jobs
        completed_jobs = 0
        failed_jobs = 0
        self._archjobs_lock.acquire()
        try:
            for job in self.archjobs.values():
                if not job:
                    continue
                # String equality, not identity ('is')
                if job.get_status() == 'done':
                    completed_jobs = completed_jobs + 1

                    if job.builder_failed() or job.download_failed() or job.internal_failure():
                        failed_jobs = failed_jobs + 1

                        # Normal jobs will just stop when a single archjob fails, but
                        # testing targets don't kill the build when one fails.  However,
                        # even for testing targets, we still want to notify the user if
                        # a particular arch fails.
                        if not job.failure_noticed():
                            job.set_failure_noticed()
                            jobarch = job.arch()
                            msg = "Job failed."
                            if job.builder_failed():
                                msg = "Job failed on arch %s\n" % jobarch
                            elif job.download_failed():
                                msg = "Job failed on arch %s: couldn't download result files from builder '%s'.\n " \
                                "Please contact the build system administrator." % (jobarch, job.builder.address())
                            elif job.internal_failure():
                                msg = "Job failed on arch %s: there was an internal build system failure.\n " \
                                "Please contact the build system administrator." % jobarch
                            # The finally clause releases the lock first
                            raise BuildError(msg, jobarch)
        finally:
            self._archjobs_lock.release()

        if completed_jobs == len(self.archjobs):
            # Testing targets don't contribute packages to the repo
            if self._target_cfg.testing() == True:
                if failed_jobs > 0:
                    self._stage_failed("Job failed on one or more architectures.")
                else:
                    self._set_cur_stage('repodone')
            else:
                self._set_cur_stage('add_to_repo')
            return False  # Don't want to wait

        return True

    def get_stage_dir(self):
        return self.result_dir

    def _stage_failed(self, msg=None):
        self.result = 'failed'
        self._set_cur_stage('failed', msg)
        self.endtime = time.time()
        self._cleanup_job_files()
        self.bm.notify_job_done(self)

    def _cleanup_job_files(self):
        """ Delete per-arch RPMs from the result dir (keeping one SRPM) and
        the job's SRPM directory under the HTTP dir. """
        if not self.result_dir or not self.srpm_http_path:
            return

        # If its a testing target, we keep the RPMs around since they don't
        # get copied to the repository, they only live in the repodir
        if self.result == 'success' and self._target_cfg.testing() == True:
            return

        srpm_file = os.path.join(self.result_dir, os.path.basename(self.srpm_http_path))

        # Delete any RPMs in the arch dirs
        for job in self.archjobs.values():
            if not job:
                continue
            for f in job.get_files():
                if not f.endswith(".rpm"):
                    continue
                src_file = os.path.join(self.result_dir, job.arch(), f)
                if src_file.endswith(".src.rpm"):
                    # Keep an SRPM.  We prefer built SRPMs from builders over
                    # the original SRPM.
                    if not os.path.exists(srpm_file):
                        shutil.copy(src_file, srpm_file)
                os.remove(src_file)

        # If there were no builder-built SRPMs, keep the original around
        if not os.path.exists(srpm_file):
            shutil.copy(self.srpm_http_path, srpm_file)

        # Delete the SRPM in the server's HTTP dir
        shutil.rmtree(os.path.dirname(self.srpm_http_path), ignore_errors=True)

    def _stage_add_to_repo(self):
        """ Map each built RPM to its repository destination and ask the
        repo to copy them; repo_add_callback() wakes us when done. """
        # Create a list of files that the repo should copy to
        # the repo dir
        repo_dir = self._server_cfg.get_str("Directories", "repo_dir")
        for job in self.archjobs.values():
            if not job:
                continue
            for f in job.get_files():
                if not f.endswith(".rpm"):
                    continue
                jobarch = job.arch()
                src_file = os.path.join(self.result_dir, jobarch, f)
                verrel = "%s-%s" % (self.ver, self.release)
                if f.endswith(".src.rpm"):
                    dst_path = os.path.join(repo_dir, self._target_str, self.name, verrel, "SRPM")
                else:
                    dst_path = os.path.join(repo_dir, self._target_str, self.name, verrel, jobarch)
                self.repofiles[src_file] = os.path.join(dst_path, f)

        self._event.clear()

        if not self.repofiles:
            # No copy request means repo_add_callback() would never run to
            # wake us; waiting would hang the job forever, so fail it.
            self._stage_failed("No RPMs were found to add to the repository.")
            return False

        # Request the repo copy our files.  It will get the file
        # list from this object directly when the copy operation
        # happens
        self.repo.request_copy(self)

        self.endtime = time.time()
        return True

    def repo_add_callback(self, success, bad_file=None):
        if success:
            self._set_cur_stage('repodone')
        else:
            self._stage_failed("Failed to copy %s to the repository directory." % bad_file)
        self.wake()

    def _stage_repodone(self):
        resultstring = " %s (%s): Build on target %s succeeded." % (self.uid, self.name, self._target_str)
        self.result = 'success'
        self._set_cur_stage('needsign', resultstring)

        self._cleanup_job_files()

        base_url = self._server_cfg.get_str("UI", "log_url")
        log_url = make_job_log_url(base_url, self._target_str, self.uid, self.name, self.ver, self.release)
        resultstring = resultstring + "\n     Build logs may be found at %s\n" % (log_url)
        self.email_result(self.username, resultstring)

        # Notify everyone else who might want to know that the build succeeded
        for addr in self._server_cfg.get_list("Email", "success_emails"):
            self.email_result(addr, resultstring)

        self.bm.notify_job_done(self)

    def _get_log_tail(self, arch):
        """ Returns the last 30 lines of the most relevant log file

        Only the last 4096 bytes of the first non-empty file among
        build.log, root.log and job.log are examined; returns "" if none.
        """
        pkg_dir = "%s-%s-%s-%s" % (self.uid, self.name, self.ver, self.release)
        work_dir = self._server_cfg.get_str("Directories", "server_work_dir")
        log_dir = os.path.join(work_dir, self._target_str, pkg_dir, arch)

        # Try the most relevant log file first
        final_log = None
        for candidate in ("build.log", "root.log", "job.log"):
            path = "%s/%s" % (log_dir, candidate)
            if os.path.exists(path) and os.path.getsize(path) > 0:
                final_log = path
                break
        if not final_log:
            return ""

        seek_pos = os.path.getsize(final_log) - 4096
        f = open(final_log, "r")
        try:
            if seek_pos > 0:
                f.seek(seek_pos)
                # We most likely landed mid-line; throw away the rest of it
                f.readline()
            lines = f.readlines()
        finally:
            f.close()

        # only want last 30 lines
        return "".join(lines[-30:])

    def email_result(self, to, resultstring, subject=None):
        """ Send 'resultstring' to the address 'to'.

        If no subject is given, one is built from the job uid, the package
        name (or the submitted package if the name isn't known yet) and the
        target.
        """
        if not subject:
            name = self.name
            if not name:
                name = self.package
            subject = 'Build Result: %d - %s on %s' % (self.uid, name, self._target_str)
        sender = self._server_cfg.get_str("Email", "email_from")
        EmailUtils.email_result(sender, to, resultstring, subject)