[beast: 4/6] BSE: start 3 additional DSP slave threads to process modules concurrently



commit 315b5294bad948991b99f6de032af07dd8a78578
Author: Tim Janik <timj gnu org>
Date:   Sun Nov 6 23:17:22 2016 +0100

    BSE: start 3 additional DSP slave threads to process modules concurrently
    
    Signed-off-by: Tim Janik <timj gnu org>

 bse/bseenginemaster.cc |  139 ++++++++++++++++++++++++++++++++++++------------
 bse/bseenginemaster.hh |   10 ++++
 2 files changed, 115 insertions(+), 34 deletions(-)
---
diff --git a/bse/bseenginemaster.cc b/bse/bseenginemaster.cc
index 9f1e1eb..2ca4a85 100644
--- a/bse/bseenginemaster.cc
+++ b/bse/bseenginemaster.cc
@@ -859,7 +859,97 @@ master_process_locked_node (EngineNode *node,
     }
 }
 
-static gboolean gsl_profile_modules = 0;       /* set to 1 in gdb to get profile output */
+static bool bse_profile_modules = 0;   /* set to 1 in gdb to get profile output */
+
+struct ProfileData {
+  uint64 profile_maxtime;
+  EngineNode *profile_node;
+};
+
+static void
+thread_process_nodes (const uint n_values, ProfileData *profile)
+{
+  EngineNode *node = _engine_pop_unprocessed_node ();
+  while (node)
+    {
+      ToyprofStamp profile_stamp1;
+
+      if (UNLIKELY (profile))
+        toyprof_stamp (profile_stamp1);
+
+      master_process_locked_node (node, n_values);
+
+      if (UNLIKELY (profile))
+        {
+          ToyprofStamp profile_stamp2;
+          toyprof_stamp (profile_stamp2);
+          uint64 duration = toyprof_elapsed (profile_stamp1, profile_stamp2);
+          if (duration > profile->profile_maxtime)
+            {
+              profile->profile_maxtime = duration;
+              profile->profile_node = node;
+            }
+        }
+
+      _engine_push_processed_node (node);
+      node = _engine_pop_unprocessed_node ();
+    }
+}
+
+namespace BseInternal {
+static std::atomic<int>          slaves_running { false };
+static std::atomic<int>          slave_counter { 1 };
+static std::mutex                slave_mutex;
+static std::condition_variable   slave_condition;
+static std::vector<std::thread*> slave_threads;
+
+void
+engine_start_slaves ()
+{
+  assert_return (slaves_running == false);
+  slaves_running = true;
+  const uint n_cpus = 4; // FIXME: processors online?
+  const uint n_slaves = n_cpus - 1;
+  for (uint i = 0; i < n_slaves; i++)
+    slave_threads.push_back (new std::thread (engine_run_slave));
+}
+
+void
+engine_stop_slaves ()
+{
+  assert_return (slaves_running == true);
+  slaves_running = false;
+  while (!slave_threads.empty())
+    {
+      engine_wakeup_slaves();
+      std::thread *slave = slave_threads.back();
+      slave_threads.pop_back();
+      slave->join();
+      delete slave;
+    }
+}
+
+void
+engine_wakeup_slaves()
+{
+  slave_condition.notify_all();
+}
+
+void
+engine_run_slave ()
+{
+  String myid = string_format ("DSP #%u", ++slave_counter);
+  Bse::TaskRegistry::add (myid, Rapicorn::ThisThread::process_pid(), Rapicorn::ThisThread::thread_pid());
+  while (slaves_running)
+    {
+      thread_process_nodes (bse_engine_block_size(), NULL); // FIXME: merge profile data
+      std::unique_lock<std::mutex> slave_lock (slave_mutex);
+      slave_condition.wait (slave_lock);
+    }
+  Bse::TaskRegistry::remove (Rapicorn::ThisThread::thread_pid());
+}
+
+} // BseInternal
 
 static void
 master_process_flow (void)
@@ -867,9 +957,8 @@ master_process_flow (void)
   const guint64 current_stamp = Bse::TickStamp::current();
   guint n_values = bse_engine_block_size();
   guint64 final_counter = current_stamp + n_values;
