[fractal] Backend: encapsulate semaphore behaviour
- From: Daniel Garcia Moreno <danigm src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [fractal] Backend: encapsulate semaphore behaviour
- Date: Mon, 13 Apr 2020 06:40:38 +0000 (UTC)
commit 9fa4dc354401edeec5532cfed518670b6e2c2e80
Author: Alejandro Domínguez <adomu net-c com>
Date: Sat Apr 4 08:46:20 2020 +0200
Backend: encapsulate semaphore behaviour
fractal-matrix-api/src/backend/media.rs | 7 +++--
fractal-matrix-api/src/backend/mod.rs | 6 +++--
fractal-matrix-api/src/backend/types.rs | 47 ++++++++++++++++++++++++++++++---
fractal-matrix-api/src/backend/user.rs | 5 ++--
fractal-matrix-api/src/util.rs | 31 ----------------------
5 files changed, 53 insertions(+), 43 deletions(-)
---
diff --git a/fractal-matrix-api/src/backend/media.rs b/fractal-matrix-api/src/backend/media.rs
index f7996190..6322ba76 100644
--- a/fractal-matrix-api/src/backend/media.rs
+++ b/fractal-matrix-api/src/backend/media.rs
@@ -8,7 +8,6 @@ use url::Url;
use crate::r0::AccessToken;
use crate::util::dw_media;
use crate::util::get_prev_batch_from;
-use crate::util::semaphore;
use crate::util::ContentType;
use crate::util::ResultExpectLog;
use crate::util::HTTP_CLIENT;
@@ -21,14 +20,14 @@ use crate::r0::message::get_message_events::Response as GetMessagesEventsRespons
use crate::types::Message;
pub fn get_thumb_async(bk: &Backend, baseu: Url, media: String, tx: Sender<Result<String, Error>>) {
- semaphore(bk.limit_threads.clone(), move || {
+ bk.thread_pool.run(move || {
let fname = dw_media(baseu, &media, ContentType::default_thumbnail(), None);
tx.send(fname).expect_log("Connection closed");
});
}
pub fn get_media_async(bk: &Backend, baseu: Url, media: String, tx: Sender<Result<String, Error>>) {
- semaphore(bk.limit_threads.clone(), move || {
+ bk.thread_pool.run(move || {
let fname = dw_media(baseu, &media, ContentType::Download, None);
tx.send(fname).expect_log("Connection closed");
});
@@ -43,7 +42,7 @@ pub fn get_media_list_async(
prev_batch: Option<String>,
tx: Sender<(Vec<Message>, String)>,
) {
- semaphore(bk.limit_threads.clone(), move || {
+ bk.thread_pool.run(move || {
let media_list = prev_batch
// FIXME: This should never be an empty token
.or_else(|| {
diff --git a/fractal-matrix-api/src/backend/mod.rs b/fractal-matrix-api/src/backend/mod.rs
index 41f0f4ae..1f9c62c3 100644
--- a/fractal-matrix-api/src/backend/mod.rs
+++ b/fractal-matrix-api/src/backend/mod.rs
@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvError;
use std::sync::mpsc::{Receiver, Sender};
-use std::sync::{Arc, Condvar, Mutex};
+use std::sync::{Arc, Mutex};
use std::thread;
use crate::util::dw_media;
@@ -13,6 +13,8 @@ use crate::cache::CacheMap;
use crate::globals;
+use self::types::ThreadPool;
+
mod directory;
mod media;
pub mod register;
@@ -38,7 +40,7 @@ impl Backend {
tx,
data: Arc::new(Mutex::new(data)),
user_info_cache: CacheMap::new().timeout(60 * 60),
- limit_threads: Arc::new((Mutex::new(0u8), Condvar::new())),
+ thread_pool: ThreadPool::new(20),
}
}
diff --git a/fractal-matrix-api/src/backend/types.rs b/fractal-matrix-api/src/backend/types.rs
index 287e39e7..8b8b5397 100644
--- a/fractal-matrix-api/src/backend/types.rs
+++ b/fractal-matrix-api/src/backend/types.rs
@@ -2,6 +2,7 @@ use ruma_identifiers::{RoomId, UserId};
use std::collections::HashMap;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Condvar, Mutex};
+use std::thread;
use crate::error::Error;
@@ -146,19 +147,59 @@ pub enum RoomType {
Private,
}
+pub struct ThreadPool {
+ thread_count: Arc<(Mutex<u8>, Condvar)>,
+ limit: u8,
+}
+
+impl ThreadPool {
+ pub fn new(limit: u8) -> Self {
+ ThreadPool {
+ thread_count: Arc::new((Mutex::new(0), Condvar::new())),
+ limit,
+ }
+ }
+
+ pub fn run<F>(&self, func: F)
+ where
+ F: FnOnce() + Send + 'static,
+ {
+ let thread_count = self.thread_count.clone();
+ let limit = self.limit;
+ thread::spawn(move || {
+ // waiting, less than {limit} threads at the same time
+ let &(ref num, ref cvar) = &*thread_count;
+ {
+ let mut start = num.lock().unwrap();
+ while *start >= limit {
+ start = cvar.wait(start).unwrap()
+ }
+ *start += 1;
+ }
+
+ func();
+
+ // freeing the cvar for new threads
+ {
+ let mut counter = num.lock().unwrap();
+ *counter -= 1;
+ }
+ cvar.notify_one();
+ });
+ }
+}
+
pub struct BackendData {
pub rooms_since: String,
pub join_to_room: Option<RoomId>,
pub m_direct: HashMap<UserId, Vec<RoomId>>,
}
-#[derive(Clone)]
pub struct Backend {
pub tx: Sender<BKResponse>,
pub data: Arc<Mutex<BackendData>>,
// user info cache, uid -> (name, avatar)
pub user_info_cache: CacheMap<UserId, Arc<Mutex<(String, String)>>>,
- // semaphore to limit the number of threads downloading images
- pub limit_threads: Arc<(Mutex<u8>, Condvar)>,
+ pub thread_pool: ThreadPool,
}
diff --git a/fractal-matrix-api/src/backend/user.rs b/fractal-matrix-api/src/backend/user.rs
index e01fee44..ad175896 100644
--- a/fractal-matrix-api/src/backend/user.rs
+++ b/fractal-matrix-api/src/backend/user.rs
@@ -7,7 +7,6 @@ use crate::error::Error;
use crate::util::cache_dir_path;
use crate::util::dw_media;
use crate::util::get_user_avatar;
-use crate::util::semaphore;
use crate::util::ContentType;
use crate::util::ResultExpectLog;
use crate::util::HTTP_CLIENT;
@@ -348,7 +347,7 @@ pub fn get_avatar_async(bk: &Backend, base: Url, member: Option<Member>, tx: Sen
let uid = member.uid.clone();
let avatar = member.avatar.clone().unwrap_or_default();
- semaphore(bk.limit_threads.clone(), move || {
+ bk.thread_pool.run(move || {
let fname = get_user_avatar_img(base, &uid, &avatar).unwrap_or_default();
tx.send(fname).expect_log("Connection closed");
});
@@ -417,7 +416,7 @@ pub fn get_user_info_async(
let info: Arc<Mutex<(String, String)>> = Default::default();
bk.user_info_cache.insert(uid.clone(), info.clone());
- semaphore(bk.limit_threads.clone(), move || {
+ bk.thread_pool.run(move || {
match (get_user_avatar(baseu, &uid), tx) {
(Ok(i0), Some(tx)) => {
tx.send(i0.clone()).expect_log("Connection closed");
diff --git a/fractal-matrix-api/src/util.rs b/fractal-matrix-api/src/util.rs
index c4114e30..86524240 100644
--- a/fractal-matrix-api/src/util.rs
+++ b/fractal-matrix-api/src/util.rs
@@ -15,8 +15,6 @@ use url::Url;
use std::fs::{create_dir_all, write};
use std::sync::mpsc::SendError;
-use std::sync::{Arc, Condvar, Mutex};
-use std::thread;
use crate::client::Client;
use crate::error::Error;
@@ -41,35 +39,6 @@ lazy_static! {
.unwrap_or(std::env::temp_dir().join("fractal"));
}
-pub fn semaphore<F>(thread_count: Arc<(Mutex<u8>, Condvar)>, func: F)
-where
- F: FnOnce() + Send + 'static,
-{
- thread::spawn(move || {
- // waiting, less than 20 threads at the same time
- // this is a semaphore
- // TODO: use std::sync::Semaphore when it's on stable version
- // https://doc.rust-lang.org/1.1.0/std/sync/struct.Semaphore.html
- let &(ref num, ref cvar) = &*thread_count;
- {
- let mut start = num.lock().unwrap();
- while *start >= 20 {
- start = cvar.wait(start).unwrap()
- }
- *start += 1;
- }
-
- func();
-
- // freeing the cvar for new threads
- {
- let mut counter = num.lock().unwrap();
- *counter -= 1;
- }
- cvar.notify_one();
- });
-}
-
// from https://stackoverflow.com/a/43992218/1592377
#[macro_export]
macro_rules! clone {
[Date Prev][Date Next] [Thread Prev][Thread Next]
[Thread Index] [Date Index] [Author Index]