[fractal/sync-matrix-sdk: 75/75] Manage sync completely from matrix-sdk




commit 6909279345328f7e3d2a24b7aa986ad947004dce
Author: Alejandro Domínguez <adomu net-c com>
Date:   Wed Dec 2 21:20:45 2020 +0100

    Manage sync completely from matrix-sdk

 fractal-gtk/src/appop/login.rs   |   7 +-
 fractal-gtk/src/appop/message.rs |   3 -
 fractal-gtk/src/appop/mod.rs     |   2 -
 fractal-gtk/src/appop/sync.rs    | 280 ++++++++++++++++++++++++---------------
 fractal-gtk/src/backend/mod.rs   |   1 -
 fractal-gtk/src/meson.build      |   1 -
 6 files changed, 173 insertions(+), 121 deletions(-)
---
diff --git a/fractal-gtk/src/appop/login.rs b/fractal-gtk/src/appop/login.rs
index 98f75a90..1c93741f 100644
--- a/fractal-gtk/src/appop/login.rs
+++ b/fractal-gtk/src/appop/login.rs
@@ -57,12 +57,11 @@ impl AppOp {
             }));
 
         self.set_state(AppState::NoRoom);
-        self.since = None;
         self.get_username();
 
         // initial sync, we're shoing some feedback to the user
-        self.initial_sync(true);
-        self.sync(true, 0);
+        self.show_initial_sync();
+        self.setup_sync();
         self.init_protocols();
     }
 
@@ -72,8 +71,6 @@ impl AppOp {
             error!("Error removing cache file");
         }
 
-        self.syncing = false;
-
         self.set_state(AppState::Login);
         self.login_data = None;
     }
