[gxml] StreamReader: implement parse_buffer_async()



commit a4038f05c0bbcbc5a7d7597e4511efdfe840fd4f
Author: Daniel Espinosa <esodan gmail com>
Date:   Tue Jul 30 14:04:09 2019 -0500

    StreamReader: implement parse_buffer_async()
    
    Fix issue #27
    
    This method creates a new ThreadPool and
    parses each child element in a worker
    thread, calling read_from_string() on the
    child's read_buffer string cache.

 gxml/Element.vala                                  | 42 +++++++++++++++++--
 gxml/StreamReader.vala                             |  3 --
 ...reamReaderPerformanceAsyncReadUnparsedTest.vala | 49 ++++++++++------------
 ...amReaderPerformanceIterateReadUnparsedTest.vala | 14 +------
 test/StreamReaderTest.vala                         | 30 ++++++++++++-
 5 files changed, 91 insertions(+), 47 deletions(-)
---
diff --git a/gxml/Element.vala b/gxml/Element.vala
index 8782a9a..5e0aff2 100644
--- a/gxml/Element.vala
+++ b/gxml/Element.vala
@@ -809,20 +809,120 @@ public class GXml.Element : GXml.Node,
     if (read_buffer == null) {
       return;
     }
-    read_from_string ((string) read_buffer.data);
-    read_buffer = null;
     foreach (DomNode n in child_nodes) {
       if (n is GXml.Element) {
         ((GXml.Element) n).parse_buffer ();
       }
     }
+    read_from_string ((string) read_buffer.data);
+    read_buffer = null;
+  }
+
+  /* Child buffers queued by parse_buffer_async() that have not finished
+   * parsing yet. Worker threads decrement it, so it is only accessed
+   * through GLib.AtomicInt. */
+  int parse_remaining = 0;
+  /* Set while a parse_buffer_async() call is running. */
+  bool parsing_async = false;
+
+  /**
+   * Monitor multi-threading parsing.
+   *
+   * @return number of child buffers queued by {@link parse_buffer_async}
+   * that are still waiting or being parsed; 0 when no parse is running
+   */
+  public uint parse_pending () {
+    return (uint) AtomicInt.get (ref parse_remaining);
   }
 
-  /**
-   * Asynchronically parse {@link read_buffer}
-   */
+  /**
+   * Asynchronously parse {@link read_buffer}.
+   *
+   * Child elements' buffers are parsed on a thread pool; this element's own
+   * buffer is parsed on the main loop once every child has finished, in the
+   * same order used by {@link parse_buffer}. A call made while a previous
+   * one is still running returns immediately without waiting for it.
+   *
+   * @throws GLib.Error if parsing this element's buffer fails or the
+   * thread pool cannot be created or fed
+   */
   public async void parse_buffer_async () throws GLib.Error {
-    parse_buffer ();
+    if (read_buffer == null || parsing_async) {
+      return;
+    }
+    parsing_async = true;
+    try {
+      yield parse_children_async ();
+      // A synchronous parse_buffer () may have consumed it meanwhile.
+      if (read_buffer != null) {
+        read_from_string ((string) read_buffer.data);
+        read_buffer = null;
+      }
+    } finally {
+      parsing_async = false;
+    }
+  }
+
+  /* Parses every child element's read_buffer on a thread pool and
+   * completes, on the main loop, only after all of them have finished. */
+  async void parse_children_async () throws GLib.Error {
+    GXml.Element[] children = {};
+    foreach (DomNode n in child_nodes) {
+      if (n is GXml.Element) {
+        children += (GXml.Element) n;
+      }
+    }
+    if (children.length == 0) {
+      return;
+    }
+    uint nth = GLib.get_num_processors ();
+    if (nth > 1) {
+      nth--;
+    }
+    SourceFunc resume = parse_children_async.callback;
+    // NOTE(review): children are parsed concurrently; this assumes
+    // read_from_string () on distinct elements of one document is
+    // thread-safe - confirm.
+    var workers = new ThreadPool<Element>.with_owned_data ((e)=>{
+      try {
+        if (e.read_buffer != null) {
+          e.read_from_string ((string) e.read_buffer.data);
+          e.read_buffer = null;
+        }
+      } catch (GLib.Error err) {
+        warning (_("Error parsing child's buffer: %s"), err.message);
+      }
+      // Exactly one decrement reaches zero; that task resumes the caller.
+      if (AtomicInt.dec_and_test (ref parse_remaining)) {
+        Idle.add ((owned) resume);
+      }
+    }, (int) nth, false);
+    AtomicInt.set (ref parse_remaining, children.length);
+    int queued = 0;
+    GLib.Error? push_error = null;
+    foreach (var child in children) {
+      try {
+        workers.add (child);
+        queued++;
+      } catch (ThreadError err) {
+        push_error = err;
+        break;
+      }
+    }
+    // Count down, ourselves, the children that never reached the pool.
+    bool finished = false;
+    for (int i = queued; i < children.length; i++) {
+      if (AtomicInt.dec_and_test (ref parse_remaining)) {
+        finished = true;
+      }
+    }
+    // Unless we made the final decrement, the last worker resumes us.
+    if (!finished) {
+      yield;
+    }
+    if (push_error != null) {
+      throw push_error;
+    }
   }
 }
 
