matrix_sdk_ui/room_list_service/
state.rs1use std::{future::ready, sync::Mutex};
18
19use eyeball::{SharedObservable, Subscriber};
20use matrix_sdk::{SlidingSync, SlidingSyncMode, sliding_sync::Range};
21use ruma::time::{Duration, Instant};
22
23use super::Error;
24
25pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms";
26
27#[derive(Clone, Debug, PartialEq)]
29pub enum State {
30 Init,
32
33 SettingUp,
35
36 Recovering,
42
43 Running,
45
46 Error { from: Box<State> },
48
49 Terminated { from: Box<State> },
51}
52
53const DEFAULT_STATE_LIFESPAN: Duration = Duration::from_secs(1800);
55
56#[derive(Debug)]
58pub struct StateMachine {
59 state: SharedObservable<State>,
61
62 last_state_update_time: Mutex<Instant>,
76
77 state_lifespan: Duration,
81}
82
83impl StateMachine {
84 pub(super) fn new() -> Self {
85 StateMachine {
86 state: SharedObservable::new(State::Init),
87 last_state_update_time: Mutex::new(Instant::now()),
88 state_lifespan: DEFAULT_STATE_LIFESPAN,
89 }
90 }
91
92 pub(super) fn get(&self) -> State {
94 self.state.get()
95 }
96
97 pub(super) fn cloned_state(&self) -> SharedObservable<State> {
99 self.state.clone()
100 }
101
102 pub(super) fn set(&self, state: State) {
106 let mut last_state_update_time = self.last_state_update_time.lock().unwrap();
107 *last_state_update_time = Instant::now();
108
109 self.state.set(state);
110 }
111
112 pub fn subscribe(&self) -> Subscriber<State> {
114 self.state.subscribe()
115 }
116
117 pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result<State, Error> {
120 use State::*;
121
122 let next_state = match self.get() {
123 Init => SettingUp,
124
125 SettingUp | Recovering => {
126 set_all_rooms_to_growing_sync_mode(sliding_sync).await?;
127 Running
128 }
129
130 Running => {
131 if self.last_state_update_time.lock().unwrap().elapsed() > self.state_lifespan {
135 set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
136
137 Recovering
138 } else {
139 Running
140 }
141 }
142
143 Error { from: previous_state } | Terminated { from: previous_state } => {
144 match previous_state.as_ref() {
145 Error { .. } | Terminated { .. } => {
147 unreachable!(
148 "It's impossible to reach `Error` or `Terminated` from `Error` or `Terminated`"
149 );
150 }
151
152 Running => {
154 set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
155 Recovering
156 }
157
158 state => state.to_owned(),
160 }
161 }
162 };
163
164 Ok(next_state)
165 }
166}
167
168async fn set_all_rooms_to_growing_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
169 sliding_sync
170 .on_list(ALL_ROOMS_LIST_NAME, |list| {
171 list.set_sync_mode(SlidingSyncMode::new_growing(ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE));
172
173 ready(())
174 })
175 .await
176 .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
177}
178
179async fn set_all_rooms_to_selective_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
180 sliding_sync
181 .on_list(ALL_ROOMS_LIST_NAME, |list| {
182 list.set_sync_mode(
183 SlidingSyncMode::new_selective().add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
184 );
185
186 ready(())
187 })
188 .await
189 .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
190}
191
192pub const ALL_ROOMS_DEFAULT_SELECTIVE_RANGE: Range = 0..=19;
195
196pub const ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE: u32 = 100;
199
200#[cfg(test)]
201mod tests {
202 use matrix_sdk::test_utils::client::MockClientBuilder;
203 use matrix_sdk_test::async_test;
204 use tokio::time::sleep;
205
206 use super::*;
207 use crate::RoomListService;
208
209 async fn new_room_list() -> Result<RoomListService, Error> {
210 let client = MockClientBuilder::new(None).build().await;
211 RoomListService::new(client).await
212 }
213
214 #[async_test]
215 async fn test_states() -> Result<(), Error> {
216 let room_list = new_room_list().await?;
217 let sliding_sync = room_list.sliding_sync();
218
219 let state_machine = StateMachine::new();
220
221 {
223 state_machine.set(State::Error { from: Box::new(state_machine.get()) });
224
225 state_machine.set(state_machine.next(sliding_sync).await?);
227 assert_eq!(state_machine.get(), State::Init);
228 }
229
230 {
232 state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
233
234 state_machine.set(state_machine.next(sliding_sync).await?);
236 assert_eq!(state_machine.get(), State::Init);
237 }
238
239 state_machine.set(state_machine.next(sliding_sync).await?);
241 assert_eq!(state_machine.get(), State::SettingUp);
242
243 {
245 state_machine.set(State::Error { from: Box::new(state_machine.get()) });
246
247 state_machine.set(state_machine.next(sliding_sync).await?);
249 assert_eq!(state_machine.get(), State::SettingUp);
250 }
251
252 {
254 state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
255
256 state_machine.set(state_machine.next(sliding_sync).await?);
258 assert_eq!(state_machine.get(), State::SettingUp);
259 }
260
261 state_machine.set(state_machine.next(sliding_sync).await?);
263 assert_eq!(state_machine.get(), State::Running);
264
265 {
267 state_machine.set(State::Error { from: Box::new(state_machine.get()) });
268
269 state_machine.set(state_machine.next(sliding_sync).await?);
271 assert_eq!(state_machine.get(), State::Recovering);
272
273 state_machine.set(state_machine.next(sliding_sync).await?);
275 assert_eq!(state_machine.get(), State::Running);
276 }
277
278 {
280 state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
281
282 state_machine.set(state_machine.next(sliding_sync).await?);
284 assert_eq!(state_machine.get(), State::Recovering);
285
286 state_machine.set(state_machine.next(sliding_sync).await?);
288 assert_eq!(state_machine.get(), State::Running);
289 }
290
291 {
293 state_machine.set(State::Error { from: Box::new(State::Recovering) });
294
295 state_machine.set(state_machine.next(sliding_sync).await?);
297 assert_eq!(state_machine.get(), State::Recovering);
298 }
299
300 {
302 state_machine.set(State::Terminated { from: Box::new(State::Recovering) });
303
304 state_machine.set(state_machine.next(sliding_sync).await?);
306 assert_eq!(state_machine.get(), State::Recovering);
307 }
308
309 Ok(())
310 }
311
312 #[async_test]
313 async fn test_recover_state_after_delay() -> Result<(), Error> {
314 let room_list = new_room_list().await?;
315 let sliding_sync = room_list.sliding_sync();
316
317 let mut state_machine = StateMachine::new();
318 state_machine.state_lifespan = Duration::from_millis(50);
319
320 {
321 state_machine.set(state_machine.next(sliding_sync).await?);
322 assert_eq!(state_machine.get(), State::SettingUp);
323
324 state_machine.set(state_machine.next(sliding_sync).await?);
325 assert_eq!(state_machine.get(), State::Running);
326
327 state_machine.set(state_machine.next(sliding_sync).await?);
328 assert_eq!(state_machine.get(), State::Running);
329
330 state_machine.set(state_machine.next(sliding_sync).await?);
331 assert_eq!(state_machine.get(), State::Running);
332 }
333
334 sleep(Duration::from_millis(100)).await;
336
337 {
338 state_machine.set(state_machine.next(sliding_sync).await?);
340 assert_eq!(state_machine.get(), State::Recovering);
341
342 state_machine.set(state_machine.next(sliding_sync).await?);
343 assert_eq!(state_machine.get(), State::Running);
344
345 state_machine.set(state_machine.next(sliding_sync).await?);
346 assert_eq!(state_machine.get(), State::Running);
347
348 state_machine.set(state_machine.next(sliding_sync).await?);
349 assert_eq!(state_machine.get(), State::Running);
350 }
351
352 sleep(Duration::from_millis(100)).await;
354
355 {
356 state_machine.set(state_machine.next(sliding_sync).await?);
358 assert_eq!(state_machine.get(), State::Recovering);
359
360 state_machine.set(state_machine.next(sliding_sync).await?);
361 assert_eq!(state_machine.get(), State::Running);
362
363 state_machine.set(state_machine.next(sliding_sync).await?);
364 assert_eq!(state_machine.get(), State::Running);
365
366 state_machine.set(state_machine.next(sliding_sync).await?);
367 assert_eq!(state_machine.get(), State::Running);
368 }
369
370 Ok(())
371 }
372
373 #[async_test]
374 async fn test_action_set_all_rooms_list_to_growing_and_selective_sync_mode() -> Result<(), Error>
375 {
376 let room_list = new_room_list().await?;
377 let sliding_sync = room_list.sliding_sync();
378
379 assert_eq!(
381 sliding_sync
382 .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
383 list.sync_mode(),
384 SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
385 )))
386 .await,
387 Some(true)
388 );
389
390 set_all_rooms_to_growing_sync_mode(sliding_sync).await.unwrap();
392
393 assert_eq!(
395 sliding_sync
396 .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
397 list.sync_mode(),
398 SlidingSyncMode::Growing {
399 batch_size, ..
400 } if batch_size == ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE
401 )))
402 .await,
403 Some(true)
404 );
405
406 set_all_rooms_to_selective_sync_mode(sliding_sync).await.unwrap();
408
409 assert_eq!(
411 sliding_sync
412 .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
413 list.sync_mode(),
414 SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
415 )))
416 .await,
417 Some(true)
418 );
419
420 Ok(())
421 }
422}