#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright 2005 Lars Wirzenius (liw@iki.fi) # Copyright © 2011-2019 Andreas Beckmann (anbe@debian.org) # # This program is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the # Free Software Foundation; either version 2 of the License, or (at your # option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. # # You should have received a copy of the GNU General Public License along with # this program. If not, see """Distributed piuparts processing, slave program Lars Wirzenius """ from __future__ import print_function import fcntl import logging import os import random import shlex import subprocess import sys import time from signal import SIGALRM, SIGHUP, SIGINT, SIGKILL, SIGUSR1, alarm, signal import apt_pkg import piupartslib.conf import piupartslib.packagesdb from piupartslib.conf import MissingSection apt_pkg.init_system() CONFIG_FILE = "/etc/piuparts/piuparts.conf" DISTRO_CONFIG_FILE = "/etc/piuparts/distros.conf" MAX_WAIT_TEST_RUN = 90 * 60 interrupted = False old_sigint_handler = None got_sighup = False got_sigusr1 = False def setup_logging(log_level, log_file_name): logger = logging.getLogger() logger.setLevel(log_level) formatter = logging.Formatter(fmt="%(asctime)s %(message)s", datefmt="%H:%M:%S") handler = logging.StreamHandler(sys.stderr) handler.setFormatter(formatter) logger.addHandler(handler) if log_file_name: handler = logging.FileHandler(log_file_name) logger.addHandler(handler) class Config(piupartslib.conf.Config): def __init__(self, section="slave", defaults_section=None): self.section = section piupartslib.conf.Config.__init__( self, section, { "sections": "slave", "basetgz-sections": "", "idle-sleep": 300, "max-tgz-age": 2592000, "min-tgz-retry-delay": 21600, "master-host": None, "master-user": None, "master-command": None, "proxy": None, "mirror": None, "setarch": None, "piuparts-command": "sudo piuparts", "piuparts-flags": "", "tmpdir": None, "distro": None, "area": None, "components": None, "chroot-tgz": None, "upgrade-test-distros": None, "basetgz-directory": ".", "chroot-meta-auto": None, "chroot-meta-directory": None, "max-reserved": 1, "debug": "no", "keep-sources-list": "no", "arch": None, "precedence": "1", "slave-load-max": None, "slave-flush-interval": 0, }, defaults_section=defaults_section, ) class Alarm(Exception): pass def alarm_handler(signum, frame): raise Alarm def sigint_handler(signum, frame): global interrupted interrupted = True print("\nSlave interrupted by the user, waiting for the current test to finish.") print("Press Ctrl-C again to abort now.") signal(SIGINT, old_sigint_handler) def sighup_handler(signum, frame): global got_sighup got_sighup = True print("SIGHUP: Will flush finished logs.") def sigusr1_handler(signum, frame): global got_sigusr1 global got_sighup got_sigusr1 = True got_sighup = True print("SIGUSR1: Will restart.") class MasterIsBusy(Exception): def __init__(self): self.args = ("Master is busy, retry later",) class MasterNotOK(Exception): def __init__(self): self.args = ("Master did not respond with 'ok'",) class MasterDidNotGreet(Exception): def __init__(self): self.args = ("Master did not start with 'hello'",) class MasterCommunicationFailed(Exception): def __init__(self): self.args = ("Communication with master failed",) class MasterIsCrazy(Exception): def __init__(self): self.args = ("Master said something unexpected",) class MasterCantRecycle(Exception): def __init__(self): self.args = ("Master has nothing to recycle",) class Slave: def __init__(self): self._to_master = None self._from_master = None self._master_host = None self._master_user = None self._master_command = None self._section = None def _readline(self): try: line = self._from_master.readline() except IOError: raise MasterCommunicationFailed() logging.debug("<< " + str(line.rstrip())) return line def _writeline(self, *words): line = " ".join(words) logging.debug(">> " + line) try: self._to_master.write(line + "\n") self._to_master.flush() except IOError: raise MasterCommunicationFailed() def set_master_host(self, host): logging.debug("Setting master host to %s" % host) if self._master_host != host: self.close() self._master_host = host def set_master_user(self, user): logging.debug("Setting master user to %s" % user) if self._master_user != user: self.close() self._master_user = user def set_master_command(self, cmd): logging.debug("Setting master command to %s" % cmd) if self._master_command != cmd: self.close() self._master_command = cmd def set_section(self, section): logging.debug("Setting section to %s" % section) self._section = section def connect_to_master(self): if not self._is_connected(): self._initial_connect() self._select_section() def _is_connected(self): return self._to_master and self._from_master def _initial_connect(self): logging.info("Connecting to %s" % self._master_host) ssh_command = ["ssh", "-x"] if self._master_user: ssh_command.extend(["-l", self._master_user]) ssh_command.append(self._master_host) ssh_command.append(self._master_command or "command-is-set-in-authorized_keys") p = subprocess.Popen( ssh_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True, ) self._to_master = p.stdin self._from_master = p.stdout line = self._readline() if line != "hello\n": raise MasterDidNotGreet() def _select_section(self): self._writeline("section", self._section) line = self._readline() if line == "busy\n": raise MasterIsBusy() elif line != "ok\n": raise MasterNotOK() logging.debug("Connected to master") def close(self): if self._from_master is None and self._to_master is None: return logging.debug("Closing connection to master") if self._from_master is not None: self._from_master.close() if self._to_master is not None: self._to_master.close() self._from_master = self._to_master = None logging.info("Connection to master closed") def send_log(self, section, pass_or_fail, filename): logging.info("Sending log file %s/%s" % (section, filename)) basename = os.path.basename(filename) package, rest = basename.split("_", 1) version = rest[: -len(".log")] self._writeline(pass_or_fail, package, version) with open(filename, "r") as f: for line in f: if line.endswith("\n"): line = line[:-1] self._writeline(" " + line) self._writeline(".") line = self._readline() if line != "ok\n": raise MasterNotOK() def get_status(self, section): self._writeline("status") line = self._readline() words = line.split() if words and words[0] == "ok": logging.info("Master " + section + " status: " + " ".join(words[1:])) else: raise MasterIsCrazy() def enable_recycling(self): self._writeline("recycle") line = self._readline() if line != "ok\n": raise MasterCantRecycle() def get_idle(self): self._writeline("idle") line = self._readline() words = line.split() if words and words[0] == "ok" and len(words) == 2: return int(words[1]) else: raise MasterIsCrazy() def reserve(self): self._writeline("reserve") line = self._readline() words = line.split() if words and words[0] == "ok": logging.info("Reserved for us: %s %s" % (words[1], words[2])) self.remember_reservation(words[1], words[2]) return True elif words and words[0] == "error": logging.info("Master didn't reserve anything (more) for us") return False else: raise MasterIsCrazy() def unreserve(self, filename): basename = os.path.basename(filename) package, rest = basename.split("_", 1) version = rest[: -len(".log")] logging.info("Unreserve: %s %s" % (package, version)) self._writeline("unreserve", package, version) line = self._readline() if line != "ok\n": raise MasterNotOK() def _reserved_filename(self, name, version): return os.path.join("reserved", "%s_%s.log" % (name, version)) def remember_reservation(self, name, version): create_file(self._reserved_filename(name, version), "") def get_reserved(self): vlist = [] for basename in os.listdir("reserved"): if "_" in basename and basename.endswith(".log"): name, version = basename[: -len(".log")].split("_", 1) vlist.append((name, version)) return vlist def forget_reserved(self, name, version): try: os.remove(self._reserved_filename(name, version)) except os.error: pass class Section: def __init__(self, section, slave=None): self._config = Config(section=section, defaults_section="global") self._config.read(CONFIG_FILE) self._distro_config = piupartslib.conf.DistroConfig(DISTRO_CONFIG_FILE, self._config["mirror"]) self._error_wait_until = 0 self._idle_wait_until = 0 self._recycle_wait_until = 0 self._tarball_wait_until = 0 self._slave_directory = os.path.abspath(section) if not os.path.exists(self._slave_directory): os.makedirs(self._slave_directory) if self._config["debug"] in ["yes", "true"]: self._logger = logging.getLogger() self._logger.setLevel(logging.DEBUG) self._slave = slave or Slave() for rdir in ["new", "pass", "fail", "untestable", "reserved"]: rdir = os.path.join(self._slave_directory, rdir) if not os.path.exists(rdir): os.mkdir(rdir) if int(self._config["max-reserved"]) > 0: self._check_tarball() def _throttle_if_overloaded(self): global interrupted if interrupted or got_sighup: return if self._config["slave-load-max"] is None: return load_max = float(self._config["slave-load-max"]) if load_max < 1.0: return if os.getloadavg()[0] <= load_max: return load_resume = max(load_max - 1.0, 0.9) secs = random.randrange(30, 90) self._slave.close() while True: load = os.getloadavg()[0] if load <= load_resume: break logging.info("Sleeping due to high load (%.2f)" % load) try: time.sleep(secs) except KeyboardInterrupt: interrupted = True if interrupted or got_sighup: break if secs < 300: secs += random.randrange(30, 90) def _connect_to_master(self, recycle=False): self._slave.set_master_host(self._config["master-host"]) self._slave.set_master_user(self._config["master-user"]) self._slave.set_master_command(self._config["master-command"]) self._slave.set_section(self._config.section) self._slave.connect_to_master() if recycle: self._slave.enable_recycling() def _get_tarball(self): basetgz = self._config["chroot-tgz"] or self._distro_config.get_basetgz( self._config.get_start_distro(), self._config.get_arch(), merged_usr="--no-merged-usr" not in self._config["piuparts-flags"], ) return os.path.join(self._config["basetgz-directory"], basetgz) def _check_tarball(self): if int(self._config["max-tgz-age"]) < 0: return oldcwd = os.getcwd() os.chdir(self._slave_directory) tgz = self._get_tarball() max_tgz_age = int(self._config["max-tgz-age"]) min_tgz_retry_delay = int(self._config["min-tgz-retry-delay"]) ttl = 0 if max_tgz_age == 0: ttl = 86400 needs_update = not os.path.exists(tgz) if not needs_update and max_tgz_age > 0: # tgz exists and age is limited, so check age now = time.time() age = now - os.path.getmtime(tgz) ttl = max_tgz_age - age logging.info("Check-replace %s: age=%d vs. max=%d" % (tgz, age, max_tgz_age)) if ttl < 0: if os.path.exists(tgz + ".log"): age = now - os.path.getmtime(tgz + ".log") ttl = min_tgz_retry_delay - age logging.info("Limit-replace %s: last-retry=%d vs. min=%d" % (tgz, age, min_tgz_retry_delay)) if ttl < 0: needs_update = True logging.info("%s too old. Forcing re-creation" % tgz) if needs_update: self._slave.close() create_chroot(self._config, tgz, self._config.get_start_distro()) ttl = min_tgz_retry_delay self._tarball_wait_until = time.time() + ttl os.chdir(oldcwd) def _get_refchroot_metadata(self): if self._config["chroot-meta-auto"]: if self._config["chroot-meta-directory"]: path = os.path.join(self._config["chroot-meta-directory"], self._config.section) if not os.path.exists(path): os.makedirs(path) return os.path.join(path, self._config["chroot-meta-auto"]) return self._config["chroot-meta-auto"] return None def _check_refchroot_metadata(self): refchroot_metadata = self._get_refchroot_metadata() if refchroot_metadata: if os.path.exists(refchroot_metadata): try: age = time.time() - os.path.getmtime(refchroot_metadata) if age > 6 * 3600: os.unlink(refchroot_metadata) logging.info("Deleting old %s" % refchroot_metadata) except OSError: pass def _count_submittable_logs(self): files = 0 subdirs = ["pass", "fail", "untestable"] if interrupted: subdirs += ["reserved", "new"] for logdir in subdirs: for basename in os.listdir(os.path.join(self._slave_directory, logdir)): if basename.endswith(".log"): files += 1 return files def precedence(self): return int(self._config["precedence"]) def sleep_until(self, recycle=False): if recycle: return max(self._error_wait_until, self._recycle_wait_until) return max(self._error_wait_until, self._idle_wait_until) def run(self, do_processing=True, recycle=False): if time.time() < self.sleep_until(recycle=recycle): return 0 self._throttle_if_overloaded() self._config = Config(section=self._config.section, defaults_section="global") try: self._config.read(CONFIG_FILE) except MissingSection: logging.info("unknown section " + self._config.section) self._error_wait_until = time.time() + 3600 return 0 self._distro_config = piupartslib.conf.DistroConfig(DISTRO_CONFIG_FILE, self._config["mirror"]) if interrupted or got_sighup: do_processing = False if do_processing and time.time() > self._tarball_wait_until: self._check_tarball() if self._config.get_distro() == "None": # section is for tarball creation only self._idle_wait_until = self._tarball_wait_until + 60 self._recycle_wait_until = self._tarball_wait_until + 3600 return 0 if interrupted or got_sighup: do_processing = False if not do_processing and self._count_submittable_logs() == 0: return 0 logging.info("-------------------------------------------") action = "Running" if recycle: action = "Recycling" if not do_processing: action = "Flushing" logging.info("%s section %s (precedence=%d)" % (action, self._config.section, self.precedence())) if int(self._config["max-reserved"]) == 0: logging.info("disabled") self._error_wait_until = time.time() + 12 * 3600 return 0 if not self._config.get_distro() and not self._config.get_distros(): logging.error("neither 'distro' nor 'upgrade-test-distros' configured") self._error_wait_until = time.time() + 3600 return 0 with open(os.path.join(self._slave_directory, "slave.lock"), "w") as lock: oldcwd = os.getcwd() os.chdir(self._slave_directory) try: fcntl.flock(lock, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: logging.info("busy") self._error_wait_until = time.time() + 900 else: if do_processing: self._check_refchroot_metadata() if self._talk_to_master(fetch=do_processing, recycle=recycle, unreserve=interrupted): if do_processing: if not self._slave.get_reserved(): self._idle_wait_until = time.time() + int(self._config["idle-sleep"]) if recycle: self._recycle_wait_until = self._idle_wait_until + 3600 if do_processing and self._slave.get_reserved(): processed = self._process() if got_sighup and self._slave.get_reserved(): # keep this section at the front of the round-robin runnable queue self._idle_wait_until = 0 self._recycle_wait_until = 0 else: # put this section at the end of the round-robin runnable queue self._idle_wait_until = time.time() self._recycle_wait_until = time.time() return processed finally: os.chdir(oldcwd) return 0 def _talk_to_master(self, fetch=False, unreserve=False, recycle=False): flush = self._count_submittable_logs() > 0 fetch = fetch and not self._slave.get_reserved() if not flush and not fetch: return True try: self._connect_to_master(recycle=recycle) except KeyboardInterrupt: raise except MasterIsBusy: logging.error("master is busy") self._error_wait_until = time.time() + random.randrange(60, 180) except MasterCantRecycle: logging.error("master has nothing to recycle") self._recycle_wait_until = max(time.time(), self._idle_wait_until) + 3600 except ( MasterDidNotGreet, MasterIsCrazy, MasterCommunicationFailed, MasterNotOK, ): logging.error("connection to master failed") self._error_wait_until = time.time() + 900 self._slave.close() else: try: for logdir in ["pass", "fail", "untestable"]: for basename in os.listdir(logdir): if basename.endswith(".log"): fullname = os.path.join(logdir, basename) self._slave.send_log(self._config.section, logdir, fullname) os.remove(fullname) if unreserve: for logdir in ["new", "reserved"]: for basename in os.listdir(logdir): if basename.endswith(".log"): fullname = os.path.join(logdir, basename) self._slave.unreserve(fullname) os.remove(fullname) if fetch: max_reserved = int(self._config["max-reserved"]) idle = self._slave.get_idle() if idle > 0: idle = min(idle, int(self._config["idle-sleep"])) logging.info("idle (%d)" % idle) if not recycle: self._idle_wait_until = time.time() + idle else: self._recycle_wait_until = time.time() + idle return 0 while len(self._slave.get_reserved()) < max_reserved and self._slave.reserve(): pass self._slave.get_status(self._config.section) except MasterNotOK: logging.error("master did not respond with 'ok'") self._error_wait_until = time.time() + 900 self._slave.close() except (MasterIsCrazy, MasterCommunicationFailed): logging.error("communication with master failed") self._error_wait_until = time.time() + 900 self._slave.close() else: return True return False def _process(self): global interrupted last_flush = time.time() packagenames = set([x[0] for x in self._slave.get_reserved()]) packages_files = {} for distro in [self._config.get_distro()] + self._config.get_distros(): if distro not in packages_files: try: pf = piupartslib.packagesdb.PackagesFile() pf.load_packages_urls( self._distro_config.get_packages_urls(distro, self._config.get_area(), self._config.get_arch()), packagenames, ) packages_files[distro] = pf except IOError: logging.error("failed to fetch packages file for %s" % distro) self._error_wait_until = time.time() + 900 return 0 except KeyboardInterrupt: interrupted = True del packagenames test_count = 0 self._check_tarball() if not os.path.exists(self._get_tarball()): self._error_wait_until = time.time() + 300 self._check_refchroot_metadata() for package_name, version in self._slave.get_reserved(): self._throttle_if_overloaded() if interrupted or got_sighup: break if int(self._config["slave-flush-interval"]): if time.time() - last_flush > int(self._config["slave-flush-interval"]): last_flush += 300 # throttle retries if self._talk_to_master(): last_flush = time.time() if not os.path.exists(self._get_tarball()): logging.error("Missing chroot-tgz %s" % self._get_tarball()) break test_count += 1 self._test_package(package_name, version, packages_files) self._slave.forget_reserved(package_name, version) self._talk_to_master(unreserve=interrupted) return test_count def _test_package(self, pname, pvers, packages_files): global old_sigint_handler old_sigint_handler = signal(SIGINT, sigint_handler) self._slave.close() logging.info("Testing package %s/%s %s" % (self._config.section, pname, pvers)) output_name = log_name(pname, pvers) logging.debug("Opening log file %s" % output_name) new_name = os.path.join("new", output_name) output = open(new_name, "w") output.write(time.strftime("Start: %Y-%m-%d %H:%M:%S %Z\n", time.gmtime())) distupgrade = len(self._config.get_distros()) > 1 command = [] if self._config["setarch"]: command.append("setarch") command.extend(self._config["setarch"].split()) command.extend(self._config["piuparts-command"].split()) if self._config["piuparts-flags"]: command.extend(self._config["piuparts-flags"].split()) if "http_proxy" in os.environ: command.extend(["--proxy", os.environ["http_proxy"]]) if self._config["mirror"]: mirror = self._config["mirror"] if self._config["components"]: mirror += " " + self._config["components"] command.extend(["--mirror", mirror]) if self._config["tmpdir"]: command.extend(["--tmpdir", self._config["tmpdir"]]) command.extend(["--arch", self._config.get_arch()]) command.extend(["-b", self._get_tarball()]) if not distupgrade: command.extend(["-d", self._config.get_distro()]) command.append("--no-upgrade-test") else: for distro in self._config.get_distros(): command.extend(["-d", distro]) if self._config["keep-sources-list"] in ["yes", "true"]: command.append("--keep-sources-list") if distupgrade and self._config["chroot-meta-auto"]: refchroot_metadata = self._get_refchroot_metadata() if not os.path.exists(refchroot_metadata): command.extend(["-S", refchroot_metadata]) else: command.extend(["-B", refchroot_metadata]) command.extend(["--apt", "%s=%s" % (pname, pvers)]) subdir = "fail" ret = 0 if not distupgrade: distro = self._config.get_distro() if pname not in packages_files[distro]: output.write("Package %s found not in %s\n" % (pname, distro)) ret = -10001 else: package = packages_files[distro][pname] if pvers != package["Version"]: output.write( "Package %s %s not found in %s, %s is available\n" % (pname, pvers, distro, package["Version"]) ) ret = -10002 output.write("\n") package.dump(output) output.write("\n") else: distros = self._config.get_distros() if distros: # the package must exist somewhere for distro in distros: if pname in packages_files[distro]: break else: output.write("Package %s not found in any distribution\n" % pname) ret = -10003 # the package must have the correct version in the distupgrade target distro distro = distros[-1] if pname not in packages_files[distro]: # the package may "disappear" in the distupgrade target distro if pvers == "None": pass else: output.write("Package %s not found in %s\n" % (pname, distro)) ret = -10004 else: package = packages_files[distro][pname] if pvers != package["Version"]: output.write( "Package %s %s not found in %s, %s is available\n" % (pname, pvers, distro, package["Version"]) ) ret = -10005 for distro in distros: output.write("\n[%s]\n" % distro) if pname in packages_files[distro]: packages_files[distro][pname].dump(output) output.write("\n") if ret == 0: prev = "~" for distro in distros: if pname in packages_files[distro]: v = packages_files[distro][pname]["Version"] if not apt_pkg.version_compare(prev, v) <= 0: output.write("Upgrade to %s requires downgrade: %s > %s\n" % (distro, prev, v)) ret = -10006 prev = v else: ret = -10010 if ret != 0: subdir = "untestable" if ret == 0: output.write("Executing: %s\n" % command2string(command)) ret, f = run_test_with_timeout(command, MAX_WAIT_TEST_RUN) if not f or f[-1] != "\n": f += "\n" output.write(f.replace("\033", "[ESC]")) lastline = f.split("\n")[-2] if ret < 0: output.write(" *** Process KILLED - exceed maximum run time ***\n") elif "piuparts run ends" not in lastline: ret += 1024 output.write(" *** PIUPARTS OUTPUT INCOMPLETE ***\n") elif distupgrade and self._config["chroot-meta-auto"]: try: refchroot_metadata = self._get_refchroot_metadata() if "History of available packages does not match - reference chroot may be outdated" in f: os.unlink(refchroot_metadata) logging.info("Deleting outdated %s" % refchroot_metadata) elif "Initial package selections do not match - ignoring loaded reference chroot state" in f: os.unlink(refchroot_metadata) logging.info("Deleting mismatching %s" % refchroot_metadata) except OSError: pass output.write("\n") output.write("ret=%d\n" % ret) output.write(time.strftime("End: %Y-%m-%d %H:%M:%S %Z\n", time.gmtime())) output.close() if ret == 0: subdir = "pass" os.rename(new_name, os.path.join(subdir, output_name)) logging.debug("Done with %s: %s (%d)" % (output_name, subdir, ret)) signal(SIGINT, old_sigint_handler) def log_name(package, version): return "%s_%s.log" % (package, version) def command2string(command): """Quote s.t. copy+paste from the logfile gives a runnable command in the shell.""" return " ".join([shlex.quote(arg) for arg in command]) def run_test_with_timeout(cmd, maxwait, kill_all=True): def terminate_subprocess(p, kill_all): pids = [p.pid] if kill_all: ps = subprocess.Popen( ["ps", "--no-headers", "-o", "pid", "--ppid", "%d" % p.pid], stdout=subprocess.PIPE, universal_newlines=True, ) stdout, stderr = ps.communicate() pids.extend([int(pid) for pid in stdout.split()]) if p.poll() is None: print("Sending SIGINT...") try: os.killpg(os.getpgid(p.pid), SIGINT) except OSError: pass # piuparts has 30 seconds to clean up after Ctrl-C for i in range(60): time.sleep(0.5) if p.poll() is not None: break if p.poll() is None: print("Sending SIGTERM...") p.terminate() # piuparts has 5 seconds to clean up after SIGTERM for i in range(10): time.sleep(0.5) if p.poll() is not None: break if p.poll() is None: print("Sending SIGKILL...") p.kill() for pid in pids: if pid > 0: try: os.kill(pid, SIGKILL) print("Killed %d" % pid) except OSError: pass logging.debug("Executing: %s" % command2string(cmd)) stdout = "" p = subprocess.Popen( cmd, preexec_fn=os.setpgrp, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) if maxwait > 0: signal(SIGALRM, alarm_handler) alarm(maxwait) try: stdout, stderr = p.communicate() alarm(0) except Alarm: terminate_subprocess(p, kill_all) return -1, stdout except KeyboardInterrupt: print("\nSlave interrupted by the user, cleaning up...") try: terminate_subprocess(p, kill_all) except KeyboardInterrupt: print("\nTerminating piuparts was interrupted... manual cleanup still neccessary.") raise raise ret = p.returncode if ret in [124, 137]: # process was terminated by the timeout command ret = -ret return ret, stdout def create_chroot(config, tarball, distro): command = [] if config["setarch"]: command.append("setarch") command.extend(config["setarch"].split()) command.extend(config["piuparts-command"].split()) if config["piuparts-flags"]: command.extend(config["piuparts-flags"].split()) if "http_proxy" in os.environ: command.extend(["--proxy", os.environ["http_proxy"]]) if config["mirror"]: mirror = config["mirror"] if config["components"]: mirror += " " + config["components"] command.extend(["--mirror", mirror]) if config["tmpdir"]: command.extend(["--tmpdir", config["tmpdir"]]) command.extend(["--arch", config.get_arch()]) command.extend(["-d", distro]) command.extend(["-s", tarball + ".new"]) command.extend(["--no-install-purge-test", "--no-upgrade-test"]) command.extend(["--apt", "TARBALL"]) # dummy package name output_name = tarball + ".log" with open(output_name, "w") as output: try: fcntl.flock(output, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: logging.info("Creation of tarball %s already in progress." % tarball) else: logging.info("Creating new tarball %s" % tarball) output.write(time.strftime("Start: %Y-%m-%d %H:%M:%S %Z\n\n", time.gmtime())) output.write("Executing: " + command2string(command) + "\n\n") logging.debug("Executing: " + command2string(command)) try: p = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, ) for line in p.stdout: output.write(line) logging.debug(">> " + line.rstrip()) p.wait() output.write(time.strftime("\nEnd: %Y-%m-%d %H:%M:%S %Z\n", time.gmtime())) if os.path.exists(tarball + ".new"): os.rename(tarball + ".new", tarball) else: logging.error("Tarball creation failed, see %s" % output_name) except IOError: output.write(time.strftime("\nFAIL: %Y-%m-%d %H:%M:%S %Z\n", time.gmtime())) logging.error("Tarball creation failed with IOError") def create_file(filename, contents): with open(filename, "w") as f: f.write(contents) def main(): setup_logging(logging.INFO, None) signal(SIGHUP, sighup_handler) signal(SIGUSR1, sigusr1_handler) # For supporting multiple architectures and suites, we take command-line # argument(s) referring to section(s) in the configuration file. # If no argument is given, the "sections" entry from the "global" section # is used. section_names = [] global_config = Config(section="global") global_config.read(CONFIG_FILE) if global_config["proxy"]: os.environ["http_proxy"] = global_config["proxy"] if len(sys.argv) > 1: section_names = sys.argv[1:] else: section_names = global_config["sections"].split() section_names += global_config["basetgz-sections"].split() persistent_connection = Slave() sections = [] for section_name in section_names: try: sections.append(Section(section_name, persistent_connection)) except MissingSection: # ignore unknown sections pass if not sections: logging.error("no sections found") return # flush logs from previous run for section in sections: section.run(do_processing=False) while True: global got_sighup test_count = 0 for section in sorted(sections, key=lambda section: (section.precedence(), section.sleep_until())): test_count += section.run(do_processing=(test_count == 0)) if got_sigusr1: logging.info("Restarting...") os.execv(__file__, sys.argv) if test_count == 0 and got_sighup: # clear SIGHUP state after flushing all sections got_sighup = False continue if test_count == 0: # try to recycle old logs # round robin recycling of all sections is ensured by the recycle_wait_until timestamps idle_until = min([section.sleep_until() for section in sections]) for section in sorted(sections, key=lambda section: section.sleep_until(recycle=True)): test_count += section.run(recycle=True) if test_count > 0 and idle_until < time.time(): break if interrupted: raise KeyboardInterrupt if test_count == 0 and not got_sighup: now = time.time() sleep_until = min( [now + int(global_config["idle-sleep"])] + [section.sleep_until() for section in sections] ) if sleep_until > now: to_sleep = max(60, sleep_until - now) persistent_connection.close() logging.info("Nothing to do, sleeping for %d seconds." % to_sleep) time.sleep(to_sleep) if __name__ == "__main__": try: main() except KeyboardInterrupt: print("") print("Slave interrupted by the user, exiting...") sys.exit(1) # vi:set et ts=4 sw=4 :