matrix_sdk_ui/room_list_service/
state.rs

// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for that specific language governing permissions and
// limitations under the License.

//! States and actions for the `RoomList` state machine.
16
17use std::{future::ready, sync::Mutex};
18
19use eyeball::{SharedObservable, Subscriber};
20use matrix_sdk::{sliding_sync::Range, SlidingSync, SlidingSyncMode};
21use ruma::time::{Duration, Instant};
22
23use super::Error;
24
/// Name of the sliding sync list whose sync-mode is switched by the
/// transitions of the [`StateMachine`].
pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms";
26
/// The state of the [`super::RoomList`].
///
/// `Error` and `Terminated` remember the state they were entered from, so
/// that the state machine knows where to resume from.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum State {
    /// That's the first initial state.
    Init,

    /// At this state, the first rooms have been synced.
    SettingUp,

    /// At this state, the system is recovering from `Error` or `Terminated`.
    /// It's similar to `SettingUp` but some lists may already exist, actions
    /// are then slightly different.
    Recovering,

    /// At this state, all rooms are syncing.
    Running,

    /// At this state, the sync has been stopped because an error happened.
    Error {
        /// The state that was active before entering `Error`.
        from: Box<State>,
    },

    /// At this state, the sync has been stopped because it was requested.
    Terminated {
        /// The state that was active before entering `Terminated`.
        from: Box<State>,
    },
}
50
/// Default value for `StateMachine::state_lifespan`: 30 minutes.
const DEFAULT_STATE_LIFESPAN: Duration = Duration::from_secs(30 * 60);
53
54/// The state machine used to transition between the [`State`]s.
55#[derive(Debug)]
56pub struct StateMachine {
57    /// The current state of the `RoomListService`.
58    state: SharedObservable<State>,
59
60    /// Last time the state has been updated.
61    ///
62    /// When the state has not been updated since a long time, we want to enter
63    /// the [`State::Recovering`] state. Why do we need to do that? Because in
64    /// some cases, the user might have received many updates between two
65    /// distant syncs. If the sliding sync list range was too large, like
66    /// 0..=499, the next sync is likely to be heavy and potentially slow.
67    /// In this case, it's preferable to jump back onto `Recovering`, which will
68    /// reset the range, so that the next sync will be fast for the client.
69    ///
70    /// To be used in coordination with `Self::state_lifespan`.
71    ///
72    /// This mutex is only taken for short periods of time, so it's sync.
73    last_state_update_time: Mutex<Instant>,
74
75    /// The maximum time before considering the state as “too old”.
76    ///
77    /// To be used in coordination with `Self::last_state_update_time`.
78    state_lifespan: Duration,
79}
80
81impl StateMachine {
82    pub(super) fn new() -> Self {
83        StateMachine {
84            state: SharedObservable::new(State::Init),
85            last_state_update_time: Mutex::new(Instant::now()),
86            state_lifespan: DEFAULT_STATE_LIFESPAN,
87        }
88    }
89
90    /// Get the current state.
91    pub(super) fn get(&self) -> State {
92        self.state.get()
93    }
94
95    /// Set the new state.
96    ///
97    /// Setting a new state will update `Self::last_state_update`.
98    pub(super) fn set(&self, state: State) {
99        let mut last_state_update_time = self.last_state_update_time.lock().unwrap();
100        *last_state_update_time = Instant::now();
101
102        self.state.set(state);
103    }
104
105    /// Subscribe to state updates.
106    pub fn subscribe(&self) -> Subscriber<State> {
107        self.state.subscribe()
108    }
109
110    /// Transition to the next state, and execute the associated transition's
111    /// [`Actions`].
112    pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result<State, Error> {
113        use State::*;
114
115        let next_state = match self.get() {
116            Init => SettingUp,
117
118            SettingUp | Recovering => {
119                set_all_rooms_to_growing_sync_mode(sliding_sync).await?;
120                Running
121            }
122
123            Running => {
124                // We haven't changed the state for a while, we go back to `Recovering` to avoid
125                // requesting potentially large data. See `Self::last_state_update` to learn
126                // the details.
127                if self.last_state_update_time.lock().unwrap().elapsed() > self.state_lifespan {
128                    set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
129
130                    Recovering
131                } else {
132                    Running
133                }
134            }
135
136            Error { from: previous_state } | Terminated { from: previous_state } => {
137                match previous_state.as_ref() {
138                    // Unreachable state.
139                    Error { .. } | Terminated { .. } => {
140                        unreachable!(
141                            "It's impossible to reach `Error` or `Terminated` from `Error` or `Terminated`"
142                        );
143                    }
144
145                    // If the previous state was `Running`, we enter the `Recovering` state.
146                    Running => {
147                        set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
148                        Recovering
149                    }
150
151                    // Jump back to the previous state that led to this termination.
152                    state => state.to_owned(),
153                }
154            }
155        };
156
157        Ok(next_state)
158    }
159}
160
161async fn set_all_rooms_to_growing_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
162    sliding_sync
163        .on_list(ALL_ROOMS_LIST_NAME, |list| {
164            list.set_sync_mode(SlidingSyncMode::new_growing(ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE));
165
166            ready(())
167        })
168        .await
169        .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
170}
171
172async fn set_all_rooms_to_selective_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
173    sliding_sync
174        .on_list(ALL_ROOMS_LIST_NAME, |list| {
175            list.set_sync_mode(
176                SlidingSyncMode::new_selective().add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
177            );
178
179            ready(())
180        })
181        .await
182        .ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
183}
184
185/// Default `batch_size` for the selective sync-mode of the
186/// `ALL_ROOMS_LIST_NAME` list.
187pub const ALL_ROOMS_DEFAULT_SELECTIVE_RANGE: Range = 0..=19;
188
/// Default `batch_size` for the growing sync-mode of the
/// `ALL_ROOMS_LIST_NAME` list.
pub const ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE: u32 = 100;
192
#[cfg(test)]
mod tests {
    use matrix_sdk_test::async_test;
    use tokio::time::sleep;

