[fractal/fractal-next] timeline: Clear timeline on broken forward_stream



commit e6259b5c50d8221c25d8a7d3cdc09cb33239cfaf
Author: Julian Sparber <julian sparber net>
Date:   Thu Mar 24 14:40:37 2022 +0100

    timeline: Clear timeline on broken forward_stream

 src/session/room/timeline.rs | 38 ++++++++++++++++++++++----------------
 1 file changed, 22 insertions(+), 16 deletions(-)
---
diff --git a/src/session/room/timeline.rs b/src/session/room/timeline.rs
index 282f532ae..0f4f322c1 100644
--- a/src/session/room/timeline.rs
+++ b/src/session/room/timeline.rs
@@ -9,13 +9,13 @@ use gtk::{gio, glib, prelude::*, subclass::prelude::*};
 use log::{error, warn};
 use matrix_sdk::{
     deserialized_responses::SyncRoomEvent,
-    room::Room as MatrixRoom,
     ruma::{
         events::{room::message::MessageType, AnySyncMessageEvent, AnySyncRoomEvent},
         identifiers::{EventId, TransactionId},
     },
     Error as MatrixError,
 };
+use tokio::task::JoinHandle;
 
 use crate::{
     session::{
@@ -73,6 +73,7 @@ mod imp {
         /// The most recent verification request event
         pub verification: RefCell<Option<IdentityVerification>>,
         pub backward_stream: Arc<Mutex<Option<BackwardStream>>>,
+        pub forward_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
     }
 
     #[glib::object_subclass]
@@ -531,14 +532,26 @@ impl Timeline {
         self.add_loading_spinner();
 
         let matrix_room = self.room().matrix_room();
-        let room_weak = self.downgrade().into();
+        let timeline_weak = self.downgrade().into();
         let backward_stream = priv_.backward_stream.clone();
+        let forward_handle = priv_.forward_handle.clone();
 
         let handle: tokio::task::JoinHandle<matrix_sdk::Result<_>> = spawn_tokio!(async move {
             let mut backward_stream_guard = backward_stream.lock().await;
+            let mut forward_handle_guard = forward_handle.lock().await;
             if backward_stream_guard.is_none() {
+                let (forward_stream, backward_stream) = matrix_room.timeline().await?;
+
+                let forward_handle = tokio::spawn(async move {
+                    handle_forward_stream(timeline_weak, forward_stream).await;
+                });
+
+                if let Some(old_forward_handle) = forward_handle_guard.replace(forward_handle) {
+                    old_forward_handle.abort();
+                }
+
                 backward_stream_guard
-                    .replace(create_streams_handler(room_weak, matrix_room).await?);
+                    .replace(Box::pin(backward_stream.ready_chunks(MAX_BATCH_SIZE)));
             }
 
             Ok(backward_stream_guard.as_mut().unwrap().next().await)
@@ -587,12 +600,18 @@ impl Timeline {
         let mut backward_stream = priv_.backward_stream.lock().await;
         backward_stream.take();
 
+        let mut forward_handle = priv_.forward_handle.lock().await;
+        if let Some(forward_handle) = forward_handle.take() {
+            forward_handle.abort();
+        }
+
         let length = priv_.list.borrow().len();
         priv_.relates_to_events.replace(HashMap::new());
         priv_.list.replace(VecDeque::new());
         priv_.event_map.replace(HashMap::new());
         priv_.pending_events.replace(HashMap::new());
         priv_.redacted_events.replace(HashSet::new());
+        self.set_state(TimelineState::Initial);
 
         self.notify("empty");
         self.upcast_ref::<gio::ListModel>()
@@ -912,19 +931,6 @@ impl Timeline {
     }
 }
 
-async fn create_streams_handler(
-    timeline: glib::SendWeakRef<Timeline>,
-    matrix_room: MatrixRoom,
-) -> matrix_sdk::Result<BackwardStream> {
-    let (forward_stream, backward_stream) = matrix_room.timeline().await?;
-
-    tokio::spawn(async move {
-        handle_forward_stream(timeline, forward_stream).await;
-    });
-
-    Ok(Box::pin(backward_stream.ready_chunks(MAX_BATCH_SIZE)))
-}
-
 async fn handle_forward_stream(
     timeline: glib::SendWeakRef<Timeline>,
     stream: impl Stream<Item = SyncRoomEvent>,


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]