[gnome-ostree/wip/gjs-round3: 1/3] dyntask: New dynamic task executor
- From: Colin Walters <walters src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gnome-ostree/wip/gjs-round3: 1/3] dyntask: New dynamic task executor
- Date: Sun, 6 Jan 2013 17:48:14 +0000 (UTC)
commit 90a210679c294a743ae8bf367c64fd2150fc12ca
Author: Colin Walters <walters verbum org>
Date: Tue Dec 11 17:33:01 2012 -0500
dyntask: New dynamic task executor
Like "make", except everything is in gjs, and asynchronous.
Makefile-ostbuild.am | 1 +
src/ostbuild/js/dyntask.js | 314 ++++++++++++++++++++++++++++++++++++++++++++
src/ostbuild/js/jsondb.js | 1 +
src/ostbuild/js/main.js | 3 +-
4 files changed, 318 insertions(+), 1 deletions(-)
---
diff --git a/Makefile-ostbuild.am b/Makefile-ostbuild.am
index 972bfe3..bd663c2 100644
--- a/Makefile-ostbuild.am
+++ b/Makefile-ostbuild.am
@@ -43,6 +43,7 @@ jsostbuild_DATA= \
src/ostbuild/js/buildutil.js \
src/ostbuild/js/checkout.js \
src/ostbuild/js/config.js \
+ src/ostbuild/js/dyntask.js \
src/ostbuild/js/git_mirror.js \
src/ostbuild/js/jsondb.js \
src/ostbuild/js/jsonutil.js \
diff --git a/src/ostbuild/js/dyntask.js b/src/ostbuild/js/dyntask.js
new file mode 100644
index 0000000..fe269ff
--- /dev/null
+++ b/src/ostbuild/js/dyntask.js
@@ -0,0 +1,314 @@
+const GLib = imports.gi.GLib;
+const Gio = imports.gi.Gio;
+const format = imports.format;
+const Lang = imports.lang;
+
+const GSystem = imports.gi.GSystem;
+const Params = imports.params;
+
+const VERSION_RE = /(\d+)\.(\d+)/;
+
+const TaskMaster = new Lang.Class({
+ Name: 'TaskMaster',
+
+ _init: function(path, params) {
+ params = Params.parse(params, {maxConcurrent: 4,
+ onEmpty: null});
+ this.path = path;
+ this.maxConcurrent = params.maxConcurrent;
+ this._onEmpty = params.onEmpty;
+ this.cancellable = null;
+ this._idleRecalculateId = 0;
+ this._taskSerial = 1;
+ this._tasks = [];
+ this._executing = [];
+ this._pendingTasksList = [];
+ this._seenTasks = {};
+ this._completeTasks = {};
+ this._taskErrors = {};
+ },
+
+ register: function(taskdef) {
+ this._tasks.push(taskdef);
+ },
+
+ push: function(taskName) {
+ if (this._seenTasks[taskName])
+ return null;
+ let result = null;
+ for (let i = 0; i < this._tasks.length; i++) {
+ let taskDef = this._tasks[i];
+ let pattern = taskDef.getPattern();
+ let re = pattern[0];
+ let match = re.exec(taskName);
+ if (!match)
+ continue;
+
+ let serial = this._taskSerial;
+ this._taskSerial++;
+ let vars = {};
+ for (let i = 1; i < pattern.length; i++) {
+ vars[pattern[i]] = match[i];
+ }
+ let specifiedDependencies = taskDef.getDepends(vars);;
+ let waitingDependencies = {};
+ for (let j = 0; j < specifiedDependencies.length; j++) {
+ let depName = specifiedDependencies[j];
+ if (!this._completeTasks[depName]) {
+ let depTask = this.push(depName);
+ waitingDependencies[depName] = depTask;
+ }
+ }
+ result = {name: taskName,
+ def: taskDef,
+ vars: vars,
+ dependencies: specifiedDependencies,
+ waitingDependencies: waitingDependencies,
+ serial: serial,
+ result: null };
+ this._pendingTasksList.push(result);
+ this._seenTasks[taskName] = true;
+ break;
+ }
+ if (!result)
+ throw new Error("No task definition matches " + taskName);
+ this._queueRecalculate();
+ return result;
+ },
+
+ _queueRecalculate: function() {
+ if (this._idleRecalculateId > 0)
+ return;
+ this._idleRecalculateId = GLib.idle_add(GLib.PRIORITY_DEFAULT, Lang.bind(this, this._recalculate));
+ },
+
+ _visit: function(task, sorted, scanned) {
+ if (scanned[task.name])
+ return;
+ scanned[task.name] = true;
+ for (let depName in task.waitingDependencies) {
+ let dep = task.waitingDependencies[depName];
+ this._visit(dep, sorted, scanned);
+ }
+ sorted.push(task);
+ },
+
+ _recalculate: function() {
+ let sorted = [];
+ let scanned = {};
+
+ this._idleRecalculateId = 0;
+
+ if (this._executing.length == 0 &&
+ this._pendingTasksList.length == 0) {
+ this._onEmpty();
+ return;
+ } else if (this._pendingTasksList.length == 0) {
+ return;
+ }
+
+ for (let i = 0; i < this._pendingTasksList.length; i++) {
+ let task = this._pendingTasksList[i];
+ this._visit(task, sorted, scanned);
+ }
+
+ this._pendingTasksList = sorted;
+
+ this._reschedule();
+ },
+
+ _onComplete: function(result, error, task) {
+ if (error) {
+ log("TaskMaster: While executing " + task.name + ": " + error);
+ this._taskErrors[task.name] = error;
+ } else {
+ log("TaskMaster: Completed: " + task.name + " : " + JSON.stringify(result));
+ }
+ let idx = -1;
+ for (let i = 0; i < this._executing.length; i++) {
+ let executingTask = this._executing[i];
+ if (executingTask.serial != task.serial)
+ continue;
+ idx = i;
+ break;
+ }
+ if (idx == -1)
+ throw new Error("TaskMaster: Internal error - Failed to find completed task serial:" + task.serial);
+ task.result = result;
+ this._completeTasks[task.name] = task;
+ this._executing.splice(idx, 1);
+ for (let i = 0; i < this._pendingTasksList.length; i++) {
+ let pendingTask = this._pendingTasksList[i];
+ let deps = pendingTask.waitingDependencies;
+ if (deps[task.name]) {
+ log("Completed dep + " + task.name);
+ delete deps[task.name];
+ }
+ }
+ this._queueRecalculate();
+ },
+
+ _hasDeps: function(task) {
+ for (let depName in task.waitingDependencies) {
+ return true;
+ }
+ return false;
+ },
+
+ _reschedule: function() {
+ while (this._executing.length < this.maxConcurrent &&
+ this._pendingTasksList.length > 0 &&
+ !this._hasDeps(this._pendingTasksList[0])) {
+ let task = this._pendingTasksList.shift();
+ log("TaskMaster: running: " + task.name);
+ let depResults = [];
+ for (let i = 0; i < task.dependencies.length; i++) {
+ let depName = task.dependencies[i];
+ depResults.push(this._completeTasks[depName].result);
+ }
+ task.def.execute(task.vars, depResults, this.cancellable, Lang.bind(this, this._onComplete, task));
+ this._executing.push(task);
+ }
+ }
+});
+
+const TaskDef = new Lang.Class({
+ Name: 'TaskDef',
+
+ _init: function() {
+ },
+
+ getPattern: function() {
+ throw new Error("Not implemented");
+ },
+
+ getDepends: function(inputs) {
+ return [];
+ },
+
+ execute: function(inputs, dependResults, cancellable, onComplete) {
+ throw new Error("Not implemented");
+ }
+});
+
+const TaskChecksumSha256 = new Lang.Class({
+ Name: 'TaskChecksumSha256',
+ Extends: TaskDef,
+
+ _init: function() {
+ },
+
+ getPattern: function() {
+ return [/\/ChecksumSha256\/(.*)$/, 'PATH'];
+ },
+
+ _onAsyncOpComplete: function(error) {
+ let state = this;
+ state.asyncOutstanding--;
+ if (state.asyncOutstanding != 0)
+ return;
+ if (error) {
+ state.onComplete(null, error);
+ } else {
+ let csumStr = state.buf.steal_as_bytes().toArray().toString();
+ state.onComplete(csumStr.substr(0, csumStr.indexOf(' ')), null);
+ }
+ },
+
+ _onSpliceComplete: function(stream, result) {
+ let state = this;
+
+ let error = null;
+ try {
+ stream.splice_finish(result);
+ } catch (e) {
+ if (e.domain != undefined)
+ error = e;
+ else
+ throw e;
+ }
+ Lang.bind(state, state.me._onAsyncOpComplete)(error);
+ },
+
+ _onProcWait: function(proc, result) {
+ let state = this;
+
+ let error = null;
+ try {
+ let [success,ecode] = proc.wait_finish(result);
+ GLib.spawn_check_exit_status(ecode);
+ } catch (e) {
+ if (e.domain != undefined)
+ error = e;
+ else
+ throw e;
+ }
+ Lang.bind(state, state.me._onAsyncOpComplete)(error);
+ },
+
+ execute: function(inputs, dependResults, cancellable, onComplete) {
+ let state = {me: this,
+ onComplete: onComplete,
+ buf: null,
+ asyncOutstanding: 2};
+ let path = inputs.PATH;
+ let context = new GSystem.SubprocessContext({argv: ['sha256sum', path]});
+ context.set_stdout_disposition(GSystem.SubprocessStreamDisposition.PIPE);
+ let proc = new GSystem.Subprocess({context: context});
+ proc.init(cancellable);
+ let stdout = proc.get_stdout_pipe();
+ state.buf = Gio.MemoryOutputStream.new_resizable();
+ state.buf.splice_async(stdout, Gio.OutputStreamSpliceFlags.CLOSE_SOURCE |
+ Gio.OutputStreamSpliceFlags.CLOSE_TARGET, GLib.PRIORITY_DEFAULT,
+ cancellable, Lang.bind(state, this._onSpliceComplete));
+ proc.wait(cancellable, Lang.bind(state, this._onProcWait));
+ }
+});
+
+const TaskChecksumMany = new Lang.Class({
+ Name: 'TaskChecksumMany',
+ Extends: TaskDef,
+
+ _init: function() {
+ },
+
+ getPattern: function() {
+ return [/\/ChecksumMany\/(.*)$/, 'FILENAMES'];
+ },
+
+ getDepends: function(inputs) {
+ let filenamesStr = inputs.FILENAMES;
+ let filenames = filenamesStr.split(',');
+ let r = [];
+ for (let i = 0; i < filenames.length; i++)
+ r.push('/ChecksumSha256/' + filenames[i]);
+ return r;
+ },
+
+ execute: function(inputs, dependResults, cancellable, onComplete) {
+ let r = '';
+ for (let i = 0; i < dependResults.length; i++)
+ r += dependResults[i] + '\n';
+ GLib.idle_add(GLib.PRIORITY_DEFAULT, function() {
+ onComplete(r, null);
+ });
+ }
+});
+
+function main(argv) {
+ var loop = GLib.MainLoop.new(null, true);
+ let ecode = 1;
+ var app = new TaskMaster('taskmaster/', {onEmpty: function() {
+ log("TaskMaster: Complete!");
+ loop.quit();
+ }});
+ app.register(new TaskChecksumSha256());
+ app.register(new TaskChecksumMany());
+ for (let i = 0; i < argv.length; i++) {
+ let taskName = argv[i];
+ app.push(taskName);
+ };
+ loop.run();
+ ecode = 0;
+ return ecode;
+}
diff --git a/src/ostbuild/js/jsondb.js b/src/ostbuild/js/jsondb.js
index 856aa6e..eb605fe 100644
--- a/src/ostbuild/js/jsondb.js
+++ b/src/ostbuild/js/jsondb.js
@@ -4,6 +4,7 @@ const Lang = imports.lang;
const Format = imports.format;
const JsonUtil = imports.jsonutil;
+const GSystem = imports.gi.GSystem;
const JsonDB = new Lang.Class({
Name: 'JsonDB',
diff --git a/src/ostbuild/js/main.js b/src/ostbuild/js/main.js
index ee7ae01..600980a 100755
--- a/src/ostbuild/js/main.js
+++ b/src/ostbuild/js/main.js
@@ -22,7 +22,8 @@ const BUILTINS = {'autobuilder': "Run resolve and build",
'prefix': "Display or modify \"prefix\" (build target)",
'git-mirror': "Update internal git mirror for one or more components",
'resolve': "Expand git revisions in source to exact targets",
- 'build': "Build multiple components and generate trees"};
+ 'build': "Build multiple components and generate trees",
+ 'dyntask': "Execute tasks"};
function usage(ecode) {
print("Builtins:");
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]