matrix_sdk_ui/room_list_service/
state.rs1use std::{future::ready, sync::Mutex};
18
19use eyeball::{SharedObservable, Subscriber};
20use matrix_sdk::{sliding_sync::Range, SlidingSync, SlidingSyncMode};
21use ruma::time::{Duration, Instant};
22
23use super::Error;
24
/// Name of the sliding sync list whose sync mode is switched by the helpers
/// of this module (`set_all_rooms_to_growing_sync_mode` and
/// `set_all_rooms_to_selective_sync_mode`).
pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms";
26
/// The states a [`StateMachine`] can be in.
///
/// `StateMachine::next` only ever produces `SettingUp`, `Recovering`,
/// `Running`, or a state recovered from the `from` payload of `Error` /
/// `Terminated`. `Error` and `Terminated` themselves are never produced by
/// `next`; they have to be stored through `StateMachine::set`.
///
/// Equality is structural and total (`Eq`), so a `State` can be compared
/// with `assert_eq!` and used wherever an `Eq` bound is required.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum State {
    /// State a freshly created state machine starts in. Its successor is
    /// `SettingUp`.
    Init,

    /// Reached from `Init`. Leaving it switches the `all_rooms` list to the
    /// growing sync mode and leads to `Running`.
    SettingUp,

    /// Reached from `Running` after the `all_rooms` list has been switched
    /// back to the selective sync mode. Like `SettingUp`, leaving it
    /// switches the list to the growing sync mode and leads to `Running`.
    Recovering,

    /// Steady state. The machine stays here until the state has not been
    /// updated for longer than the configured lifespan, in which case the
    /// next state is `Recovering`.
    Running,

    /// An error interrupted the machine; `from` is the state it was in.
    ///
    /// The successor is `Recovering` if `from` is `Running`, otherwise it
    /// is `from` itself. `from` must not be `Error` or `Terminated`.
    Error { from: Box<State> },

    /// The machine was terminated; `from` is the state it was in.
    ///
    /// Follows the same recovery rules as `Error`.
    Terminated { from: Box<State> },
}
50
/// Default value for `StateMachine::state_lifespan`: 30 minutes.
const DEFAULT_STATE_LIFESPAN: Duration = Duration::from_secs(30 * 60);
53
54#[derive(Debug)]
56pub struct StateMachine {
57 state: SharedObservable<State>,
59
60 last_state_update_time: Mutex<Instant>,
74
75 state_lifespan: Duration,
79}
80
81impl StateMachine {
82 pub(super) fn new() -> Self {
83 StateMachine {
84 state: SharedObservable::new(State::Init),
85 last_state_update_time: Mutex::new(Instant::now()),
86 state_lifespan: DEFAULT_STATE_LIFESPAN,
87 }
88 }
89
90 pub(super) fn get(&self) -> State {
92 self.state.get()
93 }
94
95 pub(super) fn set(&self, state: State) {
99 let mut last_state_update_time = self.last_state_update_time.lock().unwrap();
100 *last_state_update_time = Instant::now();
101
102 self.state.set(state);
103 }
104
105 pub fn subscribe(&self) -> Subscriber<State> {
107 self.state.subscribe()
108 }
109
110 pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result<State, Error> {
113 use State::*;
114
115 let next_state = match self.get() {
116 Init => SettingUp,
117
118 SettingUp | Recovering => {
119 set_all_rooms_to_growing_sync_mode(sliding_sync).await?;
120 Running
121 }
122
123 Running => {
124 if self.last_state_update_time.lock().unwrap().elapsed() > self.state_lifespan {
128 set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
129
130 Recovering
131 } else {
132 Running
133 }
134 }
135
136 Error { from: previous_state } | Terminated { from: previous_state } => {
137 match previous_state.as_ref() {
138 Error { .. } | Terminated { .. } => {
140 unreachable!(
141 "It's impossible to reach `Error` or `Terminated` from `Error` or `Terminated`"
142 );
143 }
144
145 Running => {
147 set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
148 Recovering
149 }
150
151 state => state.to_owned(),
153 }
154 }
155 };
156
157 Ok(next_state)
158 }
159}
160
161async fn set_all_rooms_to_growing_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
162 sliding_sync
163 .on_list(ALL_ROOMS_LIST_NAME, |list| {
164 list.set_sync_mode(SlidingSyncMode::new_growing(ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE));
165
166 ready(())
167 })
168 .await
169 .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
170}
171
172async fn set_all_rooms_to_selective_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
173 sliding_sync
174 .on_list(ALL_ROOMS_LIST_NAME, |list| {
175 list.set_sync_mode(
176 SlidingSyncMode::new_selective().add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
177 );
178
179 ready(())
180 })
181 .await
182 .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
183}
184
185pub const ALL_ROOMS_DEFAULT_SELECTIVE_RANGE: Range = 0..=19;
188
/// Batch size given to `SlidingSyncMode::new_growing` when the `all_rooms`
/// list is switched to the growing sync mode.
pub const ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE: u32 = 100;
192
#[cfg(test)]
mod tests {
    use matrix_sdk_test::async_test;
    use tokio::time::sleep;

