[fractal] Backend: encapsulate semaphore behaviour



commit 9fa4dc354401edeec5532cfed518670b6e2c2e80
Author: Alejandro Domínguez <adomu net-c com>
Date:   Sat Apr 4 08:46:20 2020 +0200

    Backend: encapsulate semaphore behaviour

 fractal-matrix-api/src/backend/media.rs |  7 +++--
 fractal-matrix-api/src/backend/mod.rs   |  6 +++--
 fractal-matrix-api/src/backend/types.rs | 47 ++++++++++++++++++++++++++++++---
 fractal-matrix-api/src/backend/user.rs  |  5 ++--
 fractal-matrix-api/src/util.rs          | 31 ----------------------
 5 files changed, 53 insertions(+), 43 deletions(-)
---
diff --git a/fractal-matrix-api/src/backend/media.rs b/fractal-matrix-api/src/backend/media.rs
index f7996190..6322ba76 100644
--- a/fractal-matrix-api/src/backend/media.rs
+++ b/fractal-matrix-api/src/backend/media.rs
@@ -8,7 +8,6 @@ use url::Url;
 use crate::r0::AccessToken;
 use crate::util::dw_media;
 use crate::util::get_prev_batch_from;
-use crate::util::semaphore;
 use crate::util::ContentType;
 use crate::util::ResultExpectLog;
 use crate::util::HTTP_CLIENT;
