[gnome-continuous-yocto/gnomeostree-3.28-rocko: 1939/8267] bitbake: runqueue: Abstract worker functionality to an object/array



commit 0ef16f083eddb0eccd5fd1604e6e922a38705ae5
Author: Richard Purdie <richard purdie linuxfoundation org>
Date:   Mon Aug 15 17:58:39 2016 +0100

    bitbake: runqueue: Abstract worker functionality to an object/array
    
    With the introduction of multi-config and the possibility of distributed
    builds we need arrays of workers rather than the existing two.
    
    This refactors the code to have a dict() of workers and a dict of
    fakeworkers, represented by objects. The code can iterate over these.
    
    This is separated out from the multi-config changes since its separable
    and clearer this way.
    
    (Bitbake rev: 8181d96e0a4df0aa47287669681116fa65bcae16)
    
    Signed-off-by: Richard Purdie <richard purdie linuxfoundation org>

 bitbake/lib/bb/runqueue.py |  122 ++++++++++++++++++++++++--------------------
 1 files changed, 66 insertions(+), 56 deletions(-)
---
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index 3a593b6..6a953b8 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -922,6 +922,11 @@ class RunQueueData:
                          self.runtaskentries[tid].depends,
                          self.runtaskentries[tid].revdeps)
 
+class RunQueueWorker():
+    def __init__(self, process, pipe):
+        self.process = process
+        self.pipe = pipe
+
 class RunQueue:
     def __init__(self, cooker, cfgData, dataCache, taskData, targets):
 
@@ -940,10 +945,8 @@ class RunQueue:
         self.dm = monitordisk.diskMonitor(cfgData)
 
         self.rqexe = None
-        self.worker = None
-        self.workerpipe = None
-        self.fakeworker = None
-        self.fakeworkerpipe = None
+        self.worker = {}
+        self.fakeworker = {}
 
     def _start_worker(self, fakeroot = False, rqexec = None):
         logger.debug(1, "Starting bitbake-worker")
@@ -988,55 +991,56 @@ class RunQueue:
         worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
         worker.stdin.flush()
 
-        return worker, workerpipe
+        return RunQueueWorker(worker, workerpipe)
 
-    def _teardown_worker(self, worker, workerpipe):
+    def _teardown_worker(self, worker):
         if not worker:
             return
         logger.debug(1, "Teardown for bitbake-worker")
         try:
-           worker.stdin.write(b"<quit></quit>")
-           worker.stdin.flush()
-           worker.stdin.close()
+           worker.process.stdin.write(b"<quit></quit>")
+           worker.process.stdin.flush()
+           worker.process.stdin.close()
         except IOError:
            pass
-        while worker.returncode is None:
-            workerpipe.read()
-            worker.poll()
-        while workerpipe.read():
+        while worker.process.returncode is None:
+            worker.pipe.read()
+            worker.process.poll()
+        while worker.pipe.read():
             continue
-        workerpipe.close()
+        worker.pipe.close()
 
     def start_worker(self):
         if self.worker:
             self.teardown_workers()
         self.teardown = False
-        self.worker, self.workerpipe = self._start_worker()
+        self.worker[''] = self._start_worker()
 
     def start_fakeworker(self, rqexec):
         if not self.fakeworker:
-            self.fakeworker, self.fakeworkerpipe = self._start_worker(True, rqexec)
+            self.fakeworker[''] = self._start_worker(True, rqexec)
 
     def teardown_workers(self):
         self.teardown = True
-        self._teardown_worker(self.worker, self.workerpipe)
-        self.worker = None
-        self.workerpipe = None
-        self._teardown_worker(self.fakeworker, self.fakeworkerpipe)
-        self.fakeworker = None
-        self.fakeworkerpipe = None
+        for mc in self.worker:
+            self._teardown_worker(self.worker[mc])
+        self.worker = {}
+        for mc in self.fakeworker:
+            self._teardown_worker(self.fakeworker[mc])
+        self.fakeworker = {}
 
     def read_workers(self):
-        self.workerpipe.read()
-        if self.fakeworkerpipe:
-            self.fakeworkerpipe.read()
+        for mc in self.worker:
+            self.worker[mc].pipe.read()
+        for mc in self.fakeworker:
+            self.fakeworker[mc].pipe.read()
 
     def active_fds(self):
         fds = []
