Skip to main content

matrix_sdk_ui/room_list_service/
state.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! States and actions for the `RoomList` state machine.
16
17use std::{future::ready, sync::Mutex};
18
19use eyeball::{SharedObservable, Subscriber};
20use matrix_sdk::{SlidingSync, SlidingSyncMode, sliding_sync::Range};
21use ruma::time::{Duration, Instant};
22
23use super::Error;
24
/// Name of the sliding sync list that the sync-mode actions of this module operate on.
25pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms";
26
/// The states the [`super::RoomList`] can be in.
#[derive(Clone, Debug, PartialEq)]
pub enum State {
    /// The very first state, before anything else happened.
    Init,

    /// The first rooms have been synced.
    SettingUp,

    /// The system is getting back on its feet, either after an `Error` or a
    /// `Terminated` state, or because too much time went by since the last
    /// sync (see `StateMachine::state_lifespan`). This is close to
    /// `SettingUp`, except that some lists may already exist, hence slightly
    /// different actions.
    Recovering,

    /// All rooms are syncing.
    Running,

    /// The sync has been stopped because an error happened. `from` is the
    /// state that was active before the stop.
    Error { from: Box<State> },

    /// The sync has been stopped because it was requested. `from` is the
    /// state that was active before the stop.
    Terminated { from: Box<State> },
}
52
53/// Default value for `StateMachine::state_lifespan` (1800 seconds, i.e. 30 minutes).
54const DEFAULT_STATE_LIFESPAN: Duration = Duration::from_secs(1800);
55
56/// The state machine used to transition between the [`State`]s.
57#[derive(Debug)]
58pub struct StateMachine {
59    /// The current state of the `RoomListService`.
60    state: SharedObservable<State>,
61
62    /// Last time the state has been updated.
63    ///
64    /// When the state has not been updated since a long time, we want to enter
65    /// the [`State::Recovering`] state. Why do we need to do that? Because in
66    /// some cases, the user might have received many updates between two
67    /// distant syncs. If the sliding sync list range was too large, like
68    /// 0..=499, the next sync is likely to be heavy and potentially slow.
69    /// In this case, it's preferable to jump back onto `Recovering`, which will
70    /// reset the range, so that the next sync will be fast for the client.
71    ///
72    /// To be used in coordination with `Self::state_lifespan`.
73    ///
74    /// This mutex is only taken for short periods of time, so it's sync.
75    last_state_update_time: Mutex<Instant>,
76
77    /// The maximum time before considering the state as “too old”.
78    ///
79    /// To be used in coordination with `Self::last_state_update_time`.
80    state_lifespan: Duration,
81}
82
83impl StateMachine {
84    pub(super) fn new() -> Self {
85        StateMachine {
86            state: SharedObservable::new(State::Init),
87            last_state_update_time: Mutex::new(Instant::now()),
88            state_lifespan: DEFAULT_STATE_LIFESPAN,
89        }
90    }
91
92    /// Get the current state.
93    pub(super) fn get(&self) -> State {
94        self.state.get()
95    }
96
97    /// Clone the inner [`Self::state`].
98    pub(super) fn cloned_state(&self) -> SharedObservable<State> {
99        self.state.clone()
100    }
101
102    /// Set the new state.
103    ///
104    /// Setting a new state will update `Self::last_state_update`.
105    pub(super) fn set(&self, state: State) {
106        let mut last_state_update_time = self.last_state_update_time.lock().unwrap();
107        *last_state_update_time = Instant::now();
108
109        self.state.set(state);
110    }
111
112    /// Subscribe to state updates.
113    pub fn subscribe(&self) -> Subscriber<State> {
114        self.state.subscribe()
115    }
116
117    /// Transition to the next state, and execute the necessary transition on
118    /// the sliding sync list.
119    pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result<State, Error> {
120        use State::*;
121
122        let next_state = match self.get() {
123            Init => SettingUp,
124
125            SettingUp | Recovering => {
126                set_all_rooms_to_growing_sync_mode(sliding_sync).await?;
127                Running
128            }
129
130            Running => {
131                // We haven't changed the state for a while, we go back to `Recovering` to avoid
132                // requesting potentially large data. See `Self::last_state_update` to learn
133                // the details.
134                if self.last_state_update_time.lock().unwrap().elapsed() > self.state_lifespan {
135                    set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
136
137                    Recovering
138                } else {
139                    Running
140                }
141            }
142
143            Error { from: previous_state } | Terminated { from: previous_state } => {
144                match previous_state.as_ref() {
145                    // Unreachable state.
146                    Error { .. } | Terminated { .. } => {
147                        unreachable!(
148                            "It's impossible to reach `Error` or `Terminated` from `Error` or `Terminated`"
149                        );
150                    }
151
152                    // If the previous state was `Running`, we enter the `Recovering` state.
153                    Running => {
154                        set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
155                        Recovering
156                    }
157
158                    // Jump back to the previous state that led to this termination.
159                    state => state.to_owned(),
160                }
161            }
162        };
163
164        Ok(next_state)
165    }
166}
167
168async fn set_all_rooms_to_growing_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
169    sliding_sync
170        .on_list(ALL_ROOMS_LIST_NAME, |list| {
171            list.set_sync_mode(SlidingSyncMode::new_growing(ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE));
172
173            ready(())
174        })
175        .await
176        .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
177}
178
179async fn set_all_rooms_to_selective_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
180    sliding_sync
181        .on_list(ALL_ROOMS_LIST_NAME, |list| {
182            list.set_sync_mode(
183                SlidingSyncMode::new_selective().add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
184            );
185
186            ready(())
187        })
188        .await
189        .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
190}
191
192/// Default range for the selective sync-mode of the
193/// `ALL_ROOMS_LIST_NAME` list.
194pub const ALL_ROOMS_DEFAULT_SELECTIVE_RANGE: Range = 0..=19;
195
196/// Default `batch_size` for the growing sync-mode of the `ALL_ROOMS_LIST_NAME`
197/// list, as applied by `set_all_rooms_to_growing_sync_mode`.
198pub const ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE: u32 = 100;
199
200#[cfg(test)]
201mod tests {
202    use matrix_sdk::test_utils::client::MockClientBuilder;
203    use matrix_sdk_test::async_test;
204    use tokio::time::sleep;
205
206    use super::*;
207    use crate::RoomListService;
208
209    async fn new_room_list() -> Result<RoomListService, Error> {
210        let client = MockClientBuilder::new(None).build().await;
211        RoomListService::new(client).await
212    }
213
214    #[async_test]
215    async fn test_states() -> Result<(), Error> {
216        let room_list = new_room_list().await?;
217        let sliding_sync = room_list.sliding_sync();
218
219        let state_machine = StateMachine::new();
220
221        // Hypothetical error.
222        {
223            state_machine.set(State::Error { from: Box::new(state_machine.get()) });
224
225            // Back to the previous state.
226            state_machine.set(state_machine.next(sliding_sync).await?);
227            assert_eq!(state_machine.get(), State::Init);
228        }
229
230        // Hypothetical termination.
231        {
232            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
233
234            // Back to the previous state.
235            state_machine.set(state_machine.next(sliding_sync).await?);
236            assert_eq!(state_machine.get(), State::Init);
237        }
238
239        // Next state.
240        state_machine.set(state_machine.next(sliding_sync).await?);
241        assert_eq!(state_machine.get(), State::SettingUp);
242
243        // Hypothetical error.
244        {
245            state_machine.set(State::Error { from: Box::new(state_machine.get()) });
246
247            // Back to the previous state.
248            state_machine.set(state_machine.next(sliding_sync).await?);
249            assert_eq!(state_machine.get(), State::SettingUp);
250        }
251
252        // Hypothetical termination.
253        {
254            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
255
256            // Back to the previous state.
257            state_machine.set(state_machine.next(sliding_sync).await?);
258            assert_eq!(state_machine.get(), State::SettingUp);
259        }
260
261        // Next state.
262        state_machine.set(state_machine.next(sliding_sync).await?);
263        assert_eq!(state_machine.get(), State::Running);
264
265        // Hypothetical error.
266        {
267            state_machine.set(State::Error { from: Box::new(state_machine.get()) });
268
269            // Jump to the **recovering** state!
270            state_machine.set(state_machine.next(sliding_sync).await?);
271            assert_eq!(state_machine.get(), State::Recovering);
272
273            // Now, back to the previous state.
274            state_machine.set(state_machine.next(sliding_sync).await?);
275            assert_eq!(state_machine.get(), State::Running);
276        }
277
278        // Hypothetical termination.
279        {
280            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
281
282            // Jump to the **recovering** state!
283            state_machine.set(state_machine.next(sliding_sync).await?);
284            assert_eq!(state_machine.get(), State::Recovering);
285
286            // Now, back to the previous state.
287            state_machine.set(state_machine.next(sliding_sync).await?);
288            assert_eq!(state_machine.get(), State::Running);
289        }
290
291        // Hypothetical error when recovering.
292        {
293            state_machine.set(State::Error { from: Box::new(State::Recovering) });
294
295            // Back to the previous state.
296            state_machine.set(state_machine.next(sliding_sync).await?);
297            assert_eq!(state_machine.get(), State::Recovering);
298        }
299
300        // Hypothetical termination when recovering.
301        {
302            state_machine.set(State::Terminated { from: Box::new(State::Recovering) });
303
304            // Back to the previous state.
305            state_machine.set(state_machine.next(sliding_sync).await?);
306            assert_eq!(state_machine.get(), State::Recovering);
307        }
308
309        Ok(())
310    }
311
312    #[async_test]
313    async fn test_recover_state_after_delay() -> Result<(), Error> {
314        let room_list = new_room_list().await?;
315        let sliding_sync = room_list.sliding_sync();
316
317        let mut state_machine = StateMachine::new();
318        state_machine.state_lifespan = Duration::from_millis(50);
319
320        {
321            state_machine.set(state_machine.next(sliding_sync).await?);
322            assert_eq!(state_machine.get(), State::SettingUp);
323
324            state_machine.set(state_machine.next(sliding_sync).await?);
325            assert_eq!(state_machine.get(), State::Running);
326
327            state_machine.set(state_machine.next(sliding_sync).await?);
328            assert_eq!(state_machine.get(), State::Running);
329
330            state_machine.set(state_machine.next(sliding_sync).await?);
331            assert_eq!(state_machine.get(), State::Running);
332        }
333
334        // Time passes.
335        sleep(Duration::from_millis(100)).await;
336
337        {
338            // Time has elapsed, time to recover.
339            state_machine.set(state_machine.next(sliding_sync).await?);
340            assert_eq!(state_machine.get(), State::Recovering);
341
342            state_machine.set(state_machine.next(sliding_sync).await?);
343            assert_eq!(state_machine.get(), State::Running);
344
345            state_machine.set(state_machine.next(sliding_sync).await?);
346            assert_eq!(state_machine.get(), State::Running);
347
348            state_machine.set(state_machine.next(sliding_sync).await?);
349            assert_eq!(state_machine.get(), State::Running);
350        }
351
352        // Time passes, again. Just to test everything is going well.
353        sleep(Duration::from_millis(100)).await;
354
355        {
356            // Time has elapsed, time to recover.
357            state_machine.set(state_machine.next(sliding_sync).await?);
358            assert_eq!(state_machine.get(), State::Recovering);
359
360            state_machine.set(state_machine.next(sliding_sync).await?);
361            assert_eq!(state_machine.get(), State::Running);
362
363            state_machine.set(state_machine.next(sliding_sync).await?);
364            assert_eq!(state_machine.get(), State::Running);
365
366            state_machine.set(state_machine.next(sliding_sync).await?);
367            assert_eq!(state_machine.get(), State::Running);
368        }
369
370        Ok(())
371    }
372
373    #[async_test]
374    async fn test_action_set_all_rooms_list_to_growing_and_selective_sync_mode() -> Result<(), Error>
375    {
376        let room_list = new_room_list().await?;
377        let sliding_sync = room_list.sliding_sync();
378
379        // List is present, in Selective mode.
380        assert_eq!(
381            sliding_sync
382                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
383                    list.sync_mode(),
384                    SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
385                )))
386                .await,
387            Some(true)
388        );
389
390        // Run the action!
391        set_all_rooms_to_growing_sync_mode(sliding_sync).await.unwrap();
392
393        // List is still present, in Growing mode.
394        assert_eq!(
395            sliding_sync
396                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
397                    list.sync_mode(),
398                    SlidingSyncMode::Growing {
399                        batch_size, ..
400                    } if batch_size == ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE
401                )))
402                .await,
403            Some(true)
404        );
405
406        // Run the other action!
407        set_all_rooms_to_selective_sync_mode(sliding_sync).await.unwrap();
408
409        // List is still present, in Selective mode.
410        assert_eq!(
411            sliding_sync
412                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
413                    list.sync_mode(),
414                    SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
415                )))
416                .await,
417            Some(true)
418        );
419
420        Ok(())
421    }
422}