#!/usr/bin/python3 # -*- coding: utf-8 -*- # # Copyright 2005 Lars Wirzenius (liw@iki.fi) # Copyright © 2011-2017 Andreas Beckmann (anbe@debian.org) # # This program is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the # Free Software Foundation; either version 2 of the License, or (at your # option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. # # You should have received a copy of the GNU General Public License along with # this program. If not, see """Distributed piuparts processing, master program Lars Wirzenius """ import fcntl import logging import os import random import sys import time from urllib.error import URLError import piupartslib.conf import piupartslib.packagesdb from piupartslib.conf import MissingSection from piupartslib.packagesdb import LogfileExists CONFIG_FILE = "/etc/piuparts/piuparts.conf" DISTRO_CONFIG_FILE = "/etc/piuparts/distros.conf" log_handler = None def setup_logging(log_level, log_file_name): logger = logging.getLogger() global log_handler logger.removeHandler(log_handler) if log_file_name: log_handler = logging.FileHandler(log_file_name) else: log_handler = logging.StreamHandler(sys.stderr) logger.addHandler(log_handler) logger.setLevel(log_level) def timestamp(): return time.strftime("[%Y-%m-%d %H:%M:%S]") class Config(piupartslib.conf.Config): def __init__(self, section="master", defaults_section=None): piupartslib.conf.Config.__init__( self, section, { "log-file": None, "master-directory": ".", "proxy": None, "mirror": None, "distro": None, "area": None, "arch": None, "upgrade-test-distros": None, "depends-sections": None, }, defaults_section=defaults_section, ) class CommandSyntaxError(Exception): def __init__(self, msg): self.args = (msg,) class ProtocolError(Exception): def __init__(self): self.args = ("EOF, missing space in long part, or other protocol error",) class Protocol: def __init__(self, myinput, output): self._input = myinput self._output = output def _readline(self): line = self._input.readline() logging.debug(">> " + line.rstrip()) return line def _writeline(self, line): logging.debug("<< " + line) self._output.write(line + "\n") self._output.flush() def _short_response(self, *words): self._writeline(" ".join(words)) def _read_long_part(self): lines = [] while True: line = self._input.readline() if not line: raise ProtocolError() if line == ".\n": break if line[0] != " ": raise ProtocolError() lines.append(line[1:]) return "".join(lines) class Master(Protocol): def __init__(self, myinput, output): Protocol.__init__(self, myinput, output) self._commands = { "section": self._switch_section, "recycle": self._recycle, "idle": self._idle, "status": self._status, "reserve": self._reserve, "unreserve": self._unreserve, "pass": self._pass, "fail": self._fail, "untestable": self._untestable, # debug commands, unstable and undocumented interface "_state": self._state, "_depends": self._depends, "_recursive-depends": self._recursive_depends, "_depcycle": self._depcycle, "_list": self._list, } self._section = None self._lock = None self._writeline("hello") def _init_section(self, section): if self._lock: self._lock.close() # clear all settings from a previous section and set defaults self._section = None self._lock = None self._recycle_mode = False self._idle_mode = False self._idle_stamp = os.path.join(section, "idle.stamp") self._package_databases = None self._binary_db = None config = Config(section=section, defaults_section="global") try: config.read(CONFIG_FILE) except MissingSection: return False if not os.path.exists(section): os.makedirs(section) self._lock = open(os.path.join(section, "master.lock"), "w") try: fcntl.flock(self._lock, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: return False self._section = section logging.debug(timestamp() + " switching logfile") logfile = config["log-file"] or os.path.join(section, "master.log") setup_logging(logging.DEBUG, logfile) logging.debug(timestamp() + " connected") # start with a dummy _binary_db (without Packages file), sufficient # for submitting finished logs self._binary_db = piupartslib.packagesdb.PackagesDB(prefix=section) return True def _init_db(self): if self._package_databases is not None: return self._package_databases = {} self._load_package_database(self._section) self._binary_db = self._package_databases[self._section] def _load_package_database(self, section): if section in self._package_databases: return config = Config(section=section, defaults_section="global") config.read(CONFIG_FILE) distro_config = piupartslib.conf.DistroConfig(DISTRO_CONFIG_FILE, config["mirror"]) db = piupartslib.packagesdb.PackagesDB(prefix=section) if self._recycle_mode and self._section == section: db.enable_recycling() self._package_databases[section] = db if config["depends-sections"]: deps = config["depends-sections"].split() for dep in deps: self._load_package_database(dep) db.set_dependency_databases([self._package_databases[dep] for dep in deps]) db.load_packages_urls( distro_config.get_packages_urls(config.get_distro(), config.get_area(), config.get_arch()) ) if config.get_distro() != config.get_final_distro(): # take version numbers (or None) from final distro db.load_alternate_versions_from_packages_urls( distro_config.get_packages_urls(config.get_final_distro(), config.get_area(), config.get_arch()) ) def _clear_idle(self): if self._idle_mode: self._idle_mode = False if os.path.exists(self._idle_stamp): os.unlink(self._idle_stamp) def _set_idle(self): if not self._idle_mode: self._idle_mode = True open(self._idle_stamp, "w").close() os.utime(self._idle_stamp, (-1, self._binary_db._stamp)) def _get_idle_status(self): """Returns number of seconds a cached idle status is still valid, or 0 if not known to be idle.""" if not os.path.exists(self._idle_stamp): return 0 stamp_mtime = os.path.getmtime(self._idle_stamp) ttl = stamp_mtime + 3600 - time.time() if ttl <= 0: return 0 # stamp expired if stamp_mtime < self._binary_db.get_mtime(): return 0 # stamp outdated return ttl + random.randrange(120) def do_transaction(self): line = self._readline() if line: parts = line.split() if len(parts) > 0: command = parts[0] args = parts[1:] if self._section is None and command != "section": raise CommandSyntaxError("Expected 'section' command, got %s" % command) if command in self._commands: self._commands[command](command, args) return True else: raise CommandSyntaxError("Unknown command %s" % command) return False def _check_args(self, count, command, args): if len(args) != count: raise CommandSyntaxError("Need exactly %d args: %s %s" % (count, command, " ".join(args))) def dump_pkgs(self): for st in self._binary_db.get_states(): for name in self._binary_db.get_pkg_names_in_state(st): logging.debug("%s : %s\n" % (st, name)) def _switch_section(self, command, args): self._check_args(1, command, args) if self._init_section(args[0]): self._short_response("ok") elif self._lock is None: # unknown section self._short_response("error") else: self._short_response("busy") def _recycle(self, command, args): self._check_args(0, command, args) if self._binary_db.enable_recycling(): self._idle_stamp = os.path.join(self._section, "recycle.stamp") self._recycle_mode = True self._short_response("ok") else: self._short_response("error") def _idle(self, command, args): self._check_args(0, command, args) self._short_response("ok", "%d" % self._get_idle_status()) def _status(self, command, args): self._check_args(0, command, args) self._init_db() stats = "" if self._binary_db._recycle_mode: stats += "(recycle) " total = 0 for state in self._binary_db.get_active_states(): count = len(self._binary_db.get_pkg_names_in_state(state)) total += count stats += "%s=%d " % (state, count) stats += "total=%d" % total self._short_response("ok", stats) def _reserve(self, command, args): self._check_args(0, command, args) self._init_db() package = self._binary_db.reserve_package() if package is None: self._set_idle() self._short_response("error") else: self._clear_idle() self._short_response("ok", package.name(), package.test_versions()) def _unreserve(self, command, args): self._check_args(2, command, args) self._binary_db.unreserve_package(args[0], args[1]) self._short_response("ok") def _pass(self, command, args): self._check_args(2, command, args) log = self._read_long_part() try: self._binary_db.pass_package(args[0], args[1], log) except LogfileExists: logging.info("Ignoring duplicate submission: %s %s %s" % ("pass", args[0], args[1])) self._short_response("ok") def _fail(self, command, args): self._check_args(2, command, args) log = self._read_long_part() try: self._binary_db.fail_package(args[0], args[1], log) except LogfileExists: logging.info("Ignoring duplicate submission: %s %s %s" % ("fail", args[0], args[1])) self._short_response("ok") def _untestable(self, command, args): self._check_args(2, command, args) log = self._read_long_part() try: self._binary_db.make_package_untestable(args[0], args[1], log) except LogfileExists: logging.info("Ignoring duplicate submission: %s %s %s" % ("untestable", args[0], args[1])) self._short_response("ok") # debug command def _state(self, command, args): self._check_args(1, command, args) self._short_response( "ok", self._binary_db.get_package_state(args[0]), self._binary_db.get_test_versions(args[0]), ) # debug command def _depends(self, command, args): self._check_args(1, command, args) if self._binary_db.has_package(args[0]): package = self._binary_db.get_package(args[0]) self._short_response("ok", *package.dependencies()) else: self._short_response("error") # debug command def _recursive_depends(self, command, args): self._check_args(1, command, args) if self._binary_db.has_package(args[0]): package = self._binary_db.get_package(args[0]) self._short_response("ok", *self._binary_db._get_recursive_dependencies(package)) else: self._short_response("error") # debug command def _depcycle(self, command, args): self._check_args(1, command, args) if self._binary_db.has_package(args[0]): self._short_response("ok", *self._binary_db._get_dependency_cycle(args[0])) else: self._short_response("error") # debug command def _list(self, command, args): self._check_args(1, command, args) if args[0] in self._binary_db.get_states(): self._short_response("ok", *self._binary_db.get_pkg_names_in_state(args[0])) else: self._short_response("error") def main(): setup_logging(logging.INFO, None) global_config = Config(section="global") global_config.read(CONFIG_FILE) if global_config["proxy"]: os.environ["http_proxy"] = global_config["proxy"] master_directory = global_config["master-directory"] if not os.path.exists(master_directory): os.makedirs(master_directory) os.chdir(master_directory) m = Master(sys.stdin, sys.stdout) try: while m.do_transaction(): pass except URLError as e: logging.error("ABORT: URLError: " + str(e.reason)) except BrokenPipeError: logging.error("ABORT: BrokenPipeError") logging.debug(timestamp() + " disconnected") # https://docs.python.org/3/library/signal.html#note-on-sigpipe devnull = os.open(os.devnull, os.O_WRONLY) os.dup2(devnull, sys.stdout.fileno()) sys.exit(1) logging.debug(timestamp() + " disconnected") if __name__ == "__main__": main() # vi:set et ts=4 sw=4 :