[fractal/fractal-next] timeline: Clear timeline on broken forward_stream
- From: Julian Sparber <jsparber src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [fractal/fractal-next] timeline: Clear timeline on broken forward_stream
- Date: Fri, 25 Mar 2022 15:48:31 +0000 (UTC)
commit e6259b5c50d8221c25d8a7d3cdc09cb33239cfaf
Author: Julian Sparber <julian sparber net>
Date: Thu Mar 24 14:40:37 2022 +0100
timeline: Clear timeline on broken forward_stream
src/session/room/timeline.rs | 38 ++++++++++++++++++++++----------------
1 file changed, 22 insertions(+), 16 deletions(-)
---
diff --git a/src/session/room/timeline.rs b/src/session/room/timeline.rs
index 282f532ae..0f4f322c1 100644
--- a/src/session/room/timeline.rs
+++ b/src/session/room/timeline.rs
@@ -9,13 +9,13 @@ use gtk::{gio, glib, prelude::*, subclass::prelude::*};
use log::{error, warn};
use matrix_sdk::{
deserialized_responses::SyncRoomEvent,
- room::Room as MatrixRoom,
ruma::{
events::{room::message::MessageType, AnySyncMessageEvent, AnySyncRoomEvent},
identifiers::{EventId, TransactionId},
},
Error as MatrixError,
};
+use tokio::task::JoinHandle;
use crate::{
session::{
@@ -73,6 +73,7 @@ mod imp {
/// The most recent verification request event
pub verification: RefCell<Option<IdentityVerification>>,
pub backward_stream: Arc<Mutex<Option<BackwardStream>>>,
+ pub forward_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}
#[glib::object_subclass]
@@ -531,14 +532,26 @@ impl Timeline {
self.add_loading_spinner();
let matrix_room = self.room().matrix_room();
- let room_weak = self.downgrade().into();
+ let timeline_weak = self.downgrade().into();
let backward_stream = priv_.backward_stream.clone();
+ let forward_handle = priv_.forward_handle.clone();
let handle: tokio::task::JoinHandle<matrix_sdk::Result<_>> = spawn_tokio!(async move {
let mut backward_stream_guard = backward_stream.lock().await;
+ let mut forward_handle_guard = forward_handle.lock().await;
if backward_stream_guard.is_none() {
+ let (forward_stream, backward_stream) = matrix_room.timeline().await?;
+
+ let forward_handle = tokio::spawn(async move {
+ handle_forward_stream(timeline_weak, forward_stream).await;
+ });
+
+ if let Some(old_forward_handle) = forward_handle_guard.replace(forward_handle) {
+ old_forward_handle.abort();
+ }
+
backward_stream_guard
- .replace(create_streams_handler(room_weak, matrix_room).await?);
+ .replace(Box::pin(backward_stream.ready_chunks(MAX_BATCH_SIZE)));
}
Ok(backward_stream_guard.as_mut().unwrap().next().await)
@@ -587,12 +600,18 @@ impl Timeline {
let mut backward_stream = priv_.backward_stream.lock().await;
backward_stream.take();
+ let mut forward_handle = priv_.forward_handle.lock().await;
+ if let Some(forward_handle) = forward_handle.take() {
+ forward_handle.abort();
+ }
+
let length = priv_.list.borrow().len();
priv_.relates_to_events.replace(HashMap::new());
priv_.list.replace(VecDeque::new());
priv_.event_map.replace(HashMap::new());
priv_.pending_events.replace(HashMap::new());
priv_.redacted_events.replace(HashSet::new());
+ self.set_state(TimelineState::Initial);
self.notify("empty");
self.upcast_ref::<gio::ListModel>()
@@ -912,19 +931,6 @@ impl Timeline {
}
}
-async fn create_streams_handler(
- timeline: glib::SendWeakRef<Timeline>,
- matrix_room: MatrixRoom,
-) -> matrix_sdk::Result<BackwardStream> {
- let (forward_stream, backward_stream) = matrix_room.timeline().await?;
-
- tokio::spawn(async move {
- handle_forward_stream(timeline, forward_stream).await;
- });
-
- Ok(Box::pin(backward_stream.ready_chunks(MAX_BATCH_SIZE)))
-}
-
async fn handle_forward_stream(
timeline: glib::SendWeakRef<Timeline>,
stream: impl Stream<Item = SyncRoomEvent>,
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]