# -*- coding: utf-8 -*- # Copyright 2013 David Steele (dsteele@gmail.com) # Copyright © 2014 Andreas Beckmann (anbe@debian.org) # # This file is part of Piuparts # # Piuparts is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the # Free Software Foundation; either version 2 of the License, or (at your # option) any later version. # # Piuparts is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, see . # dwke.py is used by master-bin/detect_well_known_errors.py import logging import os import re from collections import namedtuple KPR_EXT = ".kpr" BUG_EXT = ".bug" LOG_EXT = ".log" class Problem: """Encapsulate a particular known problem""" required_tags = ["PATTERN", "WHERE", "ISSUE", "HEADER", "HELPTEXT"] optional_tags = ["EXCLUDE_PATTERN", "EXPLAIN", "PRIORITY"] def __init__(self, probpath): """probpath is the path to the problem definition file""" self.probpath = probpath self.name = os.path.basename(probpath) self.short_name = os.path.splitext(self.name)[0] self.tags_are_valid = True self.init_problem() for tag in self.required_tags: if tag not in self.__dict__: self.tags_are_valid = False if "PATTERN" in self.__dict__: self.inc_re = re.compile(self.PATTERN) else: self.inc_re = None if "EXCLUDE_PATTERN" in self.__dict__: self.exc_re = re.compile(self.EXCLUDE_PATTERN) else: self.exc_re = None def valid(self): return self.tags_are_valid def init_problem(self): """Load problem file parameters (HELPTEXT="foo" -> self.HELPTEXT)""" with open(self.probpath, "r") as pb: probbody = pb.read() tagged = re.sub("^([A-Z_]+=)", r"\g<0>", probbody, 0, re.MULTILINE) for chub in re.split("", tagged)[1:]: (name, value) = re.split("=", chub, 1, re.MULTILINE) while value[-1] == "\n": value = value[:-1] if re.search("^'.+'$", value, re.MULTILINE | re.DOTALL) or re.search( '^".+"$', value, re.MULTILINE | re.DOTALL ): value = value[1:-1] if name in self.required_tags or name in self.optional_tags: self.__dict__[name] = value else: self.tags_are_valid = False self.WHERE = self.WHERE.split(" ") def has_problem(self, logbody, where): """Does the log text 'logbody' contain this known problem?""" if where in self.WHERE: if self.inc_re.search(logbody, re.MULTILINE): for line in logbody.splitlines(): if self.inc_re.search(line): if self.exc_re is None or not self.exc_re.search(line): return True return False def get_command(self): cmd = 'grep -E "%s"' % self.PATTERN if "EXCLUDE_PATTERN" in self.__dict__: cmd += ' | grep -v -E "%s"' % self.EXCLUDE_PATTERN return cmd class FailureManager: """Class to track known failures encountered, by package, where (e.g. 'fail'), and known problem type""" def __init__(self, logdict): """logdict is {pkgspec: fulllogpath} across all log files""" self.logdict = logdict self.failures = [] self.load_failures() def load_failures(self): """Collect failures across all kpr files, as named tuples""" for pkgspec in self.logdict: logpath = self.logdict[pkgspec] try: with open(get_kpr_path(logpath), "r") as kp: for line in kp: (where, problem) = self.parse_kpr_line(line) self.failures.append(make_failure(where, problem, pkgspec)) except IOError: logging.error("Error processing %s" % get_kpr_path(logpath)) def parse_kpr_line(self, line): """Parse a line in a kpr file into where (e.g. 'pass') and problem name""" m = re.search("^([a-z]+)/.+ (.+)$", line) return (m.group(1), m.group(2)) def sort_by_path(self): self.failures.sort(key=lambda x: self.logdict[x.pkgspec]) def sort_by_bugged_and_rdeps(self, pkgsdb): self.pkgsdb = pkgsdb def keyfunc(x, pkgsdb=self.pkgsdb, logdict=self.logdict): rdeps = pkgsdb.rrdep_count(get_pkg(x.pkgspec)) is_failed = get_where(logdict[x.pkgspec]) == "fail" return (not is_failed, -rdeps, logdict[x.pkgspec]) self.failures.sort(key=keyfunc) def filtered(self, problem): return [x for x in self.failures if problem == x.problem] def make_failure(where, problem, pkgspec): return namedtuple("Failure", "where problem pkgspec")(where, problem, pkgspec) def get_where(logpath): """Convert a path to a log file to the 'where' component (e.g. 'pass')""" return logpath.split("/")[-2] def replace_ext(fpath, newext): basename = os.path.splitext(os.path.split(fpath)[1])[0] return "/".join(fpath.split("/")[:-1] + [basename + newext]) def get_pkg(pkgspec): return pkgspec.split("_")[0] def get_kpr_path(logpath): """Return the kpr file path for a particular log path""" return replace_ext(logpath, KPR_EXT) def get_file_dict(workdirs, ext): """For files in [workdirs] with extension 'ext', create a dict of _: """ return { os.path.splitext(os.path.basename(fl))[0]: os.path.join(d, fl) for d in workdirs for fl in os.listdir(d) if os.path.splitext(fl)[1] == ext } def create_problem_list(pdir): plist = [] pdir_list = os.listdir(pdir) pdir_list.sort() for pfile in [x for x in pdir_list if x.endswith(".conf")]: prob = Problem(os.path.join(pdir, pfile)) if prob.valid(): plist.append(prob) else: logging.error("Keyword error in %s - skipping" % pfile) return plist def clean_cache_files(logdict, cachedict, recheck=False, recheck_failed=False, skipnewer=False): """Delete files in cachedict if the corresponding logdict file is missing or newer""" count = 0 for pkgspec in cachedict: try: if ( pkgspec not in logdict or (os.path.getmtime(logdict[pkgspec]) > os.path.getmtime(cachedict[pkgspec]) and not skipnewer) or get_where(logdict[pkgspec]) != get_where(cachedict[pkgspec]) or recheck or (recheck_failed and get_where(cachedict[pkgspec]) not in ["pass"]) ): os.remove(cachedict[pkgspec]) count = count + 1 except (IOError, OSError): # logfile may have disappeared pass return count def make_kprs(logdict, kprdict, problem_list): """Create kpr files, as necessary, so every log file has one kpr entries are e.g. fail/xorg-docs_1:1.6-1.log broken_symlinks_error.conf""" needs_kpr = set(logdict.keys()).difference(set(kprdict.keys())) for pkg_spec in needs_kpr: logpath = logdict[pkg_spec] try: with open(logpath, "r", errors="backslashreplace") as lb: logbody = lb.read() where = get_where(logpath) kprs = [ "%s/%s.log %s\n" % (where, pkg_spec, problem.name) for problem in problem_list if problem.has_problem(logbody, where) ] kprs = "".join(kprs) if where != "pass" and not kprs: kprs = "%s/%s.log %s\n" % ( where, pkg_spec, "unclassified_failures.conf", ) with open(get_kpr_path(logpath), "w") as f: f.write(kprs) except IOError: logging.error("File error processing %s" % logpath) return len(needs_kpr) # vi:set et ts=4 sw=4 :