-        if self.workerpipe:
-            fds.append(self.workerpipe.input)
-        if self.fakeworkerpipe:
-            fds.append(self.fakeworkerpipe.input)
+        for mc in self.worker:
+            fds.append(self.worker[mc].pipe.input)
+        for mc in self.fakeworker:
+            fds.append(self.fakeworker[mc].pipe.input)
         return fds
 
     def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None):
@@ -1393,9 +1397,10 @@ class RunQueueExecute:
 
         self.stampcache = {}
 
-        rq.workerpipe.setrunqueueexec(self)
-        if rq.fakeworkerpipe:
-            rq.fakeworkerpipe.setrunqueueexec(self)
+        for mc in rq.worker:
+            rq.worker[mc].pipe.setrunqueueexec(self)
+        for mc in rq.fakeworker:
+            rq.fakeworker[mc].pipe.setrunqueueexec(self)
 
         if self.number_tasks <= 0:
              bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)
@@ -1414,15 +1419,21 @@ class RunQueueExecute:
         return True
 
     def finish_now(self):
-        for worker in [self.rq.worker, self.rq.fakeworker]:
-            if not worker:
-                continue
+        for mc in self.rq.worker:
+            try:
+                self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
+                self.rq.worker[mc].process.stdin.flush()
+            except IOError:
+                # worker must have died?
+                pass
+        for mc in self.rq.fakeworker:
             try:
-                worker.stdin.write(b"<finishnow></finishnow>")
-                worker.stdin.flush()
+                self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
+                self.rq.fakeworker[mc].process.stdin.flush()
             except IOError:
                 # worker must have died?
                 pass
+
         if len(self.failed_fns) != 0:
             self.rq.state = runQueueFailed
             return
@@ -1733,11 +1744,11 @@ class RunQueueExecuteTasks(RunQueueExecute):
                         logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc)))
                         self.rq.state = runQueueFailed
                         return True
-                self.rq.fakeworker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, 
self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>")
-                self.rq.fakeworker.stdin.flush()
+                self.rq.fakeworker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, 
False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>")
+                self.rq.fakeworker[''].process.stdin.flush()
             else:
-                self.rq.worker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, 
self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>")
-                self.rq.worker.stdin.flush()
+                self.rq.worker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, 
False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>")
+                self.rq.worker[''].process.stdin.flush()
 
             self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
             self.build_stamps2.append(self.build_stamps[task])
@@ -2143,11 +2154,11 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
             if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not 
self.cooker.configuration.dry_run:
                 if not self.rq.fakeworker:
                     self.rq.start_fakeworker(self)
-                self.rq.fakeworker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, 
self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>")
-                self.rq.fakeworker.stdin.flush()
+                self.rq.fakeworker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, 
True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>")
+                self.rq.fakeworker[''].process.stdin.flush()
             else:
-                self.rq.worker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, 
self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>")
-                self.rq.worker.stdin.flush()
+                self.rq.worker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, 
True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>")
+                self.rq.worker[''].process.stdin.flush()
 
             self.runq_running.add(task)
             self.stats.taskActive()
@@ -2301,17 +2312,16 @@ class runQueuePipe():
 
     def read(self):
         for w in [self.rq.worker, self.rq.fakeworker]:
-            if not w:
-                continue
-            w.poll()
-            if w.returncode is not None and not self.rq.teardown:
-                name = None
-                if self.rq.worker and w.pid == self.rq.worker.pid:
-                    name = "Worker"
-                elif self.rq.fakeworker and w.pid == self.rq.fakeworker.pid:
-                    name = "Fakeroot"
-                bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, 
str(w.returncode)))
-                self.rq.finish_runqueue(True)
+            for mc in w:
+                w[mc].process.poll()
+                if w[mc].process.returncode is not None and not self.rq.teardown:
+                    name = None
+                    if w in self.rq.worker:
+                        name = "Worker"
+                    elif w in self.rq.fakeworker:
+                        name = "Fakeroot"
+                    bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, 
str(w.returncode)))
+                    self.rq.finish_runqueue(True)
 
         start = len(self.queue)
         try:


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]