[gnome-continuous-yocto/gnomeostree-3.28-rocko: 6133/8267] scripts/oe-selftest: Migrate to new framework into oeqa.selftest.context



commit 10c512b60d1167122b5fe778b93838dca3def717
Author: Leonardo Sandoval <leonardo sandoval gonzalez linux intel com>
Date:   Thu May 25 15:20:56 2017 -0500

    scripts/oe-selftest: Migrate to new framework into oeqa.selftest.context
    
    The new OEQA framework aims to reuse code across the different test
    components.
    
    The previous oe-selftest itself implemented loading, running, and listing
    test cases in a non-standard way (unittest based), along with other
    functionality such as logging that now lives in the OEQA core. The result
    is a compact oe-selftest script.
    
    All needed command-line options were migrated, but some of them are
    still pending implementation and others are deprecated.
    
    Deprecated options:
    
    list-tags: The tag functionality in the old oeqa framework doesn't
        work; the selftest doesn't have tag decorators.
    {run, list}-tests-by: Ambiguous options; they accept all the possibilities:
        module, class, name, id or tag.
    
    Remaining to implement:
    
    coverage: It enables coverage reports over a test run; it currently isn't
        in use and has some bugs [1]. I filed a bug to add support to the OEQA
        core module so that other test components can enable it as well.
    repository: It pushes XML results into a git repository and isn't in use;
        I filed a bug to implement this in the OEQA core module. [2]
    
    [1] https://bugzilla.yoctoproject.org/show_bug.cgi?id=11582#c0
    [2] https://bugzilla.yoctoproject.org/show_bug.cgi?id=11583#c0
    
    (From OE-Core rev: 3b2a20eee4a39f40287bf67545839eaa09fc892d)
    
    Signed-off-by: Leonardo Sandoval <leonardo sandoval gonzalez linux intel com>
    Signed-off-by: Aníbal Limón <anibal limon linux intel com>
    Signed-off-by: Richard Purdie <richard purdie linuxfoundation org>

 meta/lib/oeqa/selftest/context.py |  224 +++++++++++
 scripts/oe-selftest               |  737 ++-----------------------------------
 2 files changed, 250 insertions(+), 711 deletions(-)
