1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
1415//! States and actions for the `RoomList` state machine.
1617use std::{future::ready, sync::Mutex};
1819use eyeball::{SharedObservable, Subscriber};
20use matrix_sdk::{sliding_sync::Range, SlidingSync, SlidingSyncMode};
21use ruma::time::{Duration, Instant};
2223use super::Error;
/// Name of the sliding sync list whose sync mode is switched by the state
/// machine's transitions.
pub const ALL_ROOMS_LIST_NAME: &str = "all_rooms";
/// The state of the [`super::RoomList`].
///
/// `Error` and `Terminated` remember the state they were entered from, so
/// that the state machine can decide where to resume.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum State {
    /// This is the initial state.
    Init,

    /// At this state, the first rooms have been synced.
    SettingUp,

    /// At this state, the system is recovering from `Error` or `Terminated`.
    /// It's similar to `SettingUp` but some lists may already exist, so the
    /// actions are slightly different.
    Recovering,

    /// At this state, all rooms are syncing.
    Running,

    /// At this state, the sync has been stopped because an error happened.
    ///
    /// `from` is the state that was active when the error happened.
    Error { from: Box<State> },

    /// At this state, the sync has been stopped because it was requested.
    ///
    /// `from` is the state that was active when the stop was requested.
    Terminated { from: Box<State> },
}
/// Default value for `StateMachine::state_lifespan`: 1800 seconds, i.e. 30
/// minutes.
const DEFAULT_STATE_LIFESPAN: Duration = Duration::from_secs(1800);
/// The state machine used to transition between the [`State`]s.
#[derive(Debug)]
pub struct StateMachine {
    /// The current state of the `RoomListService`.
    state: SharedObservable<State>,

    /// Last time the state was updated.
    ///
    /// When the state has not been updated for a long time, we want to enter
    /// the [`State::Recovering`] state. Why do we need to do that? Because in
    /// some cases, the user might have received many updates between two
    /// distant syncs. If the sliding sync list range was too large, like
    /// 0..=499, the next sync is likely to be heavy and potentially slow.
    /// In this case, it's preferable to jump back onto `Recovering`, which will
    /// reset the range, so that the next sync will be fast for the client.
    ///
    /// To be used in coordination with `Self::state_lifespan`.
    ///
    /// This mutex is only taken for short periods of time, which is why a
    /// synchronous (`std::sync`) mutex is used.
    last_state_update_time: Mutex<Instant>,

    /// The maximum time before considering the state as “too old”.
    ///
    /// To be used in coordination with `Self::last_state_update_time`.
    state_lifespan: Duration,
}
8081impl StateMachine {
82pub(super) fn new() -> Self {
83 StateMachine {
84 state: SharedObservable::new(State::Init),
85 last_state_update_time: Mutex::new(Instant::now()),
86 state_lifespan: DEFAULT_STATE_LIFESPAN,
87 }
88 }
8990/// Get the current state.
91pub(super) fn get(&self) -> State {
92self.state.get()
93 }
9495/// Set the new state.
96 ///
97 /// Setting a new state will update `Self::last_state_update`.
98pub(super) fn set(&self, state: State) {
99let mut last_state_update_time = self.last_state_update_time.lock().unwrap();
100*last_state_update_time = Instant::now();
101102self.state.set(state);
103 }
104105/// Subscribe to state updates.
106pub fn subscribe(&self) -> Subscriber<State> {
107self.state.subscribe()
108 }
109110/// Transition to the next state, and execute the associated transition's
111 /// [`Actions`].
112pub(super) async fn next(&self, sliding_sync: &SlidingSync) -> Result<State, Error> {
113use State::*;
114115let next_state = match self.get() {
116 Init => SettingUp,
117118 SettingUp | Recovering => {
119 set_all_rooms_to_growing_sync_mode(sliding_sync).await?;
120 Running
121 }
122123 Running => {
124// We haven't changed the state for a while, we go back to `Recovering` to avoid
125 // requesting potentially large data. See `Self::last_state_update` to learn
126 // the details.
127if self.last_state_update_time.lock().unwrap().elapsed() > self.state_lifespan {
128 set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
129130 Recovering
131 } else {
132 Running
133 }
134 }
135136 Error { from: previous_state } | Terminated { from: previous_state } => {
137match previous_state.as_ref() {
138// Unreachable state.
139Error { .. } | Terminated { .. } => {
140unreachable!(
141"It's impossible to reach `Error` or `Terminated` from `Error` or `Terminated`"
142);
143 }
144145// If the previous state was `Running`, we enter the `Recovering` state.
146Running => {
147 set_all_rooms_to_selective_sync_mode(sliding_sync).await?;
148 Recovering
149 }
150151// Jump back to the previous state that led to this termination.
152state => state.to_owned(),
153 }
154 }
155 };
156157Ok(next_state)
158 }
159}
160161async fn set_all_rooms_to_growing_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
162 sliding_sync
163 .on_list(ALL_ROOMS_LIST_NAME, |list| {
164 list.set_sync_mode(SlidingSyncMode::new_growing(ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE));
165166 ready(())
167 })
168 .await
169.ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
170}
171172async fn set_all_rooms_to_selective_sync_mode(sliding_sync: &SlidingSync) -> Result<(), Error> {
173 sliding_sync
174 .on_list(ALL_ROOMS_LIST_NAME, |list| {
175 list.set_sync_mode(
176 SlidingSyncMode::new_selective().add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
177 );
178179 ready(())
180 })
181 .await
182.ok_or_else(|| Error::UnknownList(ALL_ROOMS_LIST_NAME.to_owned()))
183}
/// Default range for the selective sync-mode of the `ALL_ROOMS_LIST_NAME`
/// list. Both bounds are inclusive.
pub const ALL_ROOMS_DEFAULT_SELECTIVE_RANGE: Range = 0..=19;
/// Default `batch_size` for the growing sync-mode of the
/// [`ALL_ROOMS_LIST_NAME`] list.
pub const ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE: u32 = 100;
#[cfg(test)]
mod tests {
    use matrix_sdk_test::async_test;
    use tokio::time::sleep;

