matrix_sdk_ui/room_list_service/
state.rs1use std::{future::ready, sync::Mutex};
18
19use eyeball::{SharedObservable, Subscriber};
20use matrix_sdk::{SlidingSync, SlidingSyncMode, sliding_sync::Range};
21use ruma::time::{Duration, Instant};
22
23use super::Error;
24
25pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms";
26
27#[derive(Clone, Debug, PartialEq)]
29pub enum State {
30 Init,
32
33 SettingUp,
35
36 Recovering,
42
43 Running,
45
46 Error { from: Box<State> },
48
49 Terminated { from: Box<State> },
51}
52
53const DEFAULT_STATE_LIFESPAN: Duration = Duration::from_secs(1800);
55
56#[derive(Debug)]
58pub struct StateMachine {
59 state: SharedObservable<State>,
61
62 last_state_update_time: Mutex<Instant>,
76
77 state_lifespan: Duration,
81}
82
83impl StateMachine {
84 pub(super) fn new() -> Self {
85 StateMachine {
86 state: SharedObservable::new(State::Init),
87 last_state_update_time: Mutex::new(Instant::now()),
88 state_lifespan: DEFAULT_STATE_LIFESPAN,
89 }
90 }
91
92 pub(super) fn get(&self) -> State {
94 self.state.get()
95 }
96
97 pub(super) fn cloned_state(&self) -> SharedObservable<State> {
99 self.state.clone()
100 }
101
102 pub(super) fn set(&self, state: State) {
106 let mut last_state_update_time = self.last_state_update_time.lock().unwrap();
107 *last_state_update_time = Instant::now();
108
109 self.state.set(state);
110 }
111
112 pub fn subscribe(&self) -> Subscriber<State> {
114 self.state.subscribe()
115 }
116
117 pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result<State, Error> {
120 use State::*;
121
122 let next_state = match self.get() {
123 Init => SettingUp,
124
125 SettingUp | Recovering => {
126 set_all_rooms_to_growing_sync_mode(sliding_sync).await?;
127 Running
128 }
129
130 Running => {
131 if self.last_state_update_time.lock().unwrap().elapsed() > self.state_lifespan {
135 set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
136
137 Recovering
138 } else {
139 Running
140 }
141 }
142
143 Error { from: previous_state } | Terminated { from: previous_state } => {
144 match previous_state.as_ref() {
145 Error { .. } | Terminated { .. } => {
147 unreachable!(
148 "It's impossible to reach `Error` or `Terminated` from `Error` or `Terminated`"
149 );
150 }
151
152 Running => {
154 set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
155 Recovering
156 }
157
158 state => state.to_owned(),
160 }
161 }
162 };
163
164 Ok(next_state)
165 }
166}
167
168async fn set_all_rooms_to_growing_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
169 sliding_sync
170 .on_list(ALL_ROOMS_LIST_NAME, |list| {
171 list.set_sync_mode(SlidingSyncMode::new_growing(ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE));
172
173 ready(())
174 })
175 .await
176 .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
177}
178
179async fn set_all_rooms_to_selective_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
180 sliding_sync
181 .on_list(ALL_ROOMS_LIST_NAME, |list| {
182 list.set_sync_mode(
183 SlidingSyncMode::new_selective().add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
184 );
185
186 ready(())
187 })
188 .await
189 .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
190}
191
192pub const ALL_ROOMS_DEFAULT_SELECTIVE_RANGE: Range = 0..=19;
195
196pub const ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE: u32 = 100;
199
200#[cfg(test)]
201mod tests {
202 use matrix_sdk_test::async_test;
203 use tokio::time::sleep;
204
205 use super::{super::tests::new_room_list, *};
206
207 #[async_test]
208 async fn test_states() -> Result<(), Error> {
209 let room_list = new_room_list().await?;
210 let sliding_sync = room_list.sliding_sync();
211
212 let state_machine = StateMachine::new();
213
214 {
216 state_machine.set(State::Error { from: Box::new(state_machine.get()) });
217
218 state_machine.set(state_machine.next(sliding_sync).await?);
220 assert_eq!(state_machine.get(), State::Init);
221 }
222
223 {
225 state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
226
227 state_machine.set(state_machine.next(sliding_sync).await?);
229 assert_eq!(state_machine.get(), State::Init);
230 }
231
232 state_machine.set(state_machine.next(sliding_sync).await?);
234 assert_eq!(state_machine.get(), State::SettingUp);
235
236 {
238 state_machine.set(State::Error { from: Box::new(state_machine.get()) });
239
240 state_machine.set(state_machine.next(sliding_sync).await?);
242 assert_eq!(state_machine.get(), State::SettingUp);
243 }
244
245 {
247 state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
248
249 state_machine.set(state_machine.next(sliding_sync).await?);
251 assert_eq!(state_machine.get(), State::SettingUp);
252 }
253
254 state_machine.set(state_machine.next(sliding_sync).await?);
256 assert_eq!(state_machine.get(), State::Running);
257
258 {
260 state_machine.set(State::Error { from: Box::new(state_machine.get()) });
261
262 state_machine.set(state_machine.next(sliding_sync).await?);
264 assert_eq!(state_machine.get(), State::Recovering);
265
266 state_machine.set(state_machine.next(sliding_sync).await?);
268 assert_eq!(state_machine.get(), State::Running);
269 }
270
271 {
273 state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
274
275 state_machine.set(state_machine.next(sliding_sync).await?);
277 assert_eq!(state_machine.get(), State::Recovering);
278
279 state_machine.set(state_machine.next(sliding_sync).await?);
281 assert_eq!(state_machine.get(), State::Running);
282 }
283
284 {
286 state_machine.set(State::Error { from: Box::new(State::Recovering) });
287
288 state_machine.set(state_machine.next(sliding_sync).await?);
290 assert_eq!(state_machine.get(), State::Recovering);
291 }
292
293 {
295 state_machine.set(State::Terminated { from: Box::new(State::Recovering) });
296
297 state_machine.set(state_machine.next(sliding_sync).await?);
299 assert_eq!(state_machine.get(), State::Recovering);
300 }
301
302 Ok(())
303 }
304
305 #[async_test]
306 async fn test_recover_state_after_delay() -> Result<(), Error> {
307 let room_list = new_room_list().await?;
308 let sliding_sync = room_list.sliding_sync();
309
310 let mut state_machine = StateMachine::new();
311 state_machine.state_lifespan = Duration::from_millis(50);
312
313 {
314 state_machine.set(state_machine.next(sliding_sync).await?);
315 assert_eq!(state_machine.get(), State::SettingUp);
316
317 state_machine.set(state_machine.next(sliding_sync).await?);
318 assert_eq!(state_machine.get(), State::Running);
319
320 state_machine.set(state_machine.next(sliding_sync).await?);
321 assert_eq!(state_machine.get(), State::Running);
322
323 state_machine.set(state_machine.next(sliding_sync).await?);
324 assert_eq!(state_machine.get(), State::Running);
325 }
326
327 sleep(Duration::from_millis(100)).await;
329
330 {
331 state_machine.set(state_machine.next(sliding_sync).await?);
333 assert_eq!(state_machine.get(), State::Recovering);
334
335 state_machine.set(state_machine.next(sliding_sync).await?);
336 assert_eq!(state_machine.get(), State::Running);
337
338 state_machine.set(state_machine.next(sliding_sync).await?);
339 assert_eq!(state_machine.get(), State::Running);
340
341 state_machine.set(state_machine.next(sliding_sync).await?);
342 assert_eq!(state_machine.get(), State::Running);
343 }
344
345 sleep(Duration::from_millis(100)).await;
347
348 {
349 state_machine.set(state_machine.next(sliding_sync).await?);
351 assert_eq!(state_machine.get(), State::Recovering);
352
353 state_machine.set(state_machine.next(sliding_sync).await?);
354 assert_eq!(state_machine.get(), State::Running);
355
356 state_machine.set(state_machine.next(sliding_sync).await?);
357 assert_eq!(state_machine.get(), State::Running);
358
359 state_machine.set(state_machine.next(sliding_sync).await?);
360 assert_eq!(state_machine.get(), State::Running);
361 }
362
363 Ok(())
364 }
365
366 #[async_test]
367 async fn test_action_set_all_rooms_list_to_growing_and_selective_sync_mode() -> Result<(), Error>
368 {
369 let room_list = new_room_list().await?;
370 let sliding_sync = room_list.sliding_sync();
371
372 assert_eq!(
374 sliding_sync
375 .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
376 list.sync_mode(),
377 SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
378 )))
379 .await,
380 Some(true)
381 );
382
383 set_all_rooms_to_growing_sync_mode(sliding_sync).await.unwrap();
385
386 assert_eq!(
388 sliding_sync
389 .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
390 list.sync_mode(),
391 SlidingSyncMode::Growing {
392 batch_size, ..
393 } if batch_size == ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE
394 )))
395 .await,
396 Some(true)
397 );
398
399 set_all_rooms_to_selective_sync_mode(sliding_sync).await.unwrap();
401
402 assert_eq!(
404 sliding_sync
405 .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
406 list.sync_mode(),
407 SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
408 )))
409 .await,
410 Some(true)
411 );
412
413 Ok(())
414 }
415}