[beast: 4/6] BSE: start 3 additional DSP slave threads to process modules concurrently
- From: Tim Janik <timj src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [beast: 4/6] BSE: start 3 additional DSP slave threads to process modules concurrently
- Date: Sun, 6 Nov 2016 22:32:47 +0000 (UTC)
commit 315b5294bad948991b99f6de032af07dd8a78578
Author: Tim Janik <timj gnu org>
Date: Sun Nov 6 23:17:22 2016 +0100
BSE: start 3 additional DSP slave threads to process modules concurrently
Signed-off-by: Tim Janik <timj gnu org>
bse/bseenginemaster.cc | 139 ++++++++++++++++++++++++++++++++++++------------
bse/bseenginemaster.hh | 10 ++++
2 files changed, 115 insertions(+), 34 deletions(-)
---
diff --git a/bse/bseenginemaster.cc b/bse/bseenginemaster.cc
index 9f1e1eb..2ca4a85 100644
--- a/bse/bseenginemaster.cc
+++ b/bse/bseenginemaster.cc
@@ -859,7 +859,97 @@ master_process_locked_node (EngineNode *node,
}
}
-static gboolean gsl_profile_modules = 0; /* set to 1 in gdb to get profile output */
+/* Module profiling switch and threshold, meant to be poked from gdb:
+ * 0 disables profiling; any non-zero value enables it, and master_process_flow()
+ * reports a node as "Excess Node" once its duration exceeds this value.
+ * Kept as an integer (not bool) so thresholds larger than 1 can be set.
+ */
+static uint64 bse_profile_modules = 0; /* set to 1 in gdb to get profile output */
+
+// Records the slowest node seen by thread_process_nodes() while profiling is enabled.
+// Field order matters: master_process_flow() aggregate-initializes this as { 0, NULL }.
+struct ProfileData {
+ // Longest toyprof_elapsed() duration measured so far (printed as "usecs" by master_process_flow()).
+ uint64 profile_maxtime;
+ // Node whose processing produced profile_maxtime; NULL until a node exceeds the current maximum.
+ EngineNode *profile_node;
+};
+
+/* Pop unprocessed engine nodes until none are left, processing each one for
+ * n_values and pushing it back as processed. If profile is non-NULL, each node
+ * is timed and the slowest node is recorded in *profile; NULL skips all timing.
+ */
+static void
+thread_process_nodes (const uint n_values, ProfileData *profile)
+{
+  for (EngineNode *current = _engine_pop_unprocessed_node (); current; current = _engine_pop_unprocessed_node ())
+    {
+      ToyprofStamp stamp_before;
+
+      if (UNLIKELY (profile))
+        toyprof_stamp (stamp_before);
+
+      master_process_locked_node (current, n_values);
+
+      if (UNLIKELY (profile))
+        {
+          ToyprofStamp stamp_after;
+          toyprof_stamp (stamp_after);
+          const uint64 elapsed = toyprof_elapsed (stamp_before, stamp_after);
+          // keep only the slowest node of this run
+          if (elapsed > profile->profile_maxtime)
+            {
+              profile->profile_maxtime = elapsed;
+              profile->profile_node = current;
+            }
+        }
+
+      // hand the node back before fetching the next one
+      _engine_push_processed_node (current);
+    }
+}
+
+namespace BseInternal {
+
+// State of the DSP slave thread pool.
+static std::atomic<bool>         slaves_running { false }; // cleared (under slave_mutex) to request slave shutdown
+static std::atomic<int>          slave_counter { 1 };      // pre-incremented by each slave to build its "DSP #N" task name
+static std::mutex                slave_mutex;              // serializes a slave's decision to sleep against engine_stop_slaves()
+static std::condition_variable   slave_condition;          // slaves sleep here between processing rounds
+static std::vector<std::thread*> slave_threads;            // owned; joined and deleted by engine_stop_slaves()
+
+/** Start the DSP slave threads.
+ * Must only be called while no slaves are running; each thread executes engine_run_slave().
+ */
+void
+engine_start_slaves ()
+{
+  assert_return (slaves_running == false);
+  slaves_running = true;
+  const uint n_cpus = 4; // FIXME: processors online?
+  const uint n_slaves = n_cpus - 1;
+  for (uint i = 0; i < n_slaves; i++)
+    slave_threads.push_back (new std::thread (engine_run_slave));
+}
+
+/** Request shutdown of all DSP slave threads and join them.
+ * Must only be called while slaves are running.
+ */
+void
+engine_stop_slaves ()
+{
+  assert_return (slaves_running == true);
+  {
+    // Clear the flag while holding slave_mutex: a slave re-checks the flag under
+    // the same mutex right before it waits, so it either sees the cleared flag or
+    // is already waiting when the notification below arrives. Without the lock, a
+    // slave could miss the notification and sleep forever, hanging join().
+    std::lock_guard<std::mutex> slave_lock (slave_mutex);
+    slaves_running = false;
+  }
+  while (!slave_threads.empty())
+    {
+      engine_wakeup_slaves();
+      std::thread *slave = slave_threads.back();
+      slave_threads.pop_back();
+      slave->join();
+      delete slave;
+    }
+}
+
+/** Wake up all slaves currently sleeping on the slave condition.
+ * Takes no lock, so a slave that is busy when this is called does not see the
+ * notification; it simply finds the node queue on its own current round.
+ */
+void
+engine_wakeup_slaves()
+{
+  slave_condition.notify_all();
+}
+
+/** Thread body of a DSP slave.
+ * Registers the thread in the task registry, then alternates between processing
+ * all available nodes and sleeping until woken, until shutdown is requested.
+ */
+void
+engine_run_slave ()
+{
+  String myid = string_format ("DSP #%u", ++slave_counter);
+  Bse::TaskRegistry::add (myid, Rapicorn::ThisThread::process_pid(), Rapicorn::ThisThread::thread_pid());
+  while (slaves_running)
+    {
+      thread_process_nodes (bse_engine_block_size(), NULL); // FIXME: merge profile data
+      std::unique_lock<std::mutex> slave_lock (slave_mutex);
+      // Only sleep if shutdown was not requested meanwhile; the check happens under
+      // slave_mutex so it cannot race with engine_stop_slaves(). Spurious wakeups
+      // are harmless, they just trigger another (possibly empty) processing round.
+      if (slaves_running)
+        slave_condition.wait (slave_lock);
+    }
+  Bse::TaskRegistry::remove (Rapicorn::ThisThread::thread_pid());
+}
+
+} // BseInternal
static void
master_process_flow (void)
@@ -867,9 +957,8 @@ master_process_flow (void)
const guint64 current_stamp = Bse::TickStamp::current();
guint n_values = bse_engine_block_size();
guint64 final_counter = current_stamp + n_values;
- guint64 profile_maxtime = 0;
- gboolean profile_modules = gsl_profile_modules;
- EngineNode *profile_node = NULL;
+ ProfileData profile_data = { 0, NULL };
+ ProfileData *profile = bse_profile_modules ? &profile_data : NULL;
assert_return (master_need_process == TRUE);
@@ -879,34 +968,12 @@ master_process_flow (void)
{
_engine_schedule_restart (master_schedule);
_engine_set_schedule (master_schedule);
+ BseInternal::engine_wakeup_slaves();
- EngineNode *node = _engine_pop_unprocessed_node ();
- while (node)
- {
- ToyprofStamp profile_stamp1, profile_stamp2;
-
- if (UNLIKELY (profile_modules))
- toyprof_stamp (profile_stamp1);
-
- master_process_locked_node (node, n_values);
-
- if (UNLIKELY (profile_modules))
- {
- toyprof_stamp (profile_stamp2);
- guint64 duration = toyprof_elapsed (profile_stamp1, profile_stamp2);
- if (duration > profile_maxtime)
- {
- profile_maxtime = duration;
- profile_node = node;
- }
- }
-
- _engine_push_processed_node (node);
- node = _engine_pop_unprocessed_node ();
- }
+ thread_process_nodes (n_values, profile);
/* walk unscheduled nodes with flow jobs */
- node = _engine_mnl_head ();
+ EngineNode *node = _engine_mnl_head ();
while (node && BSE_ENGINE_MNL_UNSCHEDULED_TJOB_NODE (node))
{
EngineNode *tmp = node->mnl_next;
@@ -934,16 +1001,18 @@ master_process_flow (void)
master_take_probes (node, current_stamp, n_values, PROBE_SCHEDULED);
}
- if (UNLIKELY (profile_modules))
+ if (UNLIKELY (profile))
{
- if (profile_node)
+ if (profile->profile_node)
{
- if (profile_maxtime > guint64 (profile_modules))
+ if (profile->profile_maxtime > uint64 (bse_profile_modules))
printout ("Excess Node: %p Duration: %llu usecs ((void(*)())%p) \n",
- profile_node, (long long unsigned int) profile_maxtime, profile_node->module.klass->process);
+ profile->profile_node, (long long unsigned int) profile->profile_maxtime,
+ profile->profile_node->module.klass->process);
else
printout ("Slowest Node: %p Duration: %llu usecs ((void(*)())%p) \r",
- profile_node, (long long unsigned int) profile_maxtime, profile_node->module.klass->process);
+ profile->profile_node, (long long unsigned int) profile->profile_maxtime,
+ profile->profile_node->module.klass->process);
}
}
@@ -1185,6 +1254,7 @@ MasterThread::reap_master_thread ()
assert (master_thread_singleton != NULL);
assert_return (master_thread_running == true);
master_thread_running = false;
+ BseInternal::engine_stop_slaves();
MasterThread::wakeup();
MasterThread *mthread = master_thread_singleton;
mthread->thread_.join();
@@ -1203,6 +1273,7 @@ MasterThread::start (const std::function<void()> &caller_wakeup)
fatal ("BSE: failed to install master thread reaper");
master_thread_running = true;
mthread->thread_ = std::thread (&MasterThread::master_thread, mthread);
+ BseInternal::engine_start_slaves();
}
void
diff --git a/bse/bseenginemaster.hh b/bse/bseenginemaster.hh
index 21b57af..c8b80e6 100644
--- a/bse/bseenginemaster.hh
+++ b/bse/bseenginemaster.hh
@@ -24,4 +24,14 @@ public:
} // Bse
+
+// Internal API of the DSP slave thread pool implemented in bseenginemaster.cc.
+namespace BseInternal {
+
+// Thread body of a DSP slave: processes engine nodes, then sleeps until woken, until slaves are stopped.
+void engine_run_slave ();
+// Spawn the slave threads; asserts that slaves are not already running.
+void engine_start_slaves ();
+// Clear the running flag, wake the slaves, then join and delete every slave thread.
+void engine_stop_slaves ();
+// Notify all slaves waiting on the slave condition variable.
+void engine_wakeup_slaves ();
+
+} // BseInternal
+
#endif /* __BSE_ENGINE_MASTER_H__ */
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]