diff --git a/gxml/StreamReader.vala b/gxml/StreamReader.vala
index c7283f2..127ffda 100644
--- a/gxml/StreamReader.vala
+++ b/gxml/StreamReader.vala
@@ -220,7 +220,6 @@ public class GXml.StreamReader : GLib.Object {
           }
         } else if (children && parent == null) {
           GXml.Element ce = read_element (false, e);;
-          message ("Parsing node: %s", ce.local_name);
           var col = root_collections.get (ce.local_name.down ());
           if (col != null) {
             var cobj = GLib.Object.new (col.items_type,
@@ -228,9 +227,7 @@ public class GXml.StreamReader : GLib.Object {
             cobj.read_buffer = ce.read_buffer;
             e.append_child (cobj);
             col.append (cobj);
-            message ("Added node: %s to %s", cobj.local_name, col.get_type ().name ());
           } else {
-            message ("Searching node property");
             foreach (ParamSpec pspec in
                 (e as GXml.Object).get_property_element_list ()) {
               if (pspec.value_type.is_a (typeof (Collection))) continue;
diff --git a/test/StreamReaderPerformanceAsyncReadUnparsedTest.vala b/test/StreamReaderPerformanceAsyncReadUnparsedTest.vala
index c84e2ad..b06cf7a 100644
--- a/test/StreamReaderPerformanceAsyncReadUnparsedTest.vala
+++ b/test/StreamReaderPerformanceAsyncReadUnparsedTest.vala
@@ -28,43 +28,41 @@ class GXmlTest.Suite : GLib.Object
     GLib.Intl.setlocale (GLib.LocaleCategory.ALL, "");
     Test.init (ref args);
     Test.add_func ("/gxml/stream-reader/performance", () => {
+      File dir = File.new_for_path (GXmlTestConfig.TEST_DIR);
+      assert (dir.query_exists ());
+      File f = File.new_for_uri (dir.get_uri ()+"/test-large.xml");
+      assert (f.query_exists ());
       var loop = new MainLoop (null);
-      var timer = new Timer ();
-      ulong time = 0;
       try {
-        File dir = File.new_for_path (GXmlTestConfig.TEST_DIR);
-        assert (dir.query_exists ());
-        File f = File.new_for_uri (dir.get_uri ()+"/test-large.xml");
-        assert (f.query_exists ());
         var sr = new GXml.StreamReader (f.read ());
+        Test.timer_start ();
         var d = sr.read ();
-        timer.elapsed (out time);
-        message ("Initial Parse: %lu ms for %d nodes", time / 1000, d.document_element.child_nodes.length);
-        Timeout.add_full (0, 10, ()=>{
-          int l = d.document_element.child_nodes.item (5000).child_nodes.length;
-          if (l == 0) {
+        message ("Initial Parse: %g sec for %d nodes", Test.timer_elapsed (), d.document_element.child_nodes.length);
+        var root = d.document_element as GXml.Element;
+        bool parsed = false;
+        Test.timer_start ();
+        // Start parsing exactly once; the timeout below only polls for completion.
+        root.parse_buffer_async.begin ((obj, res)=>{
+          try {
+            root.parse_buffer_async.end (res);
+          } catch (GLib.Error e) {
+            warning ("Error: %s", e.message);
+          }
+          parsed = true;
+        });
+        Timeout.add (10, ()=>{
+          if (!parsed || d.document_element.child_nodes.item (10079).child_nodes.length == 0) {
             return Source.CONTINUE;
           }
-          try {
-            message ((d.document_element.child_nodes.item (5000) as DomElement).write_string ());
-          } catch (GLib.Error e) {
-            warning ("Error: %s", e.message);
+          if (root.parse_pending () != 0) {
+            return Source.CONTINUE;
           }
+          message ("Parsed buffers: %g sec", Test.timer_elapsed ());
+          assert (d.document_element.child_nodes.item (10079) is GXml.Element);
+          assert (d.document_element.child_nodes.item (10079).child_nodes.length != 0);
           loop.quit ();
           return Source.REMOVE;
         });
-        Idle.add (()=>{
-            (d.document_element as GXml.Element).parse_buffer_async.begin ((obj, res)=>{
-              try {
-                (d.document_element as GXml.Element).parse_buffer_async.end (res);
-              } catch (GLib.Error e) {
-                warning ("Error: %s", e.message);
-              }
-              timer.elapsed (out time);
-              message ("Parse root: %lu ms", time / 1000);
-            });
-          return Source.REMOVE;
-        });
       } catch (GLib.Error e) {
         warning ("Error: %s", e.message);
       }
diff --git a/test/StreamReaderPerformanceIterateReadUnparsedTest.vala b/test/StreamReaderPerformanceIterateReadUnparsedTest.vala
index 299afb2..1613991 100644
--- a/test/StreamReaderPerformanceIterateReadUnparsedTest.vala
+++ b/test/StreamReaderPerformanceIterateReadUnparsedTest.vala
@@ -40,24 +40,12 @@ class GXmlTest.Suite : GLib.Object
         var d = sr.read ();
         timer.elapsed (out time);
         message ("Initial Parse: %lu ms for %d nodes", time / 1000, d.document_element.child_nodes.length);
-        Timeout.add_full (0, 10, ()=>{
-          int l = d.document_element.child_nodes.item (5000).child_nodes.length;
-          if (l == 0) {
-            return Source.CONTINUE;
-          }
-          try {
-            message ((d.document_element.child_nodes.item (5000) as DomElement).write_string ());
-          } catch (GLib.Error e) {
-            warning ("Error: %s", e.message);
-          }
-          loop.quit ();
-          return Source.REMOVE;
-        });
         Idle.add (()=>{
           try {
             (d.document_element as GXml.Element).parse_buffer ();
             timer.elapsed (out time);
             message ("Parse root: %lu ms", time / 1000);
+            loop.quit ();
           } catch (GLib.Error e) {
             warning ("Error: %s", e.message);
             assert_not_reached ();
diff --git a/test/StreamReaderTest.vala b/test/StreamReaderTest.vala
index 1e5c716..cdd9513 100644
--- a/test/StreamReaderTest.vala
+++ b/test/StreamReaderTest.vala
@@ -205,7 +205,7 @@ class GXmlTest {
                                warning ("Error: %s", e.message);
                        }
                });
-               Test.add_func ("/gxml/stream-reader/child-multiple/read-unparsed", () => {
+               Test.add_func ("/gxml/stream-reader/child-multiple/read-unparsed/sync", () => {
       var loop = new GLib.MainLoop (null);
       Idle.add (()=>{
                                string str = """<root p1="a" p2="b" ><child k="p" y="9"><code/><code 
u="3">TestC</code><Tek/><Tex y="456"/></child></root>""";
@@ -222,6 +222,35 @@ class GXmlTest {
                                }
                                return Source.REMOVE;
       });
+      loop.run ();
+               });
+               Test.add_func ("/gxml/stream-reader/child-multiple/read-unparsed/async", () => {
+      var loop = new GLib.MainLoop (null);
+      Idle.add (()=>{
+                               string str = """<root p1="a" p2="b" ><child k="p" y="9"><code/><code u="3">TestC</code><Tek/><Tex y="456"/></child></root>""";
+                               var istream = new MemoryInputStream.from_data (str.data, null);
+                               var sr = new StreamReader (istream);
+                               try {
+                                       var doc = sr.read ();
+                                       var root = doc.document_element as GXml.Element;
+                                       assert (root != null);
+                                       // Read and parse once; the callback checks the result and ends the loop.
+                                       root.parse_buffer_async.begin ((obj, res)=>{
+                                               try {
+                                                       root.parse_buffer_async.end (res);
+                                                       message ("%s", doc.write_string ());
+                                                       assert (root.read_buffer == null);
+                                               } catch (GLib.Error e) {
+                                                       warning ("Error while reading stream: %s", e.message);
+                                               }
+                                               loop.quit ();
+                                       });
+                               } catch (GLib.Error e) {
+                                       warning ("Error while reading stream: %s", e.message);
+                                       loop.quit ();
+                               }
+                               return Source.REMOVE;
+      });
       loop.run ();
                });
                Test.add_func ("/gxml/stream-reader/serialization", () => {


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]