matrix_sdk_ui/room_list_service/
state.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! States and actions for the `RoomList` state machine.
16
17use std::{future::ready, sync::Mutex};
18
19use eyeball::{SharedObservable, Subscriber};
20use matrix_sdk::{SlidingSync, SlidingSyncMode, sliding_sync::Range};
21use ruma::time::{Duration, Instant};
22
23use super::Error;
24
25pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms";
26
27/// The state of the [`super::RoomList`].
28#[derive(Clone, Debug, PartialEq)]
29pub enum State {
30    /// That's the first initial state.
31    Init,
32
33    /// At this state, the first rooms have been synced.
34    SettingUp,
35
36    /// At this state, the system is recovering from `Error` or `Terminated`, or
37    /// the time between the last sync was too long (see
38    /// `StateMachine::state_lifespan` to learn more). It's similar to
39    /// `SettingUp` but some lists may already exist, actions
40    /// are then slightly different.
41    Recovering,
42
43    /// At this state, all rooms are syncing.
44    Running,
45
46    /// At this state, the sync has been stopped because an error happened.
47    Error { from: Box<State> },
48
49    /// At this state, the sync has been stopped because it was requested.
50    Terminated { from: Box<State> },
51}
52
53/// Default value for `StateMachine::state_lifespan`.
54const DEFAULT_STATE_LIFESPAN: Duration = Duration::from_secs(1800);
55
56/// The state machine used to transition between the [`State`]s.
57#[derive(Debug)]
58pub struct StateMachine {
59    /// The current state of the `RoomListService`.
60    state: SharedObservable<State>,
61
62    /// Last time the state has been updated.
63    ///
64    /// When the state has not been updated since a long time, we want to enter
65    /// the [`State::Recovering`] state. Why do we need to do that? Because in
66    /// some cases, the user might have received many updates between two
67    /// distant syncs. If the sliding sync list range was too large, like
68    /// 0..=499, the next sync is likely to be heavy and potentially slow.
69    /// In this case, it's preferable to jump back onto `Recovering`, which will
70    /// reset the range, so that the next sync will be fast for the client.
71    ///
72    /// To be used in coordination with `Self::state_lifespan`.
73    ///
74    /// This mutex is only taken for short periods of time, so it's sync.
75    last_state_update_time: Mutex<Instant>,
76
77    /// The maximum time before considering the state as “too old”.
78    ///
79    /// To be used in coordination with `Self::last_state_update_time`.
80    state_lifespan: Duration,
81}
82
83impl StateMachine {
84    pub(super) fn new() -> Self {
85        StateMachine {
86            state: SharedObservable::new(State::Init),
87            last_state_update_time: Mutex::new(Instant::now()),
88            state_lifespan: DEFAULT_STATE_LIFESPAN,
89        }
90    }
91
92    /// Get the current state.
93    pub(super) fn get(&self) -> State {
94        self.state.get()
95    }
96
97    /// Clone the inner [`Self::state`].
98    pub(super) fn cloned_state(&self) -> SharedObservable<State> {
99        self.state.clone()
100    }
101
102    /// Set the new state.
103    ///
104    /// Setting a new state will update `Self::last_state_update`.
105    pub(super) fn set(&self, state: State) {
106        let mut last_state_update_time = self.last_state_update_time.lock().unwrap();
107        *last_state_update_time = Instant::now();
108
109        self.state.set(state);
110    }
111
112    /// Subscribe to state updates.
113    pub fn subscribe(&self) -> Subscriber<State> {
114        self.state.subscribe()
115    }
116
117    /// Transition to the next state, and execute the necessary transition on
118    /// the sliding sync list.
119    pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result<State, Error> {
120        use State::*;
121
122        let next_state = match self.get() {
123            Init => SettingUp,
124
125            SettingUp | Recovering => {
126                set_all_rooms_to_growing_sync_mode(sliding_sync).await?;
127                Running
128            }
129
130            Running => {
131                // We haven't changed the state for a while, we go back to `Recovering` to avoid
132                // requesting potentially large data. See `Self::last_state_update` to learn
133                // the details.
134                if self.last_state_update_time.lock().unwrap().elapsed() > self.state_lifespan {
135                    set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
136
137                    Recovering
138                } else {
139                    Running
140                }
141            }
142
143            Error { from: previous_state } | Terminated { from: previous_state } => {
144                match previous_state.as_ref() {
145                    // Unreachable state.
146                    Error { .. } | Terminated { .. } => {
147                        unreachable!(
148                            "It's impossible to reach `Error` or `Terminated` from `Error` or `Terminated`"
149                        );
150                    }
151
152                    // If the previous state was `Running`, we enter the `Recovering` state.
153                    Running => {
154                        set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
155                        Recovering
156                    }
157
158                    // Jump back to the previous state that led to this termination.
159                    state => state.to_owned(),
160                }
161            }
162        };
163
164        Ok(next_state)
165    }
166}
167
168async fn set_all_rooms_to_growing_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
169    sliding_sync
170        .on_list(ALL_ROOMS_LIST_NAME, |list| {
171            list.set_sync_mode(SlidingSyncMode::new_growing(ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE));
172
173            ready(())
174        })
175        .await
176        .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
177}
178
179async fn set_all_rooms_to_selective_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
180    sliding_sync
181        .on_list(ALL_ROOMS_LIST_NAME, |list| {
182            list.set_sync_mode(
183                SlidingSyncMode::new_selective().add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
184            );
185
186            ready(())
187        })
188        .await
189        .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
190}
191
192/// Default `batch_size` for the selective sync-mode of the
193/// `ALL_ROOMS_LIST_NAME` list.
194pub const ALL_ROOMS_DEFAULT_SELECTIVE_RANGE: Range = 0..=19;
195
196/// Default `batch_size` for the growing sync-mode of the `ALL_ROOMS_LIST_NAME`
197/// list.
198pub const ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE: u32 = 100;
199
200#[cfg(test)]
201mod tests {
202    use matrix_sdk_test::async_test;
203    use tokio::time::sleep;
204
205    use super::{super::tests::new_room_list, *};
206
207    #[async_test]
208    async fn test_states() -> Result<(), Error> {
209        let room_list = new_room_list().await?;
210        let sliding_sync = room_list.sliding_sync();
211
212        let state_machine = StateMachine::new();
213
214        // Hypothetical error.
215        {
216            state_machine.set(State::Error { from: Box::new(state_machine.get()) });
217
218            // Back to the previous state.
219            state_machine.set(state_machine.next(sliding_sync).await?);
220            assert_eq!(state_machine.get(), State::Init);
221        }
222
223        // Hypothetical termination.
224        {
225            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
226
227            // Back to the previous state.
228            state_machine.set(state_machine.next(sliding_sync).await?);
229            assert_eq!(state_machine.get(), State::Init);
230        }
231
232        // Next state.
233        state_machine.set(state_machine.next(sliding_sync).await?);
234        assert_eq!(state_machine.get(), State::SettingUp);
235
236        // Hypothetical error.
237        {
238            state_machine.set(State::Error { from: Box::new(state_machine.get()) });
239
240            // Back to the previous state.
241            state_machine.set(state_machine.next(sliding_sync).await?);
242            assert_eq!(state_machine.get(), State::SettingUp);
243        }
244
245        // Hypothetical termination.
246        {
247            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
248
249            // Back to the previous state.
250            state_machine.set(state_machine.next(sliding_sync).await?);
251            assert_eq!(state_machine.get(), State::SettingUp);
252        }
253
254        // Next state.
255        state_machine.set(state_machine.next(sliding_sync).await?);
256        assert_eq!(state_machine.get(), State::Running);
257
258        // Hypothetical error.
259        {
260            state_machine.set(State::Error { from: Box::new(state_machine.get()) });
261
262            // Jump to the **recovering** state!
263            state_machine.set(state_machine.next(sliding_sync).await?);
264            assert_eq!(state_machine.get(), State::Recovering);
265
266            // Now, back to the previous state.
267            state_machine.set(state_machine.next(sliding_sync).await?);
268            assert_eq!(state_machine.get(), State::Running);
269        }
270
271        // Hypothetical termination.
272        {
273            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });
274
275            // Jump to the **recovering** state!
276            state_machine.set(state_machine.next(sliding_sync).await?);
277            assert_eq!(state_machine.get(), State::Recovering);
278
279            // Now, back to the previous state.
280            state_machine.set(state_machine.next(sliding_sync).await?);
281            assert_eq!(state_machine.get(), State::Running);
282        }
283
284        // Hypothetical error when recovering.
285        {
286            state_machine.set(State::Error { from: Box::new(State::Recovering) });
287
288            // Back to the previous state.
289            state_machine.set(state_machine.next(sliding_sync).await?);
290            assert_eq!(state_machine.get(), State::Recovering);
291        }
292
293        // Hypothetical termination when recovering.
294        {
295            state_machine.set(State::Terminated { from: Box::new(State::Recovering) });
296
297            // Back to the previous state.
298            state_machine.set(state_machine.next(sliding_sync).await?);
299            assert_eq!(state_machine.get(), State::Recovering);
300        }
301
302        Ok(())
303    }
304
305    #[async_test]
306    async fn test_recover_state_after_delay() -> Result<(), Error> {
307        let room_list = new_room_list().await?;
308        let sliding_sync = room_list.sliding_sync();
309
310        let mut state_machine = StateMachine::new();
311        state_machine.state_lifespan = Duration::from_millis(50);
312
313        {
314            state_machine.set(state_machine.next(sliding_sync).await?);
315            assert_eq!(state_machine.get(), State::SettingUp);
316
317            state_machine.set(state_machine.next(sliding_sync).await?);
318            assert_eq!(state_machine.get(), State::Running);
319
320            state_machine.set(state_machine.next(sliding_sync).await?);
321            assert_eq!(state_machine.get(), State::Running);
322
323            state_machine.set(state_machine.next(sliding_sync).await?);
324            assert_eq!(state_machine.get(), State::Running);
325        }
326
327        // Time passes.
328        sleep(Duration::from_millis(100)).await;
329
330        {
331            // Time has elapsed, time to recover.
332            state_machine.set(state_machine.next(sliding_sync).await?);
333            assert_eq!(state_machine.get(), State::Recovering);
334
335            state_machine.set(state_machine.next(sliding_sync).await?);
336            assert_eq!(state_machine.get(), State::Running);
337
338            state_machine.set(state_machine.next(sliding_sync).await?);
339            assert_eq!(state_machine.get(), State::Running);
340
341            state_machine.set(state_machine.next(sliding_sync).await?);
342            assert_eq!(state_machine.get(), State::Running);
343        }
344
345        // Time passes, again. Just to test everything is going well.
346        sleep(Duration::from_millis(100)).await;
347
348        {
349            // Time has elapsed, time to recover.
350            state_machine.set(state_machine.next(sliding_sync).await?);
351            assert_eq!(state_machine.get(), State::Recovering);
352
353            state_machine.set(state_machine.next(sliding_sync).await?);
354            assert_eq!(state_machine.get(), State::Running);
355
356            state_machine.set(state_machine.next(sliding_sync).await?);
357            assert_eq!(state_machine.get(), State::Running);
358
359            state_machine.set(state_machine.next(sliding_sync).await?);
360            assert_eq!(state_machine.get(), State::Running);
361        }
362
363        Ok(())
364    }
365
366    #[async_test]
367    async fn test_action_set_all_rooms_list_to_growing_and_selective_sync_mode() -> Result<(), Error>
368    {
369        let room_list = new_room_list().await?;
370        let sliding_sync = room_list.sliding_sync();
371
372        // List is present, in Selective mode.
373        assert_eq!(
374            sliding_sync
375                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
376                    list.sync_mode(),
377                    SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
378                )))
379                .await,
380            Some(true)
381        );
382
383        // Run the action!
384        set_all_rooms_to_growing_sync_mode(sliding_sync).await.unwrap();
385
386        // List is still present, in Growing mode.
387        assert_eq!(
388            sliding_sync
389                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
390                    list.sync_mode(),
391                    SlidingSyncMode::Growing {
392                        batch_size, ..
393                    } if batch_size == ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE
394                )))
395                .await,
396            Some(true)
397        );
398
399        // Run the other action!
400        set_all_rooms_to_selective_sync_mode(sliding_sync).await.unwrap();
401
402        // List is still present, in Selective mode.
403        assert_eq!(
404            sliding_sync
405                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
406                    list.sync_mode(),
407                    SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
408                )))
409                .await,
410            Some(true)
411        );
412
413        Ok(())
414    }
415}