diff --git a/fractal-gtk/src/appop/message.rs b/fractal-gtk/src/appop/message.rs
index 92557f2e..28d4ee61 100644
--- a/fractal-gtk/src/appop/message.rs
+++ b/fractal-gtk/src/appop/message.rs
@@ -655,9 +655,6 @@ async fn send_msg_and_manage(session_client: MatrixClient, msg: Message) {
     match room::send_msg(session_client, msg).await {
         Ok(evid) => {
             APPOP!(msg_sent, (evid));
-            let initial = false;
-            let number_tries = 0;
-            APPOP!(sync, (initial, number_tries));
         }
         Err(err) => {
             err.handle_error();
diff --git a/fractal-gtk/src/appop/mod.rs b/fractal-gtk/src/appop/mod.rs
index f85db92e..4a5d51d0 100644
--- a/fractal-gtk/src/appop/mod.rs
+++ b/fractal-gtk/src/appop/mod.rs
@@ -79,7 +79,6 @@ pub struct AppOp {
     pub app_runtime: AppRuntime,
     pub ui: ui::UI,
 
-    pub syncing: bool, // TODO: Replace with a Mutex
     pub msg_queue: Vec<TmpMsg>,
     pub sending_message: bool,
 
@@ -114,7 +113,6 @@ impl AppOp {
             join_to_room: Arc::new(Mutex::new(None)),
             rooms: HashMap::new(),
             login_data: None,
-            syncing: false,
             msg_queue: vec![],
             sending_message: false,
             state: AppState::Login,
diff --git a/fractal-gtk/src/appop/sync.rs b/fractal-gtk/src/appop/sync.rs
index a29ef4fd..34bd3448 100644
--- a/fractal-gtk/src/appop/sync.rs
+++ b/fractal-gtk/src/appop/sync.rs
@@ -1,138 +1,200 @@
 use crate::app::RUNTIME;
 use crate::appop::AppOp;
-use crate::backend::{
-    sync::{self, RoomElement, SyncRet, SyncUpdates},
-    HandleError,
-};
+use crate::globals;
 use crate::model::{
     member::Member,
     room::{Room, RoomMembership, RoomTag},
 };
 use crate::util::i18n::i18n;
 use log::warn;
+use matrix_sdk::api::r0::filter::Filter as EventFilter;
+use matrix_sdk::api::r0::filter::FilterDefinition;
+use matrix_sdk::api::r0::filter::LazyLoadOptions;
+use matrix_sdk::api::r0::filter::RoomEventFilter;
+use matrix_sdk::api::r0::filter::RoomFilter;
+use matrix_sdk::api::r0::sync::sync_events::Filter;
 use matrix_sdk::api::r0::sync::sync_events::JoinedRoom;
 use matrix_sdk::api::r0::sync::sync_events::Response as SyncResponse;
+use matrix_sdk::api::r0::sync::sync_events::UnreadNotificationsCount;
+use matrix_sdk::assign;
+use matrix_sdk::events::room::member::MemberEventContent;
 use matrix_sdk::events::AnyEphemeralRoomEventContent;
 use matrix_sdk::events::AnySyncMessageEvent;
 use matrix_sdk::events::AnySyncRoomEvent;
 use matrix_sdk::events::AnySyncStateEvent;
-use matrix_sdk::identifiers::{RoomId, UserId};
-use std::collections::BTreeMap;
+use matrix_sdk::events::StateEvent;
+use matrix_sdk::identifiers::{EventId, RoomId, UserId};
+use matrix_sdk::Client as MatrixClient;
+use matrix_sdk::LoopCtrl;
+use matrix_sdk::SyncSettings;
+use std::{
+    collections::{BTreeMap, HashMap},
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc, Mutex,
+    },
+};
+
+enum RoomElement {
+    Name(RoomId, String),
+    Topic(RoomId, String),
+    NewAvatar(RoomId),
+    MemberEvent(StateEvent<MemberEventContent>),
+    RemoveMessage(RoomId, EventId),
+}
+
+struct SyncRet {
+    // Only new rooms if it's an initial sync
+    pub rooms: Vec<Room>,
+    pub next_batch: String,
+    // None if it's an initial sync
+    pub updates: Option<SyncUpdates>,
+}
+
+struct SyncUpdates {
+    pub room_notifications: HashMap<RoomId, UnreadNotificationsCount>,
+    // TODO: Typing events should become RoomElements
+    pub typing_events_as_rooms: Vec<Room>,
+    pub new_events: Vec<RoomElement>,
+}
 
 impl AppOp {
-    pub fn initial_sync(&self, show: bool) {
-        if show {
-            self.ui
-                .inapp_notify(&i18n("Syncing, this could take a while"));
-        } else {
-            self.ui.hide_inapp_notify();
-        }
+    pub fn show_initial_sync(&self) {
+        self.ui.inapp_notify(&i18n("Syncing, this could take a while"));
     }
 
-    pub fn sync(&mut self, initial: bool, number_tries: u32) {
-        if let (Some((session_client, user_id)), false) = (
-            self.login_data
-                .as_ref()
-                .map(|ld| (ld.session_client.clone(), ld.uid.clone())),
-            self.syncing,
-        ) {
-            self.syncing = true;
-            // for the initial sync we set the since to None to avoid long syncing
-            // the since can be a very old value and following the spec we should
-            // do the initial sync without a since:
-            // https://matrix.org/docs/spec/client_server/latest.html#syncing
-            let join_to_room = self.join_to_room.clone();
-            let since = self.since.clone().filter(|_| !initial);
-            RUNTIME.spawn(async move {
-                let query = sync::sync(session_client, since, number_tries).await;
-
-                match query {
-                    Ok(response) => {
-                        let sync_ret = transform_sync_response(response, initial, user_id);
-                        let clear_room_list = sync_ret.updates.is_none();
-                        if let Some(updates) = sync_ret.updates {
-                            let rooms = sync_ret.rooms;
-                            let msgs: Vec<_> =
-                                rooms.iter().flat_map(|r| &r.messages).cloned().collect();
-                            APPOP!(set_rooms, (rooms, clear_room_list));
-                            APPOP!(show_room_messages, (msgs));
-                            let typing_events_as_rooms = updates.typing_events_as_rooms;
-                            APPOP!(set_rooms, (typing_events_as_rooms, clear_room_list));
-
-                            for (room_id, unread_notifications) in updates.room_notifications {
-                                let r = room_id;
-                                let n: u64 = unread_notifications
-                                    .notification_count
-                                    .map(Into::into)
-                                    .unwrap_or_default();
-                                let h: u64 = unread_notifications
-                                    .highlight_count
-                                    .map(Into::into)
-                                    .unwrap_or_default();
-                                APPOP!(set_room_notifications, (r, n, h));
-                            }
-
-                            for room_element in updates.new_events {
-                                match room_element {
-                                    RoomElement::Name(room_id, name) => {
-                                        let n = Some(name);
-                                        APPOP!(room_name_change, (room_id, n));
-                                    }
-                                    RoomElement::Topic(room_id, topic) => {
-                                        let t = Some(topic);
-                                        APPOP!(room_topic_change, (room_id, t));
-                                    }
-                                    RoomElement::NewAvatar(room_id) => {
-                                        APPOP!(new_room_avatar, (room_id));
-                                    }
-                                    RoomElement::MemberEvent(event) => {
-                                        APPOP!(room_member_event, (event));
-                                    }
-                                    RoomElement::RemoveMessage(room_id, msg_id) => {
-                                        APPOP!(remove_message, (room_id, msg_id));
-                                    }
-                                }
-                            }
-                        } else {
-                            let rooms = sync_ret.rooms;
-                            let jtr = join_to_room.lock().unwrap().as_ref().and_then(|jtr| {
-                                rooms
-                                    .iter()
-                                    .map(|room| &room.id)
-                                    .find(|rid| *rid == jtr)
-                                    .cloned()
-                            });
-                            APPOP!(set_rooms, (rooms, clear_room_list));
-                            // Open the newly joined room
-                            let jtr_ = jtr.clone();
-                            APPOP!(set_join_to_room, (jtr_));
-                            if let Some(room_id) = jtr {
-                                APPOP!(set_active_room_by_id, (room_id));
-                            }
-                        }
+    pub fn hide_initial_sync(&self) {
+        self.ui.hide_inapp_notify();
+    }
+
+    pub fn setup_sync(&mut self) {
+        let (session_client, user_id) = unwrap_or_unit_return!(self
+            .login_data
+            .as_ref()
+            .map(|ld| (ld.session_client.clone(), ld.uid.clone())));
+
+        RUNTIME.spawn(create_and_launch_sync_task(
+            session_client,
+            user_id,
+            self.join_to_room.clone(),
+        ));
+    }
+
+    pub fn synced(&mut self, since: Option<String>) {
+        self.since = since;
+        self.hide_initial_sync();
+    }
+}
+
+async fn create_and_launch_sync_task(
+    session_client: MatrixClient,
+    user_id: UserId,
+    join_to_room: Arc<Mutex<Option<RoomId>>>,
+) {
+    let timeline_not_types = [String::from("m.call.*")];
+    let timeline_types = [String::from("m.room.message"), String::from("m.sticker")];
+    let state_types = [String::from("m.room.*")];
+    // Don't filter event fields, it breaks deserialization.
+    // Clearly the Matrix API is very static-typing-unfriendly right now.
+    let filter = assign!(FilterDefinition::empty(), {
+        presence: assign!(EventFilter::empty(), {
+            types: Some(&[]),
+        }),
+        room: assign!(RoomFilter::empty(), {
+            timeline: assign!(RoomEventFilter::empty(), {
+                not_types: &timeline_not_types,
+                limit: Some(globals::PAGE_LIMIT.into()),
+                types: Some(&timeline_types),
+            }),
+            ephemeral: assign!(RoomEventFilter::empty(), {
+                types: Some(&[]),
+            }),
+            state: assign!(RoomEventFilter::empty(), {
+                types: Some(&state_types),
+                lazy_load_options: LazyLoadOptions::Enabled {
+                    include_redundant_members: false,
+                },
+            }),
+        }),
+    });
+
+    let settings = SyncSettings::new().filter(Filter::FilterDefinition(filter));
+
+    let initial = AtomicBool::from(true);
+    let sync_callback = move |response: SyncResponse| {
+        let sync_ret =
+            transform_sync_response(response, initial.load(Ordering::Relaxed), user_id.clone());
+        initial.store(false, Ordering::Relaxed);
+        let clear_room_list = sync_ret.updates.is_none();
+        if let Some(updates) = sync_ret.updates {
+            let rooms = sync_ret.rooms;
+            let msgs: Vec<_> = rooms.iter().flat_map(|r| &r.messages).cloned().collect();
+            APPOP!(set_rooms, (rooms, clear_room_list));
+            APPOP!(show_room_messages, (msgs));
+            let typing_events_as_rooms = updates.typing_events_as_rooms;
+            APPOP!(set_rooms, (typing_events_as_rooms, clear_room_list));
 
-                        let s = Some(sync_ret.next_batch);
-                        APPOP!(synced, (s));
+            for (room_id, unread_notifications) in updates.room_notifications {
+                let r = room_id;
+                let n: u64 = unread_notifications
+                    .notification_count
+                    .map(Into::into)
+                    .unwrap_or_default();
+                let h: u64 = unread_notifications
+                    .highlight_count
+                    .map(Into::into)
+                    .unwrap_or_default();
+                APPOP!(set_room_notifications, (r, n, h));
+            }
+
+            for room_element in updates.new_events {
+                match room_element {
+                    RoomElement::Name(room_id, name) => {
+                        let n = Some(name);
+                        APPOP!(room_name_change, (room_id, n));
+                    }
+                    RoomElement::Topic(room_id, topic) => {
+                        let t = Some(topic);
+                        APPOP!(room_topic_change, (room_id, t));
+                    }
+                    RoomElement::NewAvatar(room_id) => {
+                        APPOP!(new_room_avatar, (room_id));
+                    }
+                    RoomElement::MemberEvent(event) => {
+                        APPOP!(room_member_event, (event));
                     }
-                    Err(err) => {
-                        err.handle_error();
+                    RoomElement::RemoveMessage(room_id, msg_id) => {
+                        APPOP!(remove_message, (room_id, msg_id));
                     }
                 }
+            }
+        } else {
+            let rooms = sync_ret.rooms;
+            let jtr = join_to_room.lock().unwrap().as_ref().and_then(|jtr| {
+                rooms
+                    .iter()
+                    .map(|room| &room.id)
+                    .find(|rid| *rid == jtr)
+                    .cloned()
             });
+            APPOP!(set_rooms, (rooms, clear_room_list));
+            // Open the newly joined room
+            let jtr_ = jtr.clone();
+            APPOP!(set_join_to_room, (jtr_));
+            if let Some(room_id) = jtr {
+                APPOP!(set_active_room_by_id, (room_id));
+            }
         }
-    }
 
-    pub fn synced(&mut self, since: Option<String>) {
-        self.syncing = false;
-        self.since = since;
-        self.sync(false, 0);
-        self.initial_sync(false);
-    }
+        let s = Some(sync_ret.next_batch);
+        APPOP!(synced, (s));
 
-    pub fn sync_error(&mut self, number_tries: u32) {
-        self.syncing = false;
-        self.sync(false, number_tries);
-    }
+        async { LoopCtrl::Continue }
+    };
+
+    session_client
+        .sync_with_callback(settings, sync_callback)
+        .await;
 }
 
 fn transform_sync_response(response: SyncResponse, initial: bool, user_id: UserId) -> SyncRet {
diff --git a/fractal-gtk/src/backend/mod.rs b/fractal-gtk/src/backend/mod.rs
index a13bf3ab..17ac8aaa 100644
--- a/fractal-gtk/src/backend/mod.rs
+++ b/fractal-gtk/src/backend/mod.rs
@@ -25,7 +25,6 @@ pub mod directory;
 pub mod media;
 pub mod register;
 pub mod room;
-pub mod sync;
 pub mod user;
 
 lazy_static! {
diff --git a/fractal-gtk/src/meson.build b/fractal-gtk/src/meson.build
index 36cc8fc8..9ab346c1 100644
--- a/fractal-gtk/src/meson.build
+++ b/fractal-gtk/src/meson.build
@@ -75,7 +75,6 @@ app_sources = files(
   'backend/mod.rs',
   'backend/register.rs',
   'backend/room.rs',
-  'backend/sync.rs',
   'backend/user.rs',
   'cache/mod.rs',
   'cache/state.rs',


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]