@@ -21,14 +20,14 @@ use crate::r0::message::get_message_events::Response as GetMessagesEventsRespons
 use crate::types::Message;
 
 pub fn get_thumb_async(bk: &Backend, baseu: Url, media: String, tx: Sender<Result<String, Error>>) {
-    semaphore(bk.limit_threads.clone(), move || {
+    bk.thread_pool.run(move || {
         let fname = dw_media(baseu, &media, ContentType::default_thumbnail(), None);
         tx.send(fname).expect_log("Connection closed");
     });
 }
 
 pub fn get_media_async(bk: &Backend, baseu: Url, media: String, tx: Sender<Result<String, Error>>) {
-    semaphore(bk.limit_threads.clone(), move || {
+    bk.thread_pool.run(move || {
         let fname = dw_media(baseu, &media, ContentType::Download, None);
         tx.send(fname).expect_log("Connection closed");
     });
@@ -43,7 +42,7 @@ pub fn get_media_list_async(
     prev_batch: Option<String>,
     tx: Sender<(Vec<Message>, String)>,
 ) {
-    semaphore(bk.limit_threads.clone(), move || {
+    bk.thread_pool.run(move || {
         let media_list = prev_batch
             // FIXME: This should never be an empty token
             .or_else(|| {
diff --git a/fractal-matrix-api/src/backend/mod.rs b/fractal-matrix-api/src/backend/mod.rs
index 41f0f4ae..1f9c62c3 100644
--- a/fractal-matrix-api/src/backend/mod.rs
+++ b/fractal-matrix-api/src/backend/mod.rs
@@ -2,7 +2,7 @@ use std::collections::HashMap;
 use std::sync::mpsc::channel;
 use std::sync::mpsc::RecvError;
 use std::sync::mpsc::{Receiver, Sender};
-use std::sync::{Arc, Condvar, Mutex};
+use std::sync::{Arc, Mutex};
 use std::thread;
 
 use crate::util::dw_media;
@@ -13,6 +13,8 @@ use crate::cache::CacheMap;
 
 use crate::globals;
 
+use self::types::ThreadPool;
+
 mod directory;
 mod media;
 pub mod register;
@@ -38,7 +40,7 @@ impl Backend {
             tx,
             data: Arc::new(Mutex::new(data)),
             user_info_cache: CacheMap::new().timeout(60 * 60),
-            limit_threads: Arc::new((Mutex::new(0u8), Condvar::new())),
+            thread_pool: ThreadPool::new(20),
         }
     }
 
diff --git a/fractal-matrix-api/src/backend/types.rs b/fractal-matrix-api/src/backend/types.rs
index 287e39e7..8b8b5397 100644
--- a/fractal-matrix-api/src/backend/types.rs
+++ b/fractal-matrix-api/src/backend/types.rs
@@ -2,6 +2,7 @@ use ruma_identifiers::{RoomId, UserId};
 use std::collections::HashMap;
 use std::sync::mpsc::Sender;
 use std::sync::{Arc, Condvar, Mutex};
+use std::thread;
 
 use crate::error::Error;
 
@@ -146,19 +147,59 @@ pub enum RoomType {
     Private,
 }
 
+pub struct ThreadPool {
+    thread_count: Arc<(Mutex<u8>, Condvar)>,
+    limit: u8,
+}
+
+impl ThreadPool {
+    pub fn new(limit: u8) -> Self {
+        ThreadPool {
+            thread_count: Arc::new((Mutex::new(0), Condvar::new())),
+            limit,
+        }
+    }
+
+    pub fn run<F>(&self, func: F)
+    where
+        F: FnOnce() + Send + 'static,
+    {
+        let thread_count = self.thread_count.clone();
+        let limit = self.limit;
+        thread::spawn(move || {
+            // block until fewer than `limit` jobs are running, then claim a slot
+            let &(ref num, ref cvar) = &*thread_count;
+            {
+                let mut start = num.lock().unwrap();
+                while *start >= limit {
+                    start = cvar.wait(start).unwrap()
+                }
+                *start += 1;
+            }
+
+            func();
+
+            // release our slot, then wake one thread waiting for a free slot
+            {
+                let mut counter = num.lock().unwrap();
+                *counter -= 1;
+            }
+            cvar.notify_one();
+        });
+    }
+}
+
 pub struct BackendData {
     pub rooms_since: String,
     pub join_to_room: Option<RoomId>,
     pub m_direct: HashMap<UserId, Vec<RoomId>>,
 }
 
-#[derive(Clone)]
 pub struct Backend {
     pub tx: Sender<BKResponse>,
     pub data: Arc<Mutex<BackendData>>,
 
     // user info cache, uid -> (name, avatar)
     pub user_info_cache: CacheMap<UserId, Arc<Mutex<(String, String)>>>,
-    // semaphore to limit the number of threads downloading images
-    pub limit_threads: Arc<(Mutex<u8>, Condvar)>,
+    pub thread_pool: ThreadPool,
 }
diff --git a/fractal-matrix-api/src/backend/user.rs b/fractal-matrix-api/src/backend/user.rs
index e01fee44..ad175896 100644
--- a/fractal-matrix-api/src/backend/user.rs
+++ b/fractal-matrix-api/src/backend/user.rs
@@ -7,7 +7,6 @@ use crate::error::Error;
 use crate::util::cache_dir_path;
 use crate::util::dw_media;
 use crate::util::get_user_avatar;
-use crate::util::semaphore;
 use crate::util::ContentType;
 use crate::util::ResultExpectLog;
 use crate::util::HTTP_CLIENT;
@@ -348,7 +347,7 @@ pub fn get_avatar_async(bk: &Backend, base: Url, member: Option<Member>, tx: Sen
         let uid = member.uid.clone();
         let avatar = member.avatar.clone().unwrap_or_default();
 
-        semaphore(bk.limit_threads.clone(), move || {
+        bk.thread_pool.run(move || {
             let fname = get_user_avatar_img(base, &uid, &avatar).unwrap_or_default();
             tx.send(fname).expect_log("Connection closed");
         });
@@ -417,7 +416,7 @@ pub fn get_user_info_async(
     let info: Arc<Mutex<(String, String)>> = Default::default();
     bk.user_info_cache.insert(uid.clone(), info.clone());
 
-    semaphore(bk.limit_threads.clone(), move || {
+    bk.thread_pool.run(move || {
         match (get_user_avatar(baseu, &uid), tx) {
             (Ok(i0), Some(tx)) => {
                 tx.send(i0.clone()).expect_log("Connection closed");
diff --git a/fractal-matrix-api/src/util.rs b/fractal-matrix-api/src/util.rs
index c4114e30..86524240 100644
--- a/fractal-matrix-api/src/util.rs
+++ b/fractal-matrix-api/src/util.rs
@@ -15,8 +15,6 @@ use url::Url;
 use std::fs::{create_dir_all, write};
 
 use std::sync::mpsc::SendError;
-use std::sync::{Arc, Condvar, Mutex};
-use std::thread;
 
 use crate::client::Client;
 use crate::error::Error;
@@ -41,35 +39,6 @@ lazy_static! {
         .unwrap_or(std::env::temp_dir().join("fractal"));
 }
 
-pub fn semaphore<F>(thread_count: Arc<(Mutex<u8>, Condvar)>, func: F)
-where
-    F: FnOnce() + Send + 'static,
-{
-    thread::spawn(move || {
-        // waiting, less than 20 threads at the same time
-        // this is a semaphore
-        // TODO: use std::sync::Semaphore when it's on stable version
-        // https://doc.rust-lang.org/1.1.0/std/sync/struct.Semaphore.html
-        let &(ref num, ref cvar) = &*thread_count;
-        {
-            let mut start = num.lock().unwrap();
-            while *start >= 20 {
-                start = cvar.wait(start).unwrap()
-            }
-            *start += 1;
-        }
-
-        func();
-
-        // freeing the cvar for new threads
-        {
-            let mut counter = num.lock().unwrap();
-            *counter -= 1;
-        }
-        cvar.notify_one();
-    });
-}
-
 // from https://stackoverflow.com/a/43992218/1592377
 #[macro_export]
 macro_rules! clone {


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]