[fractal] session: Use spawn to handle sync response
- From: Julian Sparber <jsparber src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [fractal] session: Use spawn to handle sync response
- Date: Tue, 16 Aug 2022 10:54:08 +0000 (UTC)
commit 06d638b171eacc358ca42f7280978bd393b97458
Author: Julian Sparber <julian sparber net>
Date: Fri Jul 22 18:34:05 2022 +0200
session: Use spawn to handle sync response
This removes the channel used to pass the sync response to the
MainContext.
This also removes the extra loop that automatically restarted the sync
stream, since it is no longer needed.
This also removes the oneshot channel, because the tokio task is aborted
anyway when the Session object is disposed.
src/session/mod.rs | 57 +++++++++++-------------------------------------------
1 file changed, 11 insertions(+), 46 deletions(-)
---
diff --git a/src/session/mod.rs b/src/session/mod.rs
index 8da698a55..040d53547 100644
--- a/src/session/mod.rs
+++ b/src/session/mod.rs
@@ -15,13 +15,7 @@ use std::{collections::HashSet, convert::TryFrom, fs, path::PathBuf, time::Durat
use adw::subclass::prelude::BinImpl;
use futures::StreamExt;
use gettextrs::gettext;
-use gtk::{
- self, gdk, glib,
- glib::{clone, source::SourceId, SyncSender},
- prelude::*,
- subclass::prelude::*,
- CompositeTemplate,
-};
+use gtk::{self, gdk, glib, glib::clone, prelude::*, subclass::prelude::*, CompositeTemplate};
use log::{debug, error, warn};
use matrix_sdk::{
config::{RequestConfig, StoreConfig, SyncSettings},
@@ -119,7 +113,6 @@ mod imp {
pub is_ready: Cell<bool>,
pub logout_on_dispose: Cell<bool>,
pub info: OnceCell<StoredSession>,
- pub source_id: RefCell<Option<SourceId>>,
pub sync_tokio_handle: RefCell<Option<JoinHandle<()>>>,
}
@@ -263,10 +256,6 @@ mod imp {
}
fn dispose(&self, obj: &Self::Type) {
- if let Some(source_id) = self.source_id.take() {
- source_id.remove();
- }
-
if let Some(handle) = self.sync_tokio_handle.take() {
handle.abort();
}
@@ -500,8 +489,9 @@ impl Session {
}
fn sync(&self) {
- let sender = self.create_new_sync_response_sender();
let client = self.client();
+ let session_weak: glib::SendWeakRef<Session> = self.downgrade().into();
+
let handle = spawn_tokio!(async move {
let sync_token = client.sync_token().await;
if sync_token.is_none() {
@@ -523,15 +513,15 @@ impl Session {
.timeout(Duration::from_secs(30))
.filter(filter.into());
- // We need to automatically restart the stream because it gets killed on error
- loop {
- let mut sync_stream = Box::pin(client.sync_stream(sync_settings.clone()).await);
- while let Some(response) = sync_stream.next().await {
- if sender.send(response).is_err() {
- debug!("Stop syncing because the session was disposed");
- return;
+ let mut sync_stream = Box::pin(client.sync_stream(sync_settings).await);
+ while let Some(response) = sync_stream.next().await {
+ let session_weak = session_weak.clone();
+ let ctx = glib::MainContext::default();
+ ctx.spawn(async move {
+ if let Some(session) = session_weak.upgrade() {
+ session.handle_sync_response(response);
}
- }
+ });
}
});
@@ -644,27 +634,6 @@ impl Session {
.expect("The session isn't ready")
}
- /// Sets up the required channel to receive new room events
- fn create_new_sync_response_sender(
- &self,
- ) -> SyncSender<Result<SyncResponse, matrix_sdk::Error>> {
- let (sender, receiver) = glib::MainContext::sync_channel::<
- Result<SyncResponse, matrix_sdk::Error>,
- >(Default::default(), 100);
- let source_id = receiver.attach(
- None,
- clone!(@weak self as obj => @default-return glib::Continue(false), move |response| {
- obj.handle_sync_response(response);
-
- glib::Continue(true)
- }),
- );
-
- self.imp().source_id.replace(Some(source_id));
-
- sender
- }
-
/// Connects the prepared signals to the function f given in input
pub fn connect_prepared<F: Fn(&Self, Option<String>) + 'static>(
&self,
@@ -795,10 +764,6 @@ impl Session {
priv_.is_ready.set(false);
- if let Some(source_id) = priv_.source_id.take() {
- source_id.remove();
- }
-
if let Some(handle) = priv_.sync_tokio_handle.take() {
handle.abort();
}
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]