    use super::{super::tests::new_room_list, *};

    /// Walk through every state, checking that a hypothetical error or
    /// termination resumes from the expected state.
    #[async_test]
    async fn test_states() -> Result<(), Error> {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();

        let state_machine = StateMachine::new();

        // Hypothetical error.
        {
            state_machine.set(State::Error { from: Box::new(state_machine.get()) });

            // Back to the previous state.
            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Init);
        }

        // Hypothetical termination.
        {
            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });

            // Back to the previous state.
            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Init);
        }

        // Next state.
        state_machine.set(state_machine.next(sliding_sync).await?);
        assert_eq!(state_machine.get(), State::SettingUp);

        // Hypothetical error.
        {
            state_machine.set(State::Error { from: Box::new(state_machine.get()) });

            // Back to the previous state.
            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::SettingUp);
        }

        // Hypothetical termination.
        {
            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });

            // Back to the previous state.
            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::SettingUp);
        }

        // Next state.
        state_machine.set(state_machine.next(sliding_sync).await?);
        assert_eq!(state_machine.get(), State::Running);

        // Hypothetical error.
        {
            state_machine.set(State::Error { from: Box::new(state_machine.get()) });

            // Jump to the **recovering** state!
            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Recovering);

            // Now, back to the previous state.
            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Running);
        }

        // Hypothetical termination.
        {
            state_machine.set(State::Terminated { from: Box::new(state_machine.get()) });

            // Jump to the **recovering** state!
            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Recovering);

            // Now, back to the previous state.
            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Running);
        }

        // Hypothetical error when recovering.
        {
            state_machine.set(State::Error { from: Box::new(State::Recovering) });

            // Back to the previous state.
            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Recovering);
        }

        // Hypothetical termination when recovering.
        {
            state_machine.set(State::Terminated { from: Box::new(State::Recovering) });

            // Back to the previous state.
            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Recovering);
        }

        Ok(())
    }

    /// Check that a `Running` state older than the state lifespan goes back
    /// to `Recovering` before running again.
    #[async_test]
    async fn test_recover_state_after_delay() -> Result<(), Error> {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();

        let mut state_machine = StateMachine::new();
        state_machine.state_lifespan = Duration::from_millis(50);

        {
            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::SettingUp);

            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Running);

            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Running);

            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Running);
        }

        // Time passes.
        sleep(Duration::from_millis(100)).await;

        {
            // Time has elapsed, time to recover.
            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Recovering);

            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Running);

            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Running);

            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Running);
        }

        // Time passes, again. Just to test everything is going well.
        sleep(Duration::from_millis(100)).await;

        {
            // Time has elapsed, time to recover.
            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Recovering);

            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Running);

            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Running);

            state_machine.set(state_machine.next(sliding_sync).await?);
            assert_eq!(state_machine.get(), State::Running);
        }

        Ok(())
    }

    /// Check that both actions switch the sync-mode of the
    /// `ALL_ROOMS_LIST_NAME` list as expected.
    #[async_test]
    async fn test_action_set_all_rooms_list_to_growing_and_selective_sync_mode() -> Result<(), Error>
    {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();

        // List is present, in Selective mode.
        assert_eq!(
            sliding_sync
                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
                    list.sync_mode(),
                    SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
                )))
                .await,
            Some(true)
        );

        // Run the action!
        set_all_rooms_to_growing_sync_mode(sliding_sync).await.unwrap();

        // List is still present, in Growing mode.
        assert_eq!(
            sliding_sync
                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
                    list.sync_mode(),
                    SlidingSyncMode::Growing {
                        batch_size, ..
                    } if batch_size == ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE
                )))
                .await,
            Some(true)
        );

        // Run the other action!
        set_all_rooms_to_selective_sync_mode(sliding_sync).await.unwrap();

        // List is still present, in Selective mode.
        assert_eq!(
            sliding_sync
                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
                    list.sync_mode(),
                    SlidingSyncMode::Selective { ranges } if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
                )))
                .await,
            Some(true)
        );

        Ok(())
    }
}