---
diff --git a/meta/lib/oeqa/selftest/context.py b/meta/lib/oeqa/selftest/context.py
new file mode 100644
index 0000000..ca73070
--- /dev/null
+++ b/meta/lib/oeqa/selftest/context.py
@@ -0,0 +1,224 @@
+# Copyright (C) 2017 Intel Corporation
+# Released under the MIT license (see COPYING.MIT)
+
+import os
+import time
+import glob
+import sys
+import imp
+from random import choice
+
+import oeqa
+
+from oeqa.core.context import OETestContext, OETestContextExecutor
+from oeqa.core.exception import OEQAPreRun
+
+from oeqa.utils.commands import runCmd, get_bb_vars, get_test_layer
+
+class OESelftestTestContext(OETestContext):
+    def __init__(self, td=None, logger=None, machines=None, testlayer_path=None):
+        super(OESelftestTestContext, self).__init__(td, logger)
+
+        self.machines = machines
+        self.custommachine = None
+
+        self.testlayer_path = testlayer_path
+
+    def runTests(self, machine=None):
+        if machine:
+            self.custommachine = machine
+            if machine == 'random':
+                self.custommachine = choice(self.machines)
+            self.logger.info('Run tests with custom MACHINE set to: %s' % \
+                    self.custommachine)
+        return super(OESelftestTestContext, self).runTests()
+
+    def listTests(self, display_type, machine=None):
+        return super(OESelftestTestContext, self).listTests(display_type)
+
+class OESelftestTestContextExecutor(OETestContextExecutor):
+    _context_class = OESelftestTestContext
+    _script_executor = 'oe-selftest'
+
+    name = 'oe-selftest'
+    help = 'oe-selftest test component'
+    description = 'Executes selftest tests'
+
+    def register_commands(self, logger, parser):
+        group = parser.add_mutually_exclusive_group(required=True)
+
+        group.add_argument('-a', '--run-all-tests', default=False,
+                action="store_true", dest="run_all_tests",
+                help='Run all (unhidden) tests')
+        group.add_argument('-r', '--run-tests', required=False, action='store',
+                nargs='+', dest="run_tests", default=None,
+                help='Select what tests to run (modules, classes or test methods). Format should be: <module>.<class>.<test_method>')
+
+        group.add_argument('-m', '--list-modules', required=False,
+                action="store_true", default=False,
+                help='List all available test modules.')
+        group.add_argument('--list-classes', required=False,
+                action="store_true", default=False,
+                help='List all available test classes.')
+        group.add_argument('-l', '--list-tests', required=False,
+                action="store_true", default=False,
+                help='List all available tests.')
+
+        parser.add_argument('--machine', required=False, choices=['random', 'all'],
+                            help='Run tests on different machines (random/all).')
+        
+        parser.set_defaults(func=self.run)
+
+    def _get_available_machines(self):
+        machines = []
+
+        bbpath = self.tc_kwargs['init']['td']['BBPATH'].split(':')
+    
+        for path in bbpath:
+            found_machines = glob.glob(os.path.join(path, 'conf', 'machine', '*.conf'))
+            if found_machines:
+                for i in found_machines:
+                    # eg: '/home/<user>/poky/meta-intel/conf/machine/intel-core2-32.conf'
+                    machines.append(os.path.splitext(os.path.basename(i))[0])
+    
+        return machines
+
+    def _get_cases_paths(self, bbpath):
+        cases_paths = []
+        for layer in bbpath:
+            cases_dir = os.path.join(layer, 'lib', 'oeqa', 'selftest', 'cases')
+            if os.path.isdir(cases_dir):
+                cases_paths.append(cases_dir)
+        return cases_paths
+
+    def _process_args(self, logger, args):
+        args.output_log = '%s-results-%s.log' % (self.name,
+                time.strftime("%Y%m%d%H%M%S"))
+        args.test_data_file = None
+        args.CASES_PATHS = None
+
+        super(OESelftestTestContextExecutor, self)._process_args(logger, args)
+
+        if args.list_modules:
+            args.list_tests = 'module'
+        elif args.list_classes:
+            args.list_tests = 'class'
+        elif args.list_tests:
+            args.list_tests = 'name'
+
+        self.tc_kwargs['init']['td'] = get_bb_vars()
+        self.tc_kwargs['init']['machines'] = self._get_available_machines()
+        self.tc_kwargs['init']['testlayer_path'] = get_test_layer()
+
+    def _pre_run(self):
+        def _check_required_env_variables(vars):
+            for var in vars:
+                if not os.environ.get(var):
+                    self.tc.logger.error("%s is not set. Did you forget to source your build environment setup script?" % var)
+                    raise OEQAPreRun
+
+        def _check_presence_meta_selftest():
+            builddir = os.environ.get("BUILDDIR")
+            if os.getcwd() != builddir:
+                self.tc.logger.info("Changing cwd to %s" % builddir)
+                os.chdir(builddir)
+
+            if not "meta-selftest" in self.tc.td["BBLAYERS"]:
+                self.tc.logger.warn("meta-selftest layer not found in BBLAYERS, adding it")
+                meta_selftestdir = os.path.join(
+                    self.tc.td["BBLAYERS_FETCH_DIR"], 'meta-selftest')
+                if os.path.isdir(meta_selftestdir):
+                    runCmd("bitbake-layers add-layer %s" %meta_selftestdir)
+                    # reload data is needed because a meta-selftest layer was add
+                    self.tc.td = get_bb_vars()
+                else:
+                    self.tc.logger.error("could not locate meta-selftest in:\n%s" % meta_selftestdir)
+                    raise OEQAPreRun
+
+        def _add_layer_libs():
+            bbpath = self.tc.td['BBPATH'].split(':')
+            layer_libdirs = [p for p in (os.path.join(l, 'lib') \
+                    for l in bbpath) if os.path.exists(p)]
+            if layer_libdirs:
+                self.tc.logger.info("Adding layer libraries:")
+                for l in layer_libdirs:
+                    self.tc.logger.info("\t%s" % l)
+
+                sys.path.extend(layer_libdirs)
+                imp.reload(oeqa.selftest)
+
+        _check_required_env_variables(["BUILDDIR"])
+        _check_presence_meta_selftest()
+
+        if "buildhistory.bbclass" in self.tc.td["BBINCLUDED"]:
+            self.tc.logger.error("You have buildhistory enabled already and this isn't recommended for selftest, please disable it first.")
+            raise OEQAPreRun
+
+        if "PRSERV_HOST" in self.tc.td:
+            self.tc.logger.error("Please unset PRSERV_HOST in order to run oe-selftest")
+            raise OEQAPreRun
+
+        if "SANITY_TESTED_DISTROS" in self.tc.td:
+            self.tc.logger.error("Please unset SANITY_TESTED_DISTROS in order to run oe-selftest")
+            raise OEQAPreRun
+
+        _add_layer_libs()
+
+        self.tc.logger.info("Running bitbake -p")
+        runCmd("bitbake -p")
+
+    def _internal_run(self, logger, args):
+        self.module_paths = self._get_cases_paths(
+                self.tc_kwargs['init']['td']['BBPATH'].split(':'))
+
+        self.tc = self._context_class(**self.tc_kwargs['init'])
+        self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
+
+        if args.list_tests:
+            rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['run'])
+        else:
+            self._pre_run()
+            rc = self.tc.runTests(**self.tc_kwargs['run'])
+            rc.logSummary(self.name)
+            rc.logDetails()
+
+        return rc
+    
+    def run(self, logger, args):
+        self._process_args(logger, args)
+        rc = None
+
+        if args.machine:
+            logger.info('Custom machine mode enabled. MACHINE set to %s' %
+                    args.machine)
+
+            if args.machine == 'all':
+                results = []
+                for m in self.tc_kwargs['init']['machines']:
+                    self.tc_kwargs['run']['machine'] = m
+                    results.append(self._internal_run(logger, args))
+
+                    # XXX: the oe-selftest script only needs to know if one
+                    # machine run fails
+                    for r in results:
+                        rc = r
+                        if not r.wasSuccessful():
+                            break
+
+            else:
+                self.tc_kwargs['run']['machine'] = args.machine
+                return self._internal_run(logger, args)
+
+        else:
+            self.tc_kwargs['run']['machine'] = args.machine
+            rc = self._internal_run(logger, args)
+
+        output_link = os.path.join(os.path.dirname(args.output_log),
+                "%s-results.log" % self.name)
+        if os.path.exists(output_link):
+            os.remove(output_link)
+        os.symlink(args.output_log, output_link)
+
+        return rc
+
+_executor_class = OESelftestTestContextExecutor
diff --git a/scripts/oe-selftest b/scripts/oe-selftest
index 4909157..b200ace 100755
--- a/scripts/oe-selftest
+++ b/scripts/oe-selftest
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
 
