[fractal/fractal-next] session: Improve how the sync is performed



commit ae9af3602e94ef1321f2ac0dd40809f032b93555
Author: Julian Sparber <julian sparber net>
Date:   Mon Oct 11 15:37:04 2021 +0200

    session: Improve how the sync is performed
    
    This switches to sync_stream(), and disconnects the sync loop when the
    session is dropped.
    This also makes sure that the session is dropped when the access token
    is invalidated. This also adds logging to cleanup_session().

 src/session/mod.rs | 118 ++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 90 insertions(+), 28 deletions(-)
---
diff --git a/src/session/mod.rs b/src/session/mod.rs
index c74e717c..5409a357 100644
--- a/src/session/mod.rs
+++ b/src/session/mod.rs
@@ -27,12 +27,15 @@ use crate::RUNTIME;
 use crate::login::LoginError;
 use crate::session::content::ContentType;
 use adw::subclass::prelude::BinImpl;
+use futures::StreamExt;
 use gettextrs::gettext;
 use gtk::subclass::prelude::*;
 use gtk::{self, prelude::*};
-use gtk::{gdk, glib, glib::clone, glib::SyncSender, CompositeTemplate, SelectionModel};
-use gtk_macros::send;
-use log::error;
+use gtk::{
+    gdk, glib, glib::clone, glib::source::SourceId, glib::SyncSender, CompositeTemplate,
+    SelectionModel,
+};
+use log::{debug, error};
 use matrix_sdk::ruma::{
     api::client::r0::{
         filter::{FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter},
@@ -43,12 +46,17 @@ use matrix_sdk::ruma::{
 use matrix_sdk::{
     config::{ClientConfig, RequestConfig, SyncSettings},
     deserialized_responses::SyncResponse,
+    ruma::api::{
+        client::error::ErrorKind,
+        error::{FromHttpResponseError, ServerError},
+    },
     uuid::Uuid,
-    Client,
+    Client, HttpError,
 };
 use rand::{distributions::Alphanumeric, thread_rng, Rng};
 use std::fs;
 use std::time::Duration;
+use tokio::task::JoinHandle;
 use url::Url;
 
 mod imp {
@@ -75,6 +83,8 @@ mod imp {
         pub selected_content_type: Cell<ContentType>,
         pub is_ready: Cell<bool>,
         pub info: OnceCell<StoredSession>,
+        pub source_id: RefCell<Option<SourceId>>,
+        pub sync_tokio_handle: RefCell<Option<JoinHandle<()>>>,
     }
 
     #[glib::object_subclass]
@@ -207,6 +217,16 @@ mod imp {
             });
             SIGNALS.as_ref()
         }
+
+        fn dispose(&self, _obj: &Self::Type) {
+            if let Some(source_id) = self.source_id.take() {
+                let _ = glib::Source::remove(source_id);
+            }
+
+            if let Some(handle) = self.sync_tokio_handle.take() {
+                handle.abort();
+            }
+        }
     }
     impl WidgetImpl for Session {}
     impl BinImpl for Session {}
@@ -395,9 +415,10 @@ impl Session {
     }
 
     fn sync(&self) {
+        let priv_ = imp::Session::from_instance(self);
         let sender = self.create_new_sync_response_sender();
         let client = self.client();
-        RUNTIME.spawn(async move {
+        let handle = RUNTIME.spawn(async move {
             // TODO: only create the filter once and reuse it in the future
             let room_event_filter = assign!(RoomEventFilter::default(), {
                 lazy_load_options: LazyLoadOptions::Enabled {include_redundant_members: false},
@@ -412,19 +433,20 @@ impl Session {
             let sync_settings = SyncSettings::new()
                 .timeout(Duration::from_secs(30))
                 .filter(filter.into());
-            client
-                .sync_with_callback(sync_settings, |response| {
-                    let sender = sender.clone();
-                    async move {
-                        // Using the event handler doesn't make a lot of sense for us since we want every room event
-                        // Eventually we should contribute a better EventHandler interface so that it makes sense to use it.
-                        send!(sender, response);
 
-                        matrix_sdk::LoopCtrl::Continue
+            // We need to automatically restart the stream because it gets killed on error
+            loop {
+                let mut sync_stream = Box::pin(client.sync_stream(sync_settings.clone()).await);
+                while let Some(response) = sync_stream.next().await {
+                    if sender.send(response).is_err() {
+                        debug!("Stop syncing because the session was disposed");
+                        return;
                     }
-                })
-                .await;
+                }
+            }
         });
+
+        priv_.sync_tokio_handle.replace(Some(handle));
     }
 
     fn mark_ready(&self) {
@@ -458,21 +480,24 @@ impl Session {
     }
 
     /// Sets up the required channel to receive new room events
-    fn create_new_sync_response_sender(&self) -> SyncSender<SyncResponse> {
-        let (sender, receiver) =
-            glib::MainContext::sync_channel::<SyncResponse>(Default::default(), 100);
-        receiver.attach(
+    fn create_new_sync_response_sender(
+        &self,
+    ) -> SyncSender<Result<SyncResponse, matrix_sdk::Error>> {
+        let priv_ = imp::Session::from_instance(self);
+        let (sender, receiver) = glib::MainContext::sync_channel::<
+            Result<SyncResponse, matrix_sdk::Error>,
+        >(Default::default(), 100);
+        let source_id = receiver.attach(
             None,
             clone!(@weak self as obj => @default-return glib::Continue(false), move |response| {
-                if !obj.is_ready() {
-                        obj.mark_ready();
-                }
                 obj.handle_sync_response(response);
 
                 glib::Continue(true)
             }),
         );
 
+        priv_.source_id.replace(Some(source_id));
+
         sender
     }
 
@@ -506,8 +531,29 @@ impl Session {
         .unwrap()
     }
 
-    fn handle_sync_response(&self, response: SyncResponse) {
-        self.room_list().handle_response_rooms(response.rooms);
+    fn handle_sync_response(&self, response: Result<SyncResponse, matrix_sdk::Error>) {
+        match response {
+            Ok(response) => {
+                if !self.is_ready() {
+                    self.mark_ready();
+                }
+                self.room_list().handle_response_rooms(response.rooms);
+            }
+            Err(error) => {
+                if let matrix_sdk::Error::Http(HttpError::ClientApi(FromHttpResponseError::Http(
+                    ServerError::Known(ref error),
+                ))) = error
+                {
+                    match error.kind {
+                        ErrorKind::UnknownToken { soft_logout: _ } => {
+                            self.cleanup_session();
+                        }
+                        _ => {}
+                    }
+                }
+                error!("Failed to perform sync: {:?}", error);
+            }
+        }
     }
 
     pub fn set_logged_in_users(&self, sessions_stack_pages: &SelectionModel) {
@@ -566,13 +612,29 @@ impl Session {
     }
 
     fn cleanup_session(&self) {
-        let priv_ = &imp::Session::from_instance(self);
+        let priv_ = imp::Session::from_instance(self);
         let info = priv_.info.get().unwrap();
 
         priv_.is_ready.set(false);
-        secret::remove_session(info).unwrap();
-        priv_.client.take();
-        fs::remove_dir_all(info.path.clone()).unwrap();
+
+        if let Some(source_id) = priv_.source_id.take() {
+            let _ = glib::Source::remove(source_id);
+        }
+
+        if let Some(handle) = priv_.sync_tokio_handle.take() {
+            handle.abort();
+        }
+
+        if let Err(error) = secret::remove_session(info) {
+            error!(
+                "Failed to remove credentials from SecretService after logout: {}",
+                error
+            );
+        }
+
+        if let Err(error) = fs::remove_dir_all(info.path.clone()) {
+            error!("Failed to remove database after logout: {}", error);
+        }
 
         self.emit_by_name("logged-out", &[]).unwrap();
     }


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]