zephyr/scripts/ci/check-compliance.py
Anas Nashif ccb57d766b ci: fix skipped testcase status posting
Skipped tests were not evaluated correctly.

Signed-off-by: Anas Nashif <anas.nashif@intel.com>
2018-11-20 18:14:19 -05:00

394 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
import sys
import subprocess
import re
import os
from email.utils import parseaddr
import sh
import logging
import argparse
from junitparser import TestCase, TestSuite, JUnitXml, Skipped, Error, Failure, Attr
from github import Github
from shutil import copyfile, copytree
import json
from pprint import pprint
if "ZEPHYR_BASE" not in os.environ:
logging.warn("$ZEPHYR_BASE environment variable undefined.\n")
repository_path = os.getcwd()
else:
repository_path = os.environ['ZEPHYR_BASE']
logger = None
DOCS_WARNING_FILE = "doc.warnings"
sh_special_args = {
'_tty_out': False,
'_cwd': repository_path
}
# Put the Kconfiglib path first to make sure no local Kconfiglib version is
# used
sys.path.insert(0, os.path.join(repository_path, "scripts/kconfig"))
import kconfiglib
def get_shas(refspec):
sha_list = sh.git("rev-list",
'--max-count={0}'.format(-1 if "." in refspec else 1),
refspec, **sh_special_args).split()
return sha_list
class MyCase(TestCase):
classname = Attr()
doc = Attr()
class ComplianceTest:
_name = ""
_title = ""
_doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html"
def __init__(self, suite, range):
self.case = None
self.suite = suite
self.commit_range = range
def prepare(self):
self.case = MyCase(self._name)
self.case.classname = "Guidelines"
print("Running {} tests...".format(self._name))
def run(self):
pass
class CheckPatch(ComplianceTest):
_name = "checkpatch"
_doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html#coding-style"
def run(self):
self.prepare()
diff = subprocess.Popen(('git', 'diff', '%s' %(self.commit_range)), stdout=subprocess.PIPE)
try:
output = subprocess.check_output(('%s/scripts/checkpatch.pl' %repository_path,
'--mailback', '--no-tree', '-'), stdin=diff.stdout,
stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as ex:
m = re.search("([1-9][0-9]*) errors,", ex.output.decode('utf8'))
if m:
self.case.result = Failure("Checkpatch issues", "failure")
self.case.result._elem.text = (ex.output.decode('utf8'))
class KconfigCheck(ComplianceTest):
_name = "Kconfig"
_doc = "https://docs.zephyrproject.org/latest/application/kconfig-tips.html"
def run(self):
self.prepare()
# Look up Kconfig files relative to ZEPHYR_BASE
os.environ["srctree"] = repository_path
# Parse the entire Kconfig tree, to make sure we see all symbols
os.environ["SOC_DIR"] = "soc/"
os.environ["BOARD_DIR"] = "boards/*/*"
os.environ["ARCH"] = "*"
# Enable strict Kconfig mode in Kconfiglib, which assumes there's just a
# single Kconfig tree and warns for all references to undefined symbols
os.environ["KCONFIG_STRICT"] = "y"
undef_ref_warnings = []
for warning in kconfiglib.Kconfig().warnings:
if "undefined symbol" in warning:
undef_ref_warnings.append(warning)
# Generating multiple JUnit <failure>s would be neater, but Shippable only
# seems to display the first one
if undef_ref_warnings:
self.case.result = Failure("undefined Kconfig symbols", "failure")
self.case.result._elem.text = "\n\n\n".join(undef_ref_warnings)
class Documentation(ComplianceTest):
_name = "Documentation"
_doc = "https://docs.zephyrproject.org/latest/contribute/doc-guidelines.html"
def run(self):
self.prepare()
if os.path.exists(DOCS_WARNING_FILE) and os.path.getsize(DOCS_WARNING_FILE) > 0:
with open(DOCS_WARNING_FILE, "rb") as f:
log = f.read()
self.case.result = Error("Documentation Issues", "failure")
self.case.result._elem.text = log.decode('utf8')
class GitLint(ComplianceTest):
_name = "Gitlint"
_doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html#commit-guidelines"
def run(self):
self.prepare()
proc = subprocess.Popen('gitlint --commits %s' % (self.commit_range),
shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
msg = ""
if proc.wait() != 0:
msg = proc.stdout.read()
if msg != "":
text = (msg.decode('utf8'))
self.case.result = Failure("commit message syntax issues", "failure")
self.case.result._elem.text = text
class License(ComplianceTest):
_name = "License"
_doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html#licensing"
def run(self):
self.prepare()
scancode = "/opt/scancode-toolkit/scancode"
if not os.path.exists(scancode):
self.case.result = Skipped("scancode-toolkit not installed", "skipped")
return
os.makedirs("scancode-files", exist_ok=True)
new_files = sh.git("diff", "--name-only", "--diff-filter=A", self.commit_range, **sh_special_args)
if len(new_files) == 0:
return
for newf in new_files:
f = str(newf).rstrip()
os.makedirs(os.path.join('scancode-files', os.path.dirname(f)), exist_ok=True)
copy = os.path.join("scancode-files", f)
copyfile(f, copy)
try:
cmd = [scancode, '--verbose', '--copyright', '--license', '--license-diag', '--info',
'--classify', '--summary', '--json', 'scancode.json', 'scancode-files/']
cmd_str = " ".join(cmd)
logging.info(cmd_str)
out = subprocess.check_output(cmd_str, stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as e:
logging.error(e.output)
self.case.result = Skipped("Exception when running scancode", "skipped")
return
report = ""
with open ('scancode.json', 'r') as json_fp:
scancode_results = json.load(json_fp)
for file in scancode_results['files']:
if file['type'] == 'directory':
continue
original_fp = str(file['path']).replace('scancode-files/', '')
licenses = file['licenses']
if (file['is_script'] or file['is_source']) and (file['programming_language'] not in ['CMake']) and (file['extension'] not in ['.yaml']):
if len(file['licenses']) == 0:
report += ("* {} missing license.\n".format(original_fp))
else:
for l in licenses:
if l['key'] != "apache-2.0":
report += ("* {} is not apache-2.0 licensed: {}\n".format(original_fp, l['key']))
if l['category'] != 'Permissive':
report += ("* {} has non-permissive license: {}\n".format(original_fp, l['key']))
if len(file['copyrights']) == 0:
report += ("* {} missing copyright.\n".format(original_fp))
if report != "":
self.case.result = Failure("License/Copyright issues", "failure")
self.case.result._elem.text = report
class Identity(ComplianceTest):
_name = "Identity/Emails"
_doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html#commit-guidelines"
def run(self):
self.prepare()
for f in get_shas(self.commit_range):
commit = sh.git("log", "--decorate=short", "-n 1", f, **sh_special_args)
signed = []
author = ""
sha = ""
parsed_addr = None
for line in commit.split("\n"):
match = re.search("^commit\s([^\s]*)", line)
if match:
sha = match.group(1)
match = re.search("^Author:\s(.*)", line)
if match:
author = match.group(1)
parsed_addr = parseaddr(author)
match = re.search("signed-off-by:\s(.*)", line, re.IGNORECASE)
if match:
signed.append(match.group(1))
error1 = "%s: author email (%s) needs to match one of the signed-off-by entries." % (sha, author)
error2 = "%s: author email (%s) does not follow the syntax: First Last <email>." % (sha, author)
failure = None
if author not in signed:
failure = error1
if not parsed_addr or len(parsed_addr[0].split(" ")) < 2:
if not failure:
failure = error2
else:
failure = failure + "\n" + error2
if failure:
self.case.result = Failure("identity/email issues", "failure")
self.case.result._elem.text = failure
def init_logs():
global logger
log_lev = os.environ.get('LOG_LEVEL', None)
level = logging.INFO
if log_lev == "DEBUG":
level = logging.DEBUG
elif log_lev == "ERROR":
level = logging.ERROR
console = logging.StreamHandler()
format = logging.Formatter('%(levelname)-8s: %(message)s')
console.setFormatter(format)
logger = logging.getLogger('')
logger.addHandler(console)
logger.setLevel(level)
logging.debug("Log init completed")
def parse_args():
parser = argparse.ArgumentParser(
description="Check for coding style and documentation warnings.")
parser.add_argument('-c', '--commits', default=None,
help="Commit range in the form: a..b")
parser.add_argument('-g', '--github', action="store_true",
help="Send results to github as a comment.")
parser.add_argument('-r', '--repo', default=None,
help="Github repository")
parser.add_argument('-p', '--pull-request', default=0, type=int,
help="Pull request number")
parser.add_argument('-s', '--status', action="store_true", help="Set status to pending")
parser.add_argument('-S', '--sha', default=None, help="Commit SHA")
return parser.parse_args()
def set_status(gh, repo, sha):
repo = gh.get_repo(repo)
commit = repo.get_commit(sha)
for Test in ComplianceTest.__subclasses__():
t = Test(None, "")
print("Creating status for %s" %(t._name))
commit.create_status('pending',
'%s' %t._doc,
'Verification in progress',
'{}'.format(t._name))
def main():
args = parse_args()
github_token = ''
gh = None
if args.github:
github_token = os.environ['GH_TOKEN']
gh = Github(github_token)
if args.status and args.sha != None and args.repo and gh:
set_status(gh, args.repo, args.sha)
sys.exit(0)
if not args.commits:
sys.exit(1)
suite = TestSuite("Compliance")
docs = {}
for Test in ComplianceTest.__subclasses__():
t = Test(suite, args.commits)
t.run()
suite.add_testcase(t.case)
docs[t.case.name] = t._doc
xml = JUnitXml()
xml.add_testsuite(suite)
xml.update_statistics()
xml.write('compliance.xml')
if args.github:
repo = gh.get_repo(args.repo)
pr = repo.get_pull(int(args.pull_request))
commit = repo.get_commit(args.sha)
comment = "Found the following issues, please fix and resubmit:\n\n"
comment_count = 0
print("Processing results...")
for case in suite:
if case.result and case.result.type != 'skipped':
comment_count += 1
comment += ("## {}\n".format(case.result.message))
comment += "\n"
if case.name not in ['Gitlint', 'Identity/Emails', 'License']:
comment += "```\n"
comment += ("{}\n".format(case.result._elem.text))
if case.name not in ['Gitlint', 'Identity/Emails', 'License']:
comment += "```\n"
commit.create_status('failure',
docs[case.name],
'Verification failed',
'{}'.format(case.name))
else:
commit.create_status('success',
docs[case.name],
'Verifications passed',
'{}'.format(case.name))
if args.repo and args.pull_request and comment_count > 0:
comments = pr.get_issue_comments()
commented = False
for c in comments:
if 'Found the following issues, please fix and resubmit' in c.body:
c.edit(comment)
commented = True
break
if not commented:
pr.create_issue_comment(comment)
if __name__ == "__main__":
main()