-  guint64 profile_maxtime = 0;
-  gboolean profile_modules = gsl_profile_modules;
-  EngineNode *profile_node = NULL;
+  ProfileData profile_data = { 0, NULL };
+  ProfileData *profile = bse_profile_modules ? &profile_data : NULL;
 
   assert_return (master_need_process == TRUE);
 
@@ -879,34 +968,12 @@ master_process_flow (void)
     {
       _engine_schedule_restart (master_schedule);
       _engine_set_schedule (master_schedule);
+      BseInternal::engine_wakeup_slaves();
 
-      EngineNode *node = _engine_pop_unprocessed_node ();
-      while (node)
-       {
-         ToyprofStamp profile_stamp1, profile_stamp2;
-
-         if (UNLIKELY (profile_modules))
-           toyprof_stamp (profile_stamp1);
-
-         master_process_locked_node (node, n_values);
-
-         if (UNLIKELY (profile_modules))
-           {
-             toyprof_stamp (profile_stamp2);
-             guint64 duration = toyprof_elapsed (profile_stamp1, profile_stamp2);
-             if (duration > profile_maxtime)
-               {
-                 profile_maxtime = duration;
-                 profile_node = node;
-               }
-           }
-
-         _engine_push_processed_node (node);
-         node = _engine_pop_unprocessed_node ();
-       }
+      thread_process_nodes (n_values, profile);
 
       /* walk unscheduled nodes with flow jobs */
-      node = _engine_mnl_head ();
+      EngineNode *node = _engine_mnl_head ();
       while (node && BSE_ENGINE_MNL_UNSCHEDULED_TJOB_NODE (node))
        {
          EngineNode *tmp = node->mnl_next;
@@ -934,16 +1001,18 @@ master_process_flow (void)
             master_take_probes (node, current_stamp, n_values, PROBE_SCHEDULED);
         }
 
-      if (UNLIKELY (profile_modules))
+      if (UNLIKELY (profile))
        {
-         if (profile_node)
+         if (profile->profile_node)
            {
-             if (profile_maxtime > guint64 (profile_modules))
+             if (profile->profile_maxtime > uint64 (bse_profile_modules))
                printout ("Excess Node: %p  Duration: %llu usecs     ((void(*)())%p)         \n",
-                          profile_node, (long long unsigned int) profile_maxtime, profile_node->module.klass->process);
+                          profile->profile_node, (long long unsigned int) profile->profile_maxtime,
+                          profile->profile_node->module.klass->process);
              else
                printout ("Slowest Node: %p  Duration: %llu usecs     ((void(*)())%p)         \r",
-                          profile_node, (long long unsigned int) profile_maxtime, profile_node->module.klass->process);
+                          profile->profile_node, (long long unsigned int) profile->profile_maxtime,
+                          profile->profile_node->module.klass->process);
            }
        }
 
@@ -1185,6 +1254,7 @@ MasterThread::reap_master_thread ()
   assert (master_thread_singleton != NULL);
   assert_return (master_thread_running == true);
   master_thread_running = false;
+  BseInternal::engine_stop_slaves();
   MasterThread::wakeup();
   MasterThread *mthread = master_thread_singleton;
   mthread->thread_.join();
@@ -1203,6 +1273,7 @@ MasterThread::start (const std::function<void()> &caller_wakeup)
     fatal ("BSE: failed to install master thread reaper");
   master_thread_running = true;
   mthread->thread_ = std::thread (&MasterThread::master_thread, mthread);
+  BseInternal::engine_start_slaves();
 }
 
 void
diff --git a/bse/bseenginemaster.hh b/bse/bseenginemaster.hh
index 21b57af..c8b80e6 100644
--- a/bse/bseenginemaster.hh
+++ b/bse/bseenginemaster.hh
@@ -24,4 +24,14 @@ public:
 
 } // Bse
 
+
+namespace BseInternal {
+
+void    engine_run_slave        ();
+void    engine_start_slaves     ();
+void    engine_stop_slaves      ();
+void    engine_wakeup_slaves    ();
+
+} // BseInternal
+
 #endif /* __BSE_ENGINE_MASTER_H__ */


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]