matrix_sdk_ui/room_list_service/
state.rs1use std::{future::ready, sync::Mutex};
18
19use eyeball::{SharedObservable, Subscriber};
20use matrix_sdk::{sliding_sync::Range, SlidingSync, SlidingSyncMode};
21use ruma::time::{Duration, Instant};
22
23use super::Error;
24
/// Name of the sliding sync list whose sync mode is driven by the
/// [`StateMachine`] transitions of this module.
pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms";
26
/// The states a [`StateMachine`] can be in.
///
/// The transition from one state to the next is computed by
/// `StateMachine::next`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum State {
    /// The state a [`StateMachine`] is created in. It is always followed by
    /// [`State::SettingUp`].
    Init,

    /// The state following [`State::Init`]. Leaving it switches the
    /// `all_rooms` list to growing sync mode and leads to [`State::Running`].
    SettingUp,

    /// Reached from [`State::Running`] when the state has not been updated
    /// for longer than the state lifespan, or when resuming from an
    /// [`State::Error`] or [`State::Terminated`] that interrupted
    /// [`State::Running`]. Like [`State::SettingUp`], leaving it switches the
    /// `all_rooms` list to growing sync mode and leads to [`State::Running`].
    Recovering,

    /// The steady state: the machine stays here for as long as the state
    /// keeps being updated within the state lifespan.
    Running,

    /// The state machine has been interrupted by an error.
    Error {
        /// The state the machine was in before the interruption. It must be
        /// neither [`State::Error`] nor [`State::Terminated`].
        from: Box<State>,
    },

    /// The state machine has been terminated.
    Terminated {
        /// The state the machine was in before the termination. It must be
        /// neither [`State::Error`] nor [`State::Terminated`].
        from: Box<State>,
    },
}
52
/// Default value of `StateMachine::state_lifespan`: 30 minutes.
const DEFAULT_STATE_LIFESPAN: Duration = Duration::from_secs(30 * 60);
55
56#[derive(Debug)]
58pub struct StateMachine {
59 state: SharedObservable<State>,
61
62 last_state_update_time: Mutex<Instant>,
76
77 state_lifespan: Duration,
81}
82
83impl StateMachine {
84 pub(super) fn new() -> Self {
85 StateMachine {
86 state: SharedObservable::new(State::Init),
87 last_state_update_time: Mutex::new(Instant::now()),
88 state_lifespan: DEFAULT_STATE_LIFESPAN,
89 }
90 }
91
92 pub(super) fn get(&self) -> State {
94 self.state.get()
95 }
96
97 pub(super) fn set(&self, state: State) {
101 let mut last_state_update_time = self.last_state_update_time.lock().unwrap();
102 *last_state_update_time = Instant::now();
103
104 self.state.set(state);
105 }
106
107 pub fn subscribe(&self) -> Subscriber<State> {
109 self.state.subscribe()
110 }
111
112 pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result<State, Error> {
115 use State::*;
116
117 let next_state = match self.get() {
118 Init => SettingUp,
119
120 SettingUp | Recovering => {
121 set_all_rooms_to_growing_sync_mode(sliding_sync).await?;
122 Running
123 }
124
125 Running => {
126 if self.last_state_update_time.lock().unwrap().elapsed() > self.state_lifespan {
130 set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
131
132 Recovering
133 } else {
134 Running
135 }
136 }
137
138 Error { from: previous_state } | Terminated { from: previous_state } => {
139 match previous_state.as_ref() {
140 Error { .. } | Terminated { .. } => {
142 unreachable!(
143 "It's impossible to reach `Error` or `Terminated` from `Error` or `Terminated`"
144 );
145 }
146
147 Running => {
149 set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
150 Recovering
151 }
152
153 state => state.to_owned(),
155 }
156 }
157 };
158
159 Ok(next_state)
160 }
161}
162
163async fn set_all_rooms_to_growing_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
164 sliding_sync
165 .on_list(ALL_ROOMS_LIST_NAME, |list| {
166 list.set_sync_mode(SlidingSyncMode::new_growing(ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE));
167
168 ready(())
169 })
170 .await
171 .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
172}
173
174async fn set_all_rooms_to_selective_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
175 sliding_sync
176 .on_list(ALL_ROOMS_LIST_NAME, |list| {
177 list.set_sync_mode(
178 SlidingSyncMode::new_selective().add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
179 );
180
181 ready(())
182 })
183 .await
184 .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
185}
186
187pub const ALL_ROOMS_DEFAULT_SELECTIVE_RANGE: Range = 0..=19;
190
/// Batch size applied to the `ALL_ROOMS_LIST_NAME` list when it is put in
/// growing sync mode.
pub const ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE: u32 = 100;
194
#[cfg(test)]
mod tests {
    use matrix_sdk_test::async_test;
    use tokio::time::sleep;