    use super::{super::tests::new_room_list, *};

    /// Computes the next state, stores it in the machine, and returns the
    /// state the machine reports afterwards.
    async fn advance(
        state_machine: &StateMachine,
        sliding_sync: &SlidingSync,
    ) -> Result<State, Error> {
        let upcoming = state_machine.next(sliding_sync).await?;
        state_machine.set(upcoming);

        Ok(state_machine.get())
    }

    /// Tells whether the `all_rooms` list is in selective mode with exactly
    /// the default range; `None` if the list is unknown.
    async fn all_rooms_uses_default_selective_range(sliding_sync: &SlidingSync) -> Option<bool> {
        sliding_sync
            .on_list(ALL_ROOMS_LIST_NAME, |list| {
                ready(matches!(
                    list.sync_mode(),
                    SlidingSyncMode::Selective { ranges }
                        if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
                ))
            })
            .await
    }

    #[async_test]
    async fn test_states() -> Result<(), Error> {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();

        let state_machine = StateMachine::new();

        // Both ways of interrupting the machine, always exercised in this
        // order: first `Error`, then `Terminated`.
        let interruptions: [fn(Box<State>) -> State; 2] =
            [|from| State::Error { from }, |from| State::Terminated { from }];

        // Interrupted in `Init`: the machine comes back to `Init`.
        for interrupt in interruptions {
            state_machine.set(interrupt(Box::new(state_machine.get())));

            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Init);
        }

        // `Init` is followed by `SettingUp`.
        assert_eq!(advance(&state_machine, sliding_sync).await?, State::SettingUp);

        // Interrupted in `SettingUp`: the machine comes back to `SettingUp`.
        for interrupt in interruptions {
            state_machine.set(interrupt(Box::new(state_machine.get())));

            assert_eq!(advance(&state_machine, sliding_sync).await?, State::SettingUp);
        }

        // `SettingUp` is followed by `Running`.
        assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);

        // Interrupted in `Running`: the machine goes through `Recovering`
        // before being `Running` again.
        for interrupt in interruptions {
            state_machine.set(interrupt(Box::new(state_machine.get())));

            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Recovering);
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
        }

        // Interrupted in `Recovering`: the machine comes back to
        // `Recovering`.
        for interrupt in interruptions {
            state_machine.set(interrupt(Box::new(State::Recovering)));

            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Recovering);
        }

        Ok(())
    }

    #[async_test]
    async fn test_recover_state_after_delay() -> Result<(), Error> {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();

        let mut state_machine = StateMachine::new();
        state_machine.state_lifespan = Duration::from_millis(50);

        // Reach `Running` and check it is stable while updates are frequent.
        assert_eq!(advance(&state_machine, sliding_sync).await?, State::SettingUp);

        for _ in 0..3 {
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
        }

        // Twice in a row: wait longer than the state lifespan, observe the
        // detour through `Recovering`, then a stable `Running` again.
        for _ in 0..2 {
            sleep(Duration::from_millis(100)).await;

            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Recovering);

            for _ in 0..3 {
                assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
            }
        }

        Ok(())
    }

    #[async_test]
    async fn test_action_set_all_rooms_list_to_growing_and_selective_sync_mode() -> Result<(), Error>
    {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();

        // The list starts in selective mode with the default range.
        assert_eq!(all_rooms_uses_default_selective_range(sliding_sync).await, Some(true));

        set_all_rooms_to_growing_sync_mode(sliding_sync).await.unwrap();

        // The list is now in growing mode with the default batch size.
        let is_growing = sliding_sync
            .on_list(ALL_ROOMS_LIST_NAME, |list| {
                ready(matches!(
                    list.sync_mode(),
                    SlidingSyncMode::Growing { batch_size, .. }
                        if batch_size == ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE
                ))
            })
            .await;
        assert_eq!(is_growing, Some(true));

        set_all_rooms_to_selective_sync_mode(sliding_sync).await.unwrap();

        // The list is back in selective mode with the default range.
        assert_eq!(all_rooms_uses_default_selective_range(sliding_sync).await, Some(true));

        Ok(())
    }
}