    use super::{super::tests::new_room_list, *};

    /// Run one transition of `state_machine`, store the resulting state, and
    /// return the state the machine reports afterwards.
    async fn advance(
        state_machine: &StateMachine,
        sliding_sync: &SlidingSync,
    ) -> Result<State, Error> {
        let next_state = state_machine.next(sliding_sync).await?;
        state_machine.set(next_state);

        Ok(state_machine.get())
    }

    /// Tell whether the `ALL_ROOMS_LIST_NAME` list is in selective mode with
    /// the default range only. `None` means `on_list` yielded nothing.
    async fn all_rooms_is_default_selective(sliding_sync: &SlidingSync) -> Option<bool> {
        sliding_sync
            .on_list(ALL_ROOMS_LIST_NAME, |list| {
                ready(matches!(
                    list.sync_mode(),
                    SlidingSyncMode::Selective { ranges }
                        if ranges == vec![ALL_ROOMS_DEFAULT_SELECTIVE_RANGE]
                ))
            })
            .await
    }

    /// Tell whether the `ALL_ROOMS_LIST_NAME` list is in growing mode with the
    /// default batch size. `None` means `on_list` yielded nothing.
    async fn all_rooms_is_default_growing(sliding_sync: &SlidingSync) -> Option<bool> {
        sliding_sync
            .on_list(ALL_ROOMS_LIST_NAME, |list| {
                ready(matches!(
                    list.sync_mode(),
                    SlidingSyncMode::Growing { batch_size, .. }
                        if batch_size == ALL_ROOMS_DEFAULT_GROWING_BATCH_SIZE
                ))
            })
            .await
    }

    #[async_test]
    async fn test_states() -> Result<(), Error> {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();

        let state_machine = StateMachine::new();

        // The two ways a sync can be interrupted: first a hypothetical error, then a
        // hypothetical termination.
        let interruptions: [fn(State) -> State; 2] = [
            |from| State::Error { from: Box::new(from) },
            |from| State::Terminated { from: Box::new(from) },
        ];

        // Interrupted in `Init`: back to the previous state.
        for interrupt in interruptions {
            state_machine.set(interrupt(state_machine.get()));
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Init);
        }

        // Next state.
        assert_eq!(advance(&state_machine, sliding_sync).await?, State::SettingUp);

        // Interrupted in `SettingUp`: back to the previous state.
        for interrupt in interruptions {
            state_machine.set(interrupt(state_machine.get()));
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::SettingUp);
        }

        // Next state.
        assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);

        // Interrupted in `Running`.
        for interrupt in interruptions {
            state_machine.set(interrupt(state_machine.get()));

            // Jump to the **recovering** state!
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Recovering);

            // Now, back to the previous state.
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
        }

        // Interrupted when recovering: back to the previous state.
        for interrupt in interruptions {
            state_machine.set(interrupt(State::Recovering));
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Recovering);
        }

        Ok(())
    }

    #[async_test]
    async fn test_recover_state_after_delay() -> Result<(), Error> {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();

        let mut state_machine = StateMachine::new();
        state_machine.state_lifespan = Duration::from_millis(50);

        assert_eq!(advance(&state_machine, sliding_sync).await?, State::SettingUp);

        for _ in 0..3 {
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
        }

        // Do it twice, just to test everything is going well.
        for _ in 0..2 {
            // Time passes.
            sleep(Duration::from_millis(100)).await;

            // Time has elapsed, time to recover.
            assert_eq!(advance(&state_machine, sliding_sync).await?, State::Recovering);

            for _ in 0..3 {
                assert_eq!(advance(&state_machine, sliding_sync).await?, State::Running);
            }
        }

        Ok(())
    }

    #[async_test]
    async fn test_action_set_all_rooms_list_to_growing_and_selective_sync_mode() -> Result<(), Error>
    {
        let room_list = new_room_list().await?;
        let sliding_sync = room_list.sliding_sync();

        // List is present, in Selective mode.
        assert_eq!(all_rooms_is_default_selective(sliding_sync).await, Some(true));

        // Run the action!
        set_all_rooms_to_growing_sync_mode(sliding_sync).await.unwrap();

        // List is still present, in Growing mode.
        assert_eq!(all_rooms_is_default_growing(sliding_sync).await, Some(true));

        // Run the other action!
        set_all_rooms_to_selective_sync_mode(sliding_sync).await.unwrap();

        // List is still present, in Selective mode.
        assert_eq!(all_rooms_is_default_selective(sliding_sync).await, Some(true));

        Ok(())
    }
}