    use super::{super::tests::new_room_list, *};

    /// Computes the next state, stores it in `state_machine`, and returns the
    /// state that is now the current one.
    async fn advance(
        state_machine: &StateMachine,
        sliding_sync: &SlidingSync,
    ) -> Result<State, Error> {
        state_machine.set(state_machine.next(sliding_sync).await?);

        Ok(state_machine.get())
    }

    #[async_test]
    async fn test_states() -> Result<(), Error> {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();

        let state_machine = StateMachine::new();

        // Hypothetical error when the state is `Init`.
        {
            state_machine.set(State::Error { from: Box::new(state_machine.get()) });

            // Back to the previous state.
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Init);
        }

        // Hypothetical termination when the state is `Init`.
        {
            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });

            // Back to the previous state.
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Init);
        }

        // Next state.
        assert_eq!(advance(&state_machine, sliding_sync).await?, State::SettingUp);

        // Hypothetical error when the state is `SettingUp`.
        {
            state_machine.set(State::Error { from: Box::new(state_machine.get()) });

            // Back to the previous state.
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::SettingUp);
        }

        // Hypothetical termination when the state is `SettingUp`.
        {
            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });

            // Back to the previous state.
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::SettingUp);
        }

        // Next state.
        assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);

        // Hypothetical error when the state is `Running`.
        {
            state_machine.set(State::Error { from: Box::new(state_machine.get()) });

            // Jump to the **recovering** state!
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Recovering);

            // Now, back to the previous state.
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
        }

        // Hypothetical termination when the state is `Running`.
        {
            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });

            // Jump to the **recovering** state!
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Recovering);

            // Now, back to the previous state.
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
        }

        // Hypothetical error when the state is `Recovering`.
        {
            state_machine.set(State::Error { from: Box::new(State::Recovering) });

            // Back to the previous state.
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Recovering);
        }

        // Hypothetical termination when the state is `Recovering`.
        {
            state_machine.set(State::Terminated { from: Box::new(State::Recovering) });

            // Back to the previous state.
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Recovering);
        }

        Ok(())
    }

    #[async_test]
    async fn test_recover_state_after_delay() -> Result<(), Error> {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();

        // Shorten the lifespan so that the test does not have to wait long.
        let mut state_machine = StateMachine::new();
        state_machine.state_lifespan = Duration::from_millis(50);

        {
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::SettingUp);
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
        }

        // Time passes: longer than the state lifespan.
        sleep(Duration::from_millis(100)).await;

        {
            // The state was too old: it goes through `Recovering`.
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Recovering);
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
        }

        // Time passes, again: longer than the state lifespan.
        sleep(Duration::from_millis(100)).await;

        {
            // The state was too old, again: it goes through `Recovering`.
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Recovering);
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
        }

        Ok(())
    }

    #[async_test]
    async fn test_action_set_all_rooms_list_to_growing_and_selective_sync_mode() -> Result<(), Error>
    {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();

        // The list is present, in selective mode.
        assert_eq!(
            sliding_sync
                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
                    list.sync_mode(),
                    SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
                )))
                .await,
            Some(true)
        );

        // Run the action!
        set_all_rooms_to_growing_sync_mode(sliding_sync).await.unwrap();

        // The list is still present, in growing mode.
        assert_eq!(
            sliding_sync
                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
                    list.sync_mode(),
                    SlidingSyncMode::Growing {
                        batch_size, ..
                    } if batch_size == ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE
                )))
                .await,
            Some(true)
        );

        // Run the other action!
        set_all_rooms_to_selective_sync_mode(sliding_sync).await.unwrap();

        // The list is still present, back in selective mode.
        assert_eq!(
            sliding_sync
                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
                    list.sync_mode(),
                    SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
                )))
                .await,
            Some(true)
        );

        Ok(())
    }
}