-# Copyright (c) 2013 Intel Corporation
+# Copyright (c) 2013-2017 Intel Corporation
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License version 2 as
@@ -25,732 +25,47 @@
 # E.g: "oe-selftest -r bblayers.BitbakeLayers" will run just the BitbakeLayers class from meta/lib/oeqa/selftest/bblayers.py
 
 
+
 import os
 import sys
-import unittest
-import logging
 import argparse
-import subprocess
-import time as t
-import re
-import fnmatch
-import collections
-import imp
+import logging
 
-sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/lib')
+scripts_path = os.path.dirname(os.path.realpath(__file__))
+lib_path = scripts_path + '/lib'
+sys.path = sys.path + [lib_path]
+import argparse_oe
+import scriptutils
 import scriptpath
-scriptpath.add_bitbake_lib_path()
 scriptpath.add_oe_lib_path()
-import argparse_oe
-
-import oeqa.selftest
-import oeqa.utils.ftools as ftools
-from oeqa.utils.commands import runCmd, get_bb_var, get_test_layer
-from oeqa.utils.metadata import metadata_from_bb, write_metadata_file
-from oeqa.selftest.base import oeSelfTest, get_available_machines
-
-try:
-    import xmlrunner
-    from xmlrunner.result import _XMLTestResult as TestResult
-    from xmlrunner import XMLTestRunner as _TestRunner
-except ImportError:
-    # use the base runner instead
-    from unittest import TextTestResult as TestResult
-    from unittest import TextTestRunner as _TestRunner
-
-log_prefix = "oe-selftest-" + t.strftime("%Y%m%d-%H%M%S")
-
-def logger_create():
-    log_file = log_prefix + ".log"
-    if os.path.lexists("oe-selftest.log"):
-        os.remove("oe-selftest.log")
-    os.symlink(log_file, "oe-selftest.log")
-
-    log = logging.getLogger("selftest")
-    log.setLevel(logging.DEBUG)
-
-    fh = logging.FileHandler(filename=log_file, mode='w')
-    fh.setLevel(logging.DEBUG)
-
-    ch = logging.StreamHandler(sys.stdout)
-    ch.setLevel(logging.INFO)
-
-    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-    fh.setFormatter(formatter)
-    ch.setFormatter(formatter)
-
-    log.addHandler(fh)
-    log.addHandler(ch)
+scriptpath.add_bitbake_lib_path()
 
-    return log
+from oeqa.utils import load_test_components
+from oeqa.core.exception import OEQAPreRun
 
-log = logger_create()
+logger = scriptutils.logger_create('oe-selftest')
 
-def get_args_parser():
+def main():
     description = "Script that runs unit tests against bitbake and other Yocto related tools. The goal is to validate tools functionality and metadata integrity. Refer to https://wiki.yoctoproject.org/wiki/Oe-selftest for more information."
     parser = argparse_oe.ArgumentParser(description=description)
