[fractal] session: Use spawn to handle sync response



commit 06d638b171eacc358ca42f7280978bd393b97458
Author: Julian Sparber <julian sparber net>
Date:   Fri Jul 22 18:34:05 2022 +0200

    session: Use spawn to handle sync response
    
    This removes the channel used to pass the sync response to the
    MainContext.
    This removes the extra loop to automatically restart the sync stream
    since it's not needed anymore.
    
    This also removes the oneshot channel because the tokio task is aborted
    anyways when the Session object is disposed.

 src/session/mod.rs | 57 +++++++++++-------------------------------------------
 1 file changed, 11 insertions(+), 46 deletions(-)
---
diff --git a/src/session/mod.rs b/src/session/mod.rs
index 8da698a55..040d53547 100644
--- a/src/session/mod.rs
+++ b/src/session/mod.rs
@@ -15,13 +15,7 @@ use std::{collections::HashSet, convert::TryFrom, fs, path::PathBuf, time::Durat
 use adw::subclass::prelude::BinImpl;
 use futures::StreamExt;
 use gettextrs::gettext;
-use gtk::{
-    self, gdk, glib,
-    glib::{clone, source::SourceId, SyncSender},
-    prelude::*,
-    subclass::prelude::*,
-    CompositeTemplate,
-};
+use gtk::{self, gdk, glib, glib::clone, prelude::*, subclass::prelude::*, CompositeTemplate};
 use log::{debug, error, warn};
 use matrix_sdk::{
     config::{RequestConfig, StoreConfig, SyncSettings},
@@ -119,7 +113,6 @@ mod imp {
         pub is_ready: Cell<bool>,
         pub logout_on_dispose: Cell<bool>,
         pub info: OnceCell<StoredSession>,
-        pub source_id: RefCell<Option<SourceId>>,
         pub sync_tokio_handle: RefCell<Option<JoinHandle<()>>>,
     }
 
@@ -263,10 +256,6 @@ mod imp {
         }
 
         fn dispose(&self, obj: &Self::Type) {
-            if let Some(source_id) = self.source_id.take() {
-                source_id.remove();
-            }
-
             if let Some(handle) = self.sync_tokio_handle.take() {
                 handle.abort();
             }
@@ -500,8 +489,9 @@ impl Session {
     }
 
     fn sync(&self) {
-        let sender = self.create_new_sync_response_sender();
         let client = self.client();
+        let session_weak: glib::SendWeakRef<Session> = self.downgrade().into();
+
         let handle = spawn_tokio!(async move {
             let sync_token = client.sync_token().await;
             if sync_token.is_none() {
@@ -523,15 +513,15 @@ impl Session {
                 .timeout(Duration::from_secs(30))
                 .filter(filter.into());
 
-            // We need to automatically restart the stream because it gets killed on error
-            loop {
-                let mut sync_stream = Box::pin(client.sync_stream(sync_settings.clone()).await);
-                while let Some(response) = sync_stream.next().await {
-                    if sender.send(response).is_err() {
-                        debug!("Stop syncing because the session was disposed");
-                        return;
+            let mut sync_stream = Box::pin(client.sync_stream(sync_settings).await);
+            while let Some(response) = sync_stream.next().await {
+                let session_weak = session_weak.clone();
+                let ctx = glib::MainContext::default();
+                ctx.spawn(async move {
+                    if let Some(session) = session_weak.upgrade() {
+                        session.handle_sync_response(response);
                     }
-                }
+                });
             }
         });
 
@@ -644,27 +634,6 @@ impl Session {
             .expect("The session isn't ready")
     }
 
-    /// Sets up the required channel to receive new room events
-    fn create_new_sync_response_sender(
-        &self,
-    ) -> SyncSender<Result<SyncResponse, matrix_sdk::Error>> {
-        let (sender, receiver) = glib::MainContext::sync_channel::<
-            Result<SyncResponse, matrix_sdk::Error>,
-        >(Default::default(), 100);
-        let source_id = receiver.attach(
-            None,
-            clone!(@weak self as obj => @default-return glib::Continue(false), move |response| {
-                obj.handle_sync_response(response);
-
-                glib::Continue(true)
-            }),
-        );
-
-        self.imp().source_id.replace(Some(source_id));
-
-        sender
-    }
-
     /// Connects the prepared signals to the function f given in input
     pub fn connect_prepared<F: Fn(&Self, Option<String>) + 'static>(
         &self,
@@ -795,10 +764,6 @@ impl Session {
 
         priv_.is_ready.set(false);
 
-        if let Some(source_id) = priv_.source_id.take() {
-            source_id.remove();
-        }
-
         if let Some(handle) = priv_.sync_tokio_handle.take() {
             handle.abort();
         }


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]