-    group = parser.add_mutually_exclusive_group(required=True)
-    group.add_argument('-r', '--run-tests', required=False, action='store', nargs='*', dest="run_tests", 
default=None, help='Select what tests to run (modules, classes or test methods). Format should be: 
<module>.<class>.<test_method>')
-    group.add_argument('-a', '--run-all-tests', required=False, action="store_true", dest="run_all_tests", 
default=False, help='Run all (unhidden) tests')
-    group.add_argument('-m', '--list-modules', required=False, action="store_true", dest="list_modules", 
default=False, help='List all available test modules.')
-    group.add_argument('--list-classes', required=False, action="store_true", dest="list_allclasses", 
default=False, help='List all available test classes.')
-    parser.add_argument('--coverage', action="store_true", help="Run code coverage when testing")
-    parser.add_argument('--coverage-source', dest="coverage_source", nargs="+", help="Specifiy the 
directories to take coverage from")
-    parser.add_argument('--coverage-include', dest="coverage_include", nargs="+", help="Specify extra 
patterns to include into the coverage measurement")
-    parser.add_argument('--coverage-omit', dest="coverage_omit", nargs="+", help="Specify with extra 
patterns to exclude from the coverage measurement")
-    group.add_argument('--run-tests-by', required=False, dest='run_tests_by', default=False, nargs='*',
-                       help='run-tests-by <name|class|module|id|tag> <list of 
tests|classes|modules|ids|tags>')
-    group.add_argument('--list-tests-by', required=False, dest='list_tests_by', default=False, nargs='*',
-                       help='list-tests-by <name|class|module|id|tag> <list of 
tests|classes|modules|ids|tags>')
-    group.add_argument('-l', '--list-tests', required=False,  action="store_true", dest="list_tests", 
default=False,
-                       help='List all available tests.')
-    group.add_argument('--list-tags', required=False, dest='list_tags', default=False, action="store_true",
-                       help='List all tags that have been set to test cases.')
-    parser.add_argument('--machine', required=False, dest='machine', choices=['random', 'all'], default=None,
-                        help='Run tests on different machines (random/all).')
-    parser.add_argument('--repository', required=False, dest='repository', default='', action='store',
-                        help='Submit test results to a repository')
-    return parser
-
-builddir = None
-
-
-def preflight_check():
-
-    global builddir
-
-    log.info("Checking that everything is in order before running the tests")
-
-    if not os.environ.get("BUILDDIR"):
-        log.error("BUILDDIR isn't set. Did you forget to source your build environment setup script?")
-        return False
-
-    builddir = os.environ.get("BUILDDIR")
-    if os.getcwd() != builddir:
-        log.info("Changing cwd to %s" % builddir)
-        os.chdir(builddir)
-
-    if not "meta-selftest" in get_bb_var("BBLAYERS"):
-        log.warn("meta-selftest layer not found in BBLAYERS, adding it")
-        meta_selftestdir = os.path.join(
-                get_bb_var("BBLAYERS_FETCH_DIR"),
-                'meta-selftest')
-        if os.path.isdir(meta_selftestdir):
-            runCmd("bitbake-layers add-layer %s" %meta_selftestdir)
-        else:
-            log.error("could not locate meta-selftest in:\n%s"
-                    %meta_selftestdir)
-            return False
-
-    if "buildhistory.bbclass" in get_bb_var("BBINCLUDED"):
-        log.error("You have buildhistory enabled already and this isn't recommended for selftest, please 
disable it first.")
-        return False
-
-    if get_bb_var("PRSERV_HOST"):
-        log.error("Please unset PRSERV_HOST in order to run oe-selftest")
-        return False
-
-    if get_bb_var("SANITY_TESTED_DISTROS"):
-        log.error("Please unset SANITY_TESTED_DISTROS in order to run oe-selftest")
-        return False
-
-    log.info("Running bitbake -p")
-    runCmd("bitbake -p")
-
-    return True
 
-def get_tests_modules(include_hidden=False):
-    modules_list = list()
-    for modules_path in oeqa.selftest.__path__:
-        for (p, d, f) in os.walk(modules_path):
-            files = sorted([f for f in os.listdir(p) if f.endswith('.py') and not (f.startswith('_') and not 
include_hidden) and not f.startswith('__') and f != 'base.py'])
-            for f in files:
-                submodules = p.split("selftest")[-1]
-                module = ""
-                if submodules:
-                    module = 'oeqa.selftest' + submodules.replace("/",".") + "." + f.split('.py')[0]
-                else:
-                    module = 'oeqa.selftest.' + f.split('.py')[0]
-                if module not in modules_list:
-                    modules_list.append(module)
-    return modules_list
-
-
-def get_tests(exclusive_modules=[], include_hidden=False):
-    test_modules = list()
-    for x in exclusive_modules:
-        test_modules.append('oeqa.selftest.' + x)
-    if not test_modules:
-        inc_hidden = include_hidden
-        test_modules = get_tests_modules(inc_hidden)
-
-    return test_modules
-
-
-class Tc:
-    def __init__(self, tcname, tcclass, tcmodule, tcid=None, tctag=None):
-        self.tcname = tcname
-        self.tcclass = tcclass
-        self.tcmodule = tcmodule
-        self.tcid = tcid
-        # A test case can have multiple tags (as tuples) otherwise str will suffice
-        self.tctag = tctag
-        self.fullpath = '.'.join(['oeqa', 'selftest', tcmodule, tcclass, tcname])
-
-
-def get_tests_from_module(tmod):
-    tlist = []
-    prefix = 'oeqa.selftest.'
+    comp_name, comp = load_test_components(logger, 'oe-selftest').popitem()
+    comp.register_commands(logger, parser)
 
     try:
-        import importlib
-        modlib = importlib.import_module(tmod)
-        for mod in list(vars(modlib).values()):
-            if isinstance(mod, type(oeSelfTest)) and issubclass(mod, oeSelfTest) and mod is not oeSelfTest:
-                for test in dir(mod):
-                    if test.startswith('test_') and hasattr(vars(mod)[test], '__call__'):
-                        # Get test case id and feature tag
-                        # NOTE: if testcase decorator or feature tag not set will throw error
-                        try:
-                            tid = vars(mod)[test].test_case
-                        except:
-                            print('DEBUG: tc id missing for ' + str(test))
-                            tid = None
-                        try:
-                            ttag = vars(mod)[test].tag__feature
-                        except:
-                            # print('DEBUG: feature tag missing for ' + str(test))
-                            ttag = None
-
-                        # NOTE: for some reason lstrip() doesn't work for mod.__module__
-                        tlist.append(Tc(test, mod.__name__, mod.__module__.replace(prefix, ''), tid, ttag))
-    except:
-        pass
-
-    return tlist
-
-
-def get_all_tests():
-    # Get all the test modules (except the hidden ones)
-    testlist = []
-    tests_modules = get_tests_modules()
-    # Get all the tests from modules
-    for tmod in sorted(tests_modules):
-        testlist += get_tests_from_module(tmod)
-    return testlist
-
-
-def get_testsuite_by(criteria, keyword):
-    # Get a testsuite based on 'keyword'
-    # criteria: name, class, module, id, tag
-    # keyword: a list of tests, classes, modules, ids, tags
-
-    ts = []
-    all_tests = get_all_tests()
-
-    def get_matches(values):
-        # Get an item and return the ones that match with keyword(s)
-        # values: the list of items (names, modules, classes...)
-        result = []
-        remaining = values[:]
-        for key in keyword:
-            found = False
-            if key in remaining:
-                # Regular matching of exact item
-                result.append(key)
-                remaining.remove(key)
-                found = True
-            else:
-                # Wildcard matching
-                pattern = re.compile(fnmatch.translate(r"%s" % key))
-                added = [x for x in remaining if pattern.match(x)]
-                if added:
-                    result.extend(added)
-                    remaining = [x for x in remaining if x not in added]
-                    found = True
-            if not found:
-                log.error("Failed to find test: %s" % key)
-
-        return result
-
-    if criteria == 'name':
-        names = get_matches([ tc.tcname for tc in all_tests ])
-        ts = [ tc for tc in all_tests if tc.tcname in names ]
-
-    elif criteria == 'class':
-        classes = get_matches([ tc.tcclass for tc in all_tests ])
-        ts = [ tc for tc in all_tests if tc.tcclass in classes ]
-
-    elif criteria == 'module':
-        modules = get_matches([ tc.tcmodule for tc in all_tests ])
-        ts = [ tc for tc in all_tests if tc.tcmodule in modules ]
-
-    elif criteria == 'id':
-        ids = get_matches([ str(tc.tcid) for tc in all_tests ])
-        ts = [ tc for tc in all_tests if str(tc.tcid) in ids ]
-
-    elif criteria == 'tag':
-        values = set()
-        for tc in all_tests:
-            # tc can have multiple tags (as tuple) otherwise str will suffice
-            if isinstance(tc.tctag, tuple):
-                values |= { str(tag) for tag in tc.tctag }
-            else:
-                values.add(str(tc.tctag))
-
-        tags = get_matches(list(values))
-
-        for tc in all_tests:
-            for tag in tags:
-                if isinstance(tc.tctag, tuple) and tag in tc.tctag:
-                    ts.append(tc)
-                elif tag == tc.tctag:
-                    ts.append(tc)
-
-        # Remove duplicates from the list
-        ts = list(set(ts))
-
-    return ts
-
-
-def list_testsuite_by(criteria, keyword):
-    # Get a testsuite based on 'keyword'
-    # criteria: name, class, module, id, tag
-    # keyword: a list of tests, classes, modules, ids, tags
-    def tc_key(t):
-        if t[0] is None:
-            return  (0,) + t[1:]
-        return t
-    # tcid may be None if no ID was assigned, in which case sorted() will throw
-    # a TypeError as Python 3 does not allow comparison (<,<=,>=,>) of
-    # heterogeneous types, handle this by using a custom key generator
-    ts = sorted([ (tc.tcid, tc.tctag, tc.tcname, tc.tcclass, tc.tcmodule) \
-                  for tc in get_testsuite_by(criteria, keyword) ], key=tc_key)
-    print('_' * 150)
-    for t in ts:
-        if isinstance(t[1], (tuple, list)):
-            print('%-4s\t%-20s\t%-60s\t%-25s\t%-20s' % (t[0], ', '.join(t[1]), t[2], t[3], t[4]))
-        else:
-            print('%-4s\t%-20s\t%-60s\t%-25s\t%-20s' % t)
-    print('_' * 150)
-    print('Filtering by:\t %s' % criteria)
-    print('Looking for:\t %s' % ', '.join(str(x) for x in keyword))
-    print('Total found:\t %s' % len(ts))
-
-
-def list_tests():
-    # List all available oe-selftest tests
-
-    ts = get_all_tests()
-
-    print('%-4s\t%-10s\t%-50s' % ('id', 'tag', 'test'))
-    print('_' * 80)
-    for t in ts:
-        if isinstance(t.tctag, (tuple, list)):
-            print('%-4s\t%-10s\t%-50s' % (t.tcid, ', '.join(t.tctag), '.'.join([t.tcmodule, t.tcclass, 
t.tcname])))
-        else:
-            print('%-4s\t%-10s\t%-50s' % (t.tcid, t.tctag, '.'.join([t.tcmodule, t.tcclass, t.tcname])))
-    print('_' * 80)
-    print('Total found:\t %s' % len(ts))
-
-def list_tags():
-    # Get all tags set to test cases
-    # This is useful when setting tags to test cases
-    # The list of tags should be kept as minimal as possible
-    tags = set()
-    all_tests = get_all_tests()
-
-    for tc in all_tests:
-        if isinstance(tc.tctag, (tuple, list)):
-            tags.update(set(tc.tctag))
-        else:
-            tags.add(tc.tctag)
-
-    print('Tags:\t%s' % ', '.join(str(x) for x in tags))
-
-def coverage_setup(coverage_source, coverage_include, coverage_omit):
-    """ Set up the coverage measurement for the testcases to be run """
-    import datetime
-    import subprocess
-    global builddir
-    pokydir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
-    curcommit= subprocess.check_output(["git", "--git-dir", os.path.join(pokydir, ".git"), "rev-parse", 
"HEAD"]).decode('utf-8')
-    coveragerc = "%s/.coveragerc" % builddir
-    data_file = "%s/.coverage." % builddir
-    data_file += datetime.datetime.now().strftime('%Y%m%dT%H%M%S')
-    if os.path.isfile(data_file):
-        os.remove(data_file)
-    with open(coveragerc, 'w') as cps:
-        cps.write("# Generated with command '%s'\n" % " ".join(sys.argv))
-        cps.write("# HEAD commit %s\n" % curcommit.strip())
-        cps.write("[run]\n")
-        cps.write("data_file = %s\n" % data_file)
-        cps.write("branch = True\n")
-        # Measure just BBLAYERS, scripts and bitbake folders
-        cps.write("source = \n")
-        if coverage_source:
-            for directory in coverage_source:
-                if not os.path.isdir(directory):
-                    log.warn("Directory %s is not valid.", directory)
-                cps.write("    %s\n" % directory)
-        else:
-            for layer in get_bb_var('BBLAYERS').split():
-                cps.write("    %s\n" % layer)
-            cps.write("    %s\n" % os.path.dirname(os.path.realpath(__file__)))
-            cps.write("    %s\n" % 
os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))),'bitbake'))
-
-        if coverage_include:
-            cps.write("include = \n")
-            for pattern in coverage_include:
-                cps.write("    %s\n" % pattern)
-        if coverage_omit:
-            cps.write("omit = \n")
-            for pattern in coverage_omit:
-                cps.write("    %s\n" % pattern)
-
-        return coveragerc
-
-def coverage_report():
-    """ Loads the coverage data gathered and reports it back """
-    try:
-        # Coverage4 uses coverage.Coverage
-        from coverage import Coverage
-    except:
-        # Coverage under version 4 uses coverage.coverage
-        from coverage import coverage as Coverage
-
-    import io as StringIO
-    from coverage.misc import CoverageException
-
-    cov_output = StringIO.StringIO()
-    # Creating the coverage data with the setting from the configuration file
-    cov = Coverage(config_file = os.environ.get('COVERAGE_PROCESS_START'))
-    try:
-        # Load data from the data file specified in the configuration
-        cov.load()
-        # Store report data in a StringIO variable
-        cov.report(file = cov_output, show_missing=False)
-        log.info("\n%s" % cov_output.getvalue())
-    except CoverageException as e:
-        # Show problems with the reporting. Since Coverage4 not finding  any data to report raises an 
exception
-        log.warn("%s" % str(e))
-    finally:
-        cov_output.close()
-
-
-def main():
-    parser = get_args_parser()
-    args = parser.parse_args()
-
-    # Add <layer>/lib to sys.path, so layers can add selftests
-    log.info("Running bitbake -e to get BBPATH")
-    bbpath = get_bb_var('BBPATH').split(':')
-    layer_libdirs = [p for p in (os.path.join(l, 'lib') for l in bbpath) if os.path.exists(p)]
-    sys.path.extend(layer_libdirs)
-    imp.reload(oeqa.selftest)
-
-    # act like bitbake and enforce en_US.UTF-8 locale
-    os.environ["LC_ALL"] = "en_US.UTF-8"
-
-    if args.run_tests_by and len(args.run_tests_by) >= 2:
-        valid_options = ['name', 'class', 'module', 'id', 'tag']
-        if args.run_tests_by[0] not in valid_options:
-            print('--run-tests-by %s not a valid option. Choose one of <name|class|module|id|tag>.' % 
args.run_tests_by[0])
-            return 1
-        else:
-            criteria = args.run_tests_by[0]
-            keyword = args.run_tests_by[1:]
-            ts = sorted([ tc.fullpath for tc in get_testsuite_by(criteria, keyword) ])
-        if not ts:
-            return 1
-
-    if args.list_tests_by and len(args.list_tests_by) >= 2:
-        valid_options = ['name', 'class', 'module', 'id', 'tag']
-        if args.list_tests_by[0] not in valid_options:
-            print('--list-tests-by %s not a valid option. Choose one of <name|class|module|id|tag>.' % 
args.list_tests_by[0])
-            return 1
-        else:
-            criteria = args.list_tests_by[0]
-            keyword = args.list_tests_by[1:]
-            list_testsuite_by(criteria, keyword)
-
-    if args.list_tests:
-        list_tests()
-
-    if args.list_tags:
-        list_tags()
-
-    if args.list_allclasses:
-        args.list_modules = True
-
-    if args.list_modules:
-        log.info('Listing all available test modules:')
-        testslist = get_tests(include_hidden=True)
-        for test in testslist:
-            module = test.split('oeqa.selftest.')[-1]
-            info = ''
-            if module.startswith('_'):
-                info = ' (hidden)'
-            print(module + info)
-            if args.list_allclasses:
-                try:
-                    import importlib
-                    modlib = importlib.import_module(test)
-                    for v in vars(modlib):
-                        t = vars(modlib)[v]
-                        if isinstance(t, type(oeSelfTest)) and issubclass(t, oeSelfTest) and t!=oeSelfTest:
-                            print(" --", v)
-                            for method in dir(t):
-                                if method.startswith("test_") and isinstance(vars(t)[method], 
collections.Callable):
-                                    print(" --  --", method)
-
-                except (AttributeError, ImportError) as e:
-                    print(e)
-                    pass
-
-    if args.run_tests or args.run_all_tests or args.run_tests_by:
-        if not preflight_check():
-            return 1
-
-        if args.run_tests_by:
-            testslist = ts
-        else:
-            testslist = get_tests(exclusive_modules=(args.run_tests or []), include_hidden=False)
-
-        suite = unittest.TestSuite()
-        loader = unittest.TestLoader()
-        loader.sortTestMethodsUsing = None
-        runner = TestRunner(verbosity=2,
-                resultclass=buildResultClass(args))
-        # we need to do this here, otherwise just loading the tests
-        # will take 2 minutes (bitbake -e calls)
-        oeSelfTest.testlayer_path = get_test_layer()
-        for test in testslist:
-            log.info("Loading tests from: %s" % test)
-            try:
-                suite.addTests(loader.loadTestsFromName(test))
-            except AttributeError as e:
-                log.error("Failed to import %s" % test)
-                log.error(e)
-                return 1
-
-        if args.machine:
-            # Custom machine sets only weak default values (??=) for MACHINE in machine.inc
-            # This let test cases that require a specific MACHINE to be able to override it, using (?= or =)
-            log.info('Custom machine mode enabled. MACHINE set to %s' % args.machine)
-            if args.machine == 'random':
-                os.environ['CUSTOMMACHINE'] = 'random'
-                result = runner.run(suite)
-            else:  # all
-                machines = get_available_machines()
-                for m in machines:
-                    log.info('Run tests with custom MACHINE set to: %s' % m)
-                    os.environ['CUSTOMMACHINE'] = m
-                    result = runner.run(suite)
-        else:
-            result = runner.run(suite)
-
-        log.info("Finished")
-
-        if args.repository:
-            import git
-            # Commit tests results to repository
-            metadata = metadata_from_bb()
-            git_dir = os.path.join(os.getcwd(), 'selftest')
-            if not os.path.isdir(git_dir):
-                os.mkdir(git_dir)
-
-            log.debug('Checking for git repository in %s' % git_dir)
-            try:
-                repo = git.Repo(git_dir)
-            except git.exc.InvalidGitRepositoryError:
-                log.debug("Couldn't find git repository %s; "
-                         "cloning from %s" % (git_dir, args.repository))
-                repo = git.Repo.clone_from(args.repository, git_dir)
-
-            r_branches = repo.git.branch(r=True)
-            r_branches = set(r_branches.replace('origin/', '').split())
-            l_branches = {str(branch) for branch in repo.branches}
-            branch = '%s/%s/%s' % (metadata['hostname'],
-                                   metadata['layers']['meta'].get('branch', '(nogit)'),
-                                   metadata['config']['MACHINE'])
-
-            if branch in l_branches:
-                log.debug('Found branch in local repository, checking out')
-                repo.git.checkout(branch)
-            elif branch in r_branches:
-                log.debug('Found branch in remote repository, checking'
-                          ' out and pulling')
-                repo.git.checkout(branch)
-                repo.git.pull()
-            else:
-                log.debug('New branch %s' % branch)
-                repo.git.checkout('master')
-                repo.git.checkout(b=branch)
-
-            cleanResultsDir(repo)
-            xml_dir = os.path.join(os.getcwd(), log_prefix)
-            copyResultFiles(xml_dir, git_dir, repo)
-            metadata_file = os.path.join(git_dir, 'metadata.xml')
-            write_metadata_file(metadata_file, metadata)
-            repo.index.add([metadata_file])
-            repo.index.write()
-
-            # Get information for commit message
-            layer_info = ''
-            for layer, values in metadata['layers'].items():
-                layer_info = '%s%-17s = %s:%s\n' % (layer_info, layer,
-                              values.get('branch', '(nogit)'), values.get('commit', '0'*40))
-            msg = 'Selftest for build %s of %s for machine %s on %s\n\n%s' % (
-                   log_prefix[12:], metadata['distro']['pretty_name'],
-                   metadata['config']['MACHINE'], metadata['hostname'], layer_info)
-
-            log.debug('Commiting results to local repository')
-            repo.index.commit(msg)
-            if not repo.is_dirty():
-                try:
-                    if branch in r_branches:
-                        log.debug('Pushing changes to remote repository')
-                        repo.git.push()
-                    else:
-                        log.debug('Pushing changes to remote repository '
-                                  'creating new branch')
-                        repo.git.push('-u', 'origin', branch)
-                except GitCommandError:
-                    log.error('Falied to push to remote repository')
-                    return 1
-            else:
-                log.error('Local repository is dirty, not pushing commits')
-
-        if result.wasSuccessful():
-            return 0
-        else:
-            return 1
-
-def buildResultClass(args):
-    """Build a Result Class to use in the testcase execution"""
-    import site
-
-    class StampedResult(TestResult):
-        """
-        Custom TestResult that prints the time when a test starts.  As oe-selftest
-        can take a long time (ie a few hours) to run, timestamps help us understand
-        what tests are taking a long time to execute.
-        If coverage is required, this class executes the coverage setup and reporting.
-        """
-        def startTest(self, test):
-            import time
-            self.stream.write(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " - ")
-            super(StampedResult, self).startTest(test)
-
-        def startTestRun(self):
-            """ Setup coverage before running any testcase """
-
-            # variable holding the coverage configuration file allowing subprocess to be measured
-            self.coveragepth = None
-
-            # indicates the system if coverage is currently installed
-            self.coverage_installed = True
-
-            if args.coverage or args.coverage_source or args.coverage_include or args.coverage_omit:
-                try:
-                    # check if user can do coverage
-                    import coverage
-                except:
-                    log.warn("python coverage is not installed. More info on https://pypi.python.org/pypi/coverage")
-                    self.coverage_installed = False
-
-                if self.coverage_installed:
-                    log.info("Coverage is enabled")
-
-                    major_version = int(coverage.version.__version__[0])
-                    if major_version < 4:
-                        log.error("python coverage %s installed. Require version 4 or greater." % coverage.version.__version__)
-                        self.stop()
-                    # In case the user has not set the variable COVERAGE_PROCESS_START,
-                    # create a default one and export it. The COVERAGE_PROCESS_START
-                    # value indicates where the coverage configuration file resides
-                    # More info on https://pypi.python.org/pypi/coverage
-                    if not os.environ.get('COVERAGE_PROCESS_START'):
-                        os.environ['COVERAGE_PROCESS_START'] = coverage_setup(args.coverage_source, args.coverage_include, args.coverage_omit)
-
-                    # Use default site.USER_SITE and write corresponding config file
-                    site.ENABLE_USER_SITE = True
-                    if not os.path.exists(site.USER_SITE):
-                        os.makedirs(site.USER_SITE)
-                    self.coveragepth = os.path.join(site.USER_SITE, "coverage.pth")
-                    with open(self.coveragepth, 'w') as cps:
-                        cps.write('import sys,site; sys.path.extend(site.getsitepackages()); import coverage; coverage.process_startup();')
-
-        def stopTestRun(self):
-            """ Report coverage data after the testcases are run """
-
-            if args.coverage or args.coverage_source or args.coverage_include or args.coverage_omit:
-                if self.coverage_installed:
-                    with open(os.environ['COVERAGE_PROCESS_START']) as ccf:
-                        log.info("Coverage configuration file (%s)" % os.environ.get('COVERAGE_PROCESS_START'))
-                        log.info("===========================")
-                        log.info("\n%s" % "".join(ccf.readlines()))
-
-                    log.info("Coverage Report")
-                    log.info("===============")
-                    try:
-                        coverage_report()
-                    finally:
-                        # remove the pth file
-                        try:
-                            os.remove(self.coveragepth)
-                        except OSError:
-                            log.warn("Expected temporal file from coverage is missing, ignoring removal.")
-
-    return StampedResult
-
-def cleanResultsDir(repo):
-    """ Remove result files from directory """
-
-    xml_files = []
-    directory = repo.working_tree_dir
-    for f in os.listdir(directory):
-        path = os.path.join(directory, f)
-        if os.path.isfile(path) and path.endswith('.xml'):
-            xml_files.append(f)
-    repo.index.remove(xml_files, working_tree=True)
-
-def copyResultFiles(src, dst, repo):
-    """ Copy result files from src to dst removing the time stamp. """
-
-    import shutil
-
-    re_time = re.compile("-[0-9]+")
-    file_list = []
-
-    for root, subdirs, files in os.walk(src):
-        tmp_dir = root.replace(src, '').lstrip('/')
-        for s in subdirs:
-            os.mkdir(os.path.join(dst, tmp_dir, s))
-        for f in files:
-            file_name = os.path.join(dst, tmp_dir, re_time.sub("", f))
-            shutil.copy2(os.path.join(root, f), file_name)
-            file_list.append(file_name)
-    repo.index.add(file_list)
+        args = parser.parse_args()
+        results = args.func(logger, args)
+        ret = 0 if results.wasSuccessful() else 1
+    except SystemExit as err:
+        if err.code != 0:
+            raise err
+        ret = err.code
+    except OEQAPreRun as pr:
+        ret = 1
 
-class TestRunner(_TestRunner):
-    """Test runner class aware of exporting tests."""
-    def __init__(self, *args, **kwargs):
-        try:
-            exportdir = os.path.join(os.getcwd(), log_prefix)
-            kwargsx = dict(**kwargs)
-            # argument specific to XMLTestRunner, if adding a new runner then
-            # also add logic to use other runner's args.
-            kwargsx['output'] = exportdir
-            kwargsx['descriptions'] = False
-            # done for the case where telling the runner where to export
-            super(TestRunner, self).__init__(*args, **kwargsx)
-        except TypeError:
-            log.info("test runner init'ed like unittest")
-            super(TestRunner, self).__init__(*args, **kwargs)
+    return ret
 
-if __name__ == "__main__":
+if __name__ == '__main__':
     try:
         ret = main()
     except Exception:



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]