Skip to main content

matrix_sdk_base/store/
memory_store.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    cmp::Reverse,
17    collections::{BTreeMap, BTreeSet, HashMap},
18    sync::RwLock,
19};
20
21use async_trait::async_trait;
22use growable_bloom_filter::GrowableBloom;
23use matrix_sdk_common::{ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, ttl::TtlValue};
24use ruma::{
25    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
26    OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
27    api::client::discovery::get_capabilities::v3::Capabilities,
28    canonical_json::{RedactedBecause, redact},
29    events::{
30        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
31        AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
32        presence::PresenceEvent,
33        receipt::{Receipt, ReceiptThread, ReceiptType},
34        room::member::{MembershipState, StrippedRoomMemberEvent, SyncRoomMemberEvent},
35    },
36    serde::Raw,
37    time::Instant,
38};
39use tracing::{debug, instrument, warn};
40
41use super::{
42    DependentQueuedRequest, DependentQueuedRequestKind, QueuedRequestKind, Result, RoomInfo,
43    RoomLoadSettings, StateChanges, StateStore, StoreError, SupportedVersionsResponse,
44    WellKnownResponse,
45    send_queue::{ChildTransactionId, QueuedRequest, SentRequestKey},
46    traits::ComposerDraft,
47};
48use crate::{
49    MinimalRoomMemberEvent, RoomMemberships, StateStoreDataKey, StateStoreDataValue,
50    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
51    store::{
52        QueueWedgeError, StoredThreadSubscription,
53        traits::{ThreadSubscriptionCatchupToken, compare_thread_subscription_bump_stamps},
54    },
55};
56
/// All the data held by a [`MemoryStore`], kept in one struct so that it can
/// sit behind a single lock.
#[derive(Debug, Default)]
// The nested map types of the fields below are what this lint would flag.
#[allow(clippy::type_complexity)]
struct MemoryStoreInner {
    /// Recently visited rooms, per user.
    recently_visited_rooms: HashMap<OwnedUserId, Vec<OwnedRoomId>>,
    /// Composer drafts, keyed by room and optional thread root event.
    composer_drafts: HashMap<(OwnedRoomId, Option<OwnedEventId>), ComposerDraft>,
    /// Avatar URL, per user.
    user_avatar_url: HashMap<OwnedUserId, OwnedMxcUri>,
    /// Last stored sync token, if any.
    sync_token: Option<String>,
    /// Stored supported-versions response, if any.
    supported_versions: Option<TtlValue<SupportedVersionsResponse>>,
    /// Stored well-known response, if any. The inner `Option` is part of the
    /// stored value itself.
    well_known: Option<TtlValue<Option<WellKnownResponse>>>,
    /// Filter values, keyed by filter name.
    filters: HashMap<String, String>,
    /// Data of the UTD hook manager, if any.
    utd_hook_manager_data: Option<GrowableBloom>,
    /// Flag backing `StateStoreDataKey::OneTimeKeyAlreadyUploaded`: set to
    /// `true` when that key is written, reset to `false` when it is removed.
    one_time_key_uploaded_error: bool,
    /// Global account data events, keyed by event type.
    account_data: HashMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
    /// Member profiles: room -> user -> profile.
    profiles: HashMap<OwnedRoomId, HashMap<OwnedUserId, MinimalRoomMemberEvent>>,
    /// Display name ambiguity maps: room -> display name -> users.
    display_names: HashMap<OwnedRoomId, HashMap<DisplayName, BTreeSet<OwnedUserId>>>,
    /// Memberships taken from sync room member events: room -> user ->
    /// membership.
    members: HashMap<OwnedRoomId, HashMap<OwnedUserId, MembershipState>>,
    /// Room info, per room.
    room_info: HashMap<OwnedRoomId, RoomInfo>,
    /// Sync state events: room -> event type -> state key -> raw event.
    room_state:
        HashMap<OwnedRoomId, HashMap<StateEventType, HashMap<String, Raw<AnySyncStateEvent>>>>,
    /// Room account data events: room -> event type -> raw event.
    room_account_data:
        HashMap<OwnedRoomId, HashMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,
    /// Stripped state events: room -> event type -> state key -> raw event.
    stripped_room_state:
        HashMap<OwnedRoomId, HashMap<StateEventType, HashMap<String, Raw<AnyStrippedStateEvent>>>>,
    /// Memberships taken from stripped room member events: room -> user ->
    /// membership.
    stripped_members: HashMap<OwnedRoomId, HashMap<OwnedUserId, MembershipState>>,
    /// Presence events, per user.
    presence: HashMap<OwnedUserId, Raw<PresenceEvent>>,
    /// Receipts indexed by user: room -> (receipt type, thread) -> user ->
    /// (event, receipt).
    room_user_receipts: HashMap<
        OwnedRoomId,
        HashMap<(String, Option<String>), HashMap<OwnedUserId, (OwnedEventId, Receipt)>>,
    >,
    /// Receipts indexed by event: room -> (receipt type, thread) -> event ->
    /// user -> receipt.
    room_event_receipts: HashMap<
        OwnedRoomId,
        HashMap<(String, Option<String>), HashMap<OwnedEventId, HashMap<OwnedUserId, Receipt>>>,
    >,
    /// Custom key/value pairs, both stored as raw bytes.
    custom: HashMap<Vec<u8>, Vec<u8>>,
    /// Queued requests, per room.
    send_queue_events: BTreeMap<OwnedRoomId, Vec<QueuedRequest>>,
    /// Dependent queued requests, per room.
    dependent_send_queue_events: BTreeMap<OwnedRoomId, Vec<DependentQueuedRequest>>,
    /// Seen knock requests: room -> event -> user.
    seen_knock_requests: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, OwnedUserId>>,
    /// Thread subscriptions: room -> event -> subscription.
    // NOTE(review): the event key is presumably the thread root — confirm.
    thread_subscriptions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, StoredThreadSubscription>>,
    /// Catch-up tokens for thread subscriptions, if any were stored.
    thread_subscriptions_catchup_tokens: Option<Vec<ThreadSubscriptionCatchupToken>>,
    /// Stored homeserver capabilities, if any.
    homeserver_capabilities: Option<TtlValue<Capabilities>>,
}
98
/// In-memory, non-persistent implementation of the `StateStore`.
///
/// Default if no other is configured at startup.
#[derive(Debug, Default)]
pub struct MemoryStore {
    /// All the stored data, guarded by a single read-write lock.
    inner: RwLock<MemoryStoreInner>,
}
106
107impl MemoryStore {
108    /// Create a new empty MemoryStore
109    pub fn new() -> Self {
110        Self::default()
111    }
112
113    fn get_user_room_receipt_event_impl(
114        &self,
115        room_id: &RoomId,
116        receipt_type: ReceiptType,
117        thread: ReceiptThread,
118        user_id: &UserId,
119    ) -> Option<(OwnedEventId, Receipt)> {
120        self.inner
121            .read()
122            .unwrap()
123            .room_user_receipts
124            .get(room_id)?
125            .get(&(receipt_type.to_string(), thread.as_str().map(ToOwned::to_owned)))?
126            .get(user_id)
127            .cloned()
128    }
129
130    fn get_event_room_receipt_events_impl(
131        &self,
132        room_id: &RoomId,
133        receipt_type: ReceiptType,
134        thread: ReceiptThread,
135        event_id: &EventId,
136    ) -> Option<Vec<(OwnedUserId, Receipt)>> {
137        Some(
138            self.inner
139                .read()
140                .unwrap()
141                .room_event_receipts
142                .get(room_id)?
143                .get(&(receipt_type.to_string(), thread.as_str().map(ToOwned::to_owned)))?
144                .get(event_id)?
145                .iter()
146                .map(|(key, value)| (key.clone(), value.clone()))
147                .collect(),
148        )
149    }
150}
151
152#[cfg_attr(target_family = "wasm", async_trait(?Send))]
153#[cfg_attr(not(target_family = "wasm"), async_trait)]
154impl StateStore for MemoryStore {
155    type Error = StoreError;
156
    /// Closes the store.
    ///
    /// This implementation does nothing and always returns `Ok(())`.
    async fn close(&self) -> Result<(), Self::Error> {
        Ok(())
    }
160
    /// Reopens the store.
    ///
    /// This implementation does nothing and always returns `Ok(())`.
    async fn reopen(&self) -> Result<(), Self::Error> {
        Ok(())
    }
164
165    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
166        let inner = self.inner.read().unwrap();
167
168        Ok(match key {
169            StateStoreDataKey::SyncToken => {
170                inner.sync_token.clone().map(StateStoreDataValue::SyncToken)
171            }
172            StateStoreDataKey::SupportedVersions => {
173                inner.supported_versions.clone().map(StateStoreDataValue::SupportedVersions)
174            }
175            StateStoreDataKey::WellKnown => {
176                inner.well_known.clone().map(StateStoreDataValue::WellKnown)
177            }
178            StateStoreDataKey::Filter(filter_name) => {
179                inner.filters.get(filter_name).cloned().map(StateStoreDataValue::Filter)
180            }
181            StateStoreDataKey::UserAvatarUrl(user_id) => {
182                inner.user_avatar_url.get(user_id).cloned().map(StateStoreDataValue::UserAvatarUrl)
183            }
184            StateStoreDataKey::RecentlyVisitedRooms(user_id) => inner
185                .recently_visited_rooms
186                .get(user_id)
187                .cloned()
188                .map(StateStoreDataValue::RecentlyVisitedRooms),
189            StateStoreDataKey::UtdHookManagerData => {
190                inner.utd_hook_manager_data.clone().map(StateStoreDataValue::UtdHookManagerData)
191            }
192            StateStoreDataKey::OneTimeKeyAlreadyUploaded => inner
193                .one_time_key_uploaded_error
194                .then_some(StateStoreDataValue::OneTimeKeyAlreadyUploaded),
195            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
196                let key = (room_id.to_owned(), thread_root.map(ToOwned::to_owned));
197                inner.composer_drafts.get(&key).cloned().map(StateStoreDataValue::ComposerDraft)
198            }
199            StateStoreDataKey::SeenKnockRequests(room_id) => inner
200                .seen_knock_requests
201                .get(room_id)
202                .cloned()
203                .map(StateStoreDataValue::SeenKnockRequests),
204            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => inner
205                .thread_subscriptions_catchup_tokens
206                .clone()
207                .map(StateStoreDataValue::ThreadSubscriptionsCatchupTokens),
208            StateStoreDataKey::HomeserverCapabilities => inner
209                .homeserver_capabilities
210                .clone()
211                .map(StateStoreDataValue::HomeserverCapabilities),
212        })
213    }
214
215    async fn set_kv_data(
216        &self,
217        key: StateStoreDataKey<'_>,
218        value: StateStoreDataValue,
219    ) -> Result<()> {
220        let mut inner = self.inner.write().unwrap();
221        match key {
222            StateStoreDataKey::SyncToken => {
223                inner.sync_token =
224                    Some(value.into_sync_token().expect("Session data not a sync token"))
225            }
226            StateStoreDataKey::Filter(filter_name) => {
227                inner.filters.insert(
228                    filter_name.to_owned(),
229                    value.into_filter().expect("Session data not a filter"),
230                );
231            }
232            StateStoreDataKey::UserAvatarUrl(user_id) => {
233                inner.user_avatar_url.insert(
234                    user_id.to_owned(),
235                    value.into_user_avatar_url().expect("Session data not a user avatar url"),
236                );
237            }
238            StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
239                inner.recently_visited_rooms.insert(
240                    user_id.to_owned(),
241                    value
242                        .into_recently_visited_rooms()
243                        .expect("Session data not a list of recently visited rooms"),
244                );
245            }
246            StateStoreDataKey::UtdHookManagerData => {
247                inner.utd_hook_manager_data = Some(
248                    value
249                        .into_utd_hook_manager_data()
250                        .expect("Session data not the hook manager data"),
251                );
252            }
253            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
254                inner.one_time_key_uploaded_error = true;
255            }
256            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
257                inner.composer_drafts.insert(
258                    (room_id.to_owned(), thread_root.map(ToOwned::to_owned)),
259                    value.into_composer_draft().expect("Session data not a composer draft"),
260                );
261            }
262            StateStoreDataKey::SupportedVersions => {
263                inner.supported_versions = Some(
264                    value
265                        .into_supported_versions()
266                        .expect("Session data not containing supported versions"),
267                );
268            }
269            StateStoreDataKey::WellKnown => {
270                inner.well_known =
271                    Some(value.into_well_known().expect("Session data not containing well-known"));
272            }
273            StateStoreDataKey::SeenKnockRequests(room_id) => {
274                inner.seen_knock_requests.insert(
275                    room_id.to_owned(),
276                    value
277                        .into_seen_knock_requests()
278                        .expect("Session data is not a set of seen join request ids"),
279                );
280            }
281            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
282                inner.thread_subscriptions_catchup_tokens =
283                    Some(value.into_thread_subscriptions_catchup_tokens().expect(
284                        "Session data is not a list of thread subscription catchup tokens",
285                    ));
286            }
287            StateStoreDataKey::HomeserverCapabilities => {
288                inner.homeserver_capabilities = Some(
289                    value
290                        .into_homeserver_capabilities()
291                        .expect("Session data is not a homeserver capabilities"),
292                );
293            }
294        }
295
296        Ok(())
297    }
298
299    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
300        let mut inner = self.inner.write().unwrap();
301        match key {
302            StateStoreDataKey::SyncToken => inner.sync_token = None,
303            StateStoreDataKey::SupportedVersions => inner.supported_versions = None,
304            StateStoreDataKey::WellKnown => inner.well_known = None,
305            StateStoreDataKey::Filter(filter_name) => {
306                inner.filters.remove(filter_name);
307            }
308            StateStoreDataKey::UserAvatarUrl(user_id) => {
309                inner.user_avatar_url.remove(user_id);
310            }
311            StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
312                inner.recently_visited_rooms.remove(user_id);
313            }
314            StateStoreDataKey::UtdHookManagerData => inner.utd_hook_manager_data = None,
315            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
316                inner.one_time_key_uploaded_error = false
317            }
318            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
319                let key = (room_id.to_owned(), thread_root.map(ToOwned::to_owned));
320                inner.composer_drafts.remove(&key);
321            }
322            StateStoreDataKey::SeenKnockRequests(room_id) => {
323                inner.seen_knock_requests.remove(room_id);
324            }
325            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
326                inner.thread_subscriptions_catchup_tokens = None;
327            }
328            StateStoreDataKey::HomeserverCapabilities => inner.homeserver_capabilities = None,
329        }
330        Ok(())
331    }
332
333    #[instrument(skip(self, changes))]
334    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
335        let now = Instant::now();
336
337        let mut inner = self.inner.write().unwrap();
338
339        if let Some(s) = &changes.sync_token {
340            inner.sync_token = Some(s.to_owned());
341        }
342
343        for (room, users) in &changes.profiles_to_delete {
344            let Some(room_profiles) = inner.profiles.get_mut(room) else {
345                continue;
346            };
347            for user in users {
348                room_profiles.remove(user);
349            }
350        }
351
352        for (room, users) in &changes.profiles {
353            for (user_id, profile) in users {
354                inner
355                    .profiles
356                    .entry(room.clone())
357                    .or_default()
358                    .insert(user_id.clone(), profile.clone());
359            }
360        }
361
362        for (room, map) in &changes.ambiguity_maps {
363            for (display_name, display_names) in map {
364                inner
365                    .display_names
366                    .entry(room.clone())
367                    .or_default()
368                    .insert(display_name.clone(), display_names.clone());
369            }
370        }
371
372        for (event_type, event) in &changes.account_data {
373            inner.account_data.insert(event_type.clone(), event.clone());
374        }
375
376        for (room, events) in &changes.room_account_data {
377            for (event_type, event) in events {
378                inner
379                    .room_account_data
380                    .entry(room.clone())
381                    .or_default()
382                    .insert(event_type.clone(), event.clone());
383            }
384        }
385
386        for (room, event_types) in &changes.state {
387            for (event_type, events) in event_types {
388                for (state_key, raw_event) in events {
389                    inner
390                        .room_state
391                        .entry(room.clone())
392                        .or_default()
393                        .entry(event_type.clone())
394                        .or_default()
395                        .insert(state_key.to_owned(), raw_event.clone());
396                    inner.stripped_room_state.remove(room);
397
398                    if *event_type == StateEventType::RoomMember {
399                        let event =
400                            match raw_event.deserialize_as_unchecked::<SyncRoomMemberEvent>() {
401                                Ok(ev) => ev,
402                                Err(e) => {
403                                    let event_id: Option<String> =
404                                        raw_event.get_field("event_id").ok().flatten();
405                                    debug!(event_id, "Failed to deserialize member event: {e}");
406                                    continue;
407                                }
408                            };
409
410                        inner.stripped_members.remove(room);
411
412                        inner
413                            .members
414                            .entry(room.clone())
415                            .or_default()
416                            .insert(event.state_key().to_owned(), event.membership().clone());
417                    }
418                }
419            }
420        }
421
422        for (room_id, info) in &changes.room_infos {
423            inner.room_info.insert(room_id.clone(), info.clone());
424        }
425
426        for (sender, event) in &changes.presence {
427            inner.presence.insert(sender.clone(), event.clone());
428        }
429
430        for (room, event_types) in &changes.stripped_state {
431            for (event_type, events) in event_types {
432                for (state_key, raw_event) in events {
433                    inner
434                        .stripped_room_state
435                        .entry(room.clone())
436                        .or_default()
437                        .entry(event_type.clone())
438                        .or_default()
439                        .insert(state_key.to_owned(), raw_event.clone());
440
441                    if *event_type == StateEventType::RoomMember {
442                        let event =
443                            match raw_event.deserialize_as_unchecked::<StrippedRoomMemberEvent>() {
444                                Ok(ev) => ev,
445                                Err(e) => {
446                                    let event_id: Option<String> =
447                                        raw_event.get_field("event_id").ok().flatten();
448                                    debug!(
449                                        event_id,
450                                        "Failed to deserialize stripped member event: {e}"
451                                    );
452                                    continue;
453                                }
454                            };
455
456                        inner
457                            .stripped_members
458                            .entry(room.clone())
459                            .or_default()
460                            .insert(event.state_key, event.content.membership.clone());
461                    }
462                }
463            }
464        }
465
466        for (room, content) in &changes.receipts {
467            for (event_id, receipts) in &content.0 {
468                for (receipt_type, receipts) in receipts {
469                    for (user_id, receipt) in receipts {
470                        let thread = receipt.thread.as_str().map(ToOwned::to_owned);
471                        // Add the receipt to the room user receipts
472                        if let Some((old_event, _)) = inner
473                            .room_user_receipts
474                            .entry(room.clone())
475                            .or_default()
476                            .entry((receipt_type.to_string(), thread.clone()))
477                            .or_default()
478                            .insert(user_id.clone(), (event_id.clone(), receipt.clone()))
479                        {
480                            // Remove the old receipt from the room event receipts
481                            if let Some(receipt_map) = inner.room_event_receipts.get_mut(room)
482                                && let Some(event_map) =
483                                    receipt_map.get_mut(&(receipt_type.to_string(), thread.clone()))
484                                && let Some(user_map) = event_map.get_mut(&old_event)
485                            {
486                                user_map.remove(user_id);
487                            }
488                        }
489
490                        // Add the receipt to the room event receipts
491                        inner
492                            .room_event_receipts
493                            .entry(room.clone())
494                            .or_default()
495                            .entry((receipt_type.to_string(), thread))
496                            .or_default()
497                            .entry(event_id.clone())
498                            .or_default()
499                            .insert(user_id.clone(), receipt.clone());
500                    }
501                }
502            }
503        }
504
505        let make_redaction_rules = |room_info: &HashMap<OwnedRoomId, RoomInfo>, room_id| {
506            room_info.get(room_id).map(|info| info.room_version_rules_or_default()).unwrap_or_else(|| {
507                warn!(
508                    ?room_id,
509                    "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
510                );
511                ROOM_VERSION_RULES_FALLBACK
512            }).redaction
513        };
514
515        let inner = &mut *inner;
516        for (room_id, redactions) in &changes.redactions {
517            let mut redaction_rules = None;
518
519            if let Some(room) = inner.room_state.get_mut(room_id) {
520                for ref_room_mu in room.values_mut() {
521                    for raw_evt in ref_room_mu.values_mut() {
522                        if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id")
523                            && let Some(redaction) = redactions.get(&event_id)
524                        {
525                            let redacted = redact(
526                                raw_evt.deserialize_as::<CanonicalJsonObject>()?,
527                                redaction_rules.get_or_insert_with(|| {
528                                    make_redaction_rules(&inner.room_info, room_id)
529                                }),
530                                Some(RedactedBecause::from_raw_event(redaction)?),
531                            )
532                            .map_err(StoreError::Redaction)?;
533                            *raw_evt = Raw::new(&redacted)?.cast_unchecked();
534                        }
535                    }
536                }
537            }
538        }
539
540        debug!("Saved changes in {:?}", now.elapsed());
541
542        Ok(())
543    }
544
545    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
546        Ok(self.inner.read().unwrap().presence.get(user_id).cloned())
547    }
548
549    async fn get_presence_events(
550        &self,
551        user_ids: &[OwnedUserId],
552    ) -> Result<Vec<Raw<PresenceEvent>>> {
553        let presence = &self.inner.read().unwrap().presence;
554        Ok(user_ids.iter().filter_map(|user_id| presence.get(user_id).cloned()).collect())
555    }
556
557    async fn get_state_event(
558        &self,
559        room_id: &RoomId,
560        event_type: StateEventType,
561        state_key: &str,
562    ) -> Result<Option<RawAnySyncOrStrippedState>> {
563        Ok(self
564            .get_state_events_for_keys(room_id, event_type, &[state_key])
565            .await?
566            .into_iter()
567            .next())
568    }
569
570    async fn get_state_events(
571        &self,
572        room_id: &RoomId,
573        event_type: StateEventType,
574    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
575        fn get_events<T>(
576            state_map: &HashMap<OwnedRoomId, HashMap<StateEventType, HashMap<String, Raw<T>>>>,
577            room_id: &RoomId,
578            event_type: &StateEventType,
579            to_enum: fn(Raw<T>) -> RawAnySyncOrStrippedState,
580        ) -> Option<Vec<RawAnySyncOrStrippedState>> {
581            let state_events = state_map.get(room_id)?.get(event_type)?;
582            Some(state_events.values().cloned().map(to_enum).collect())
583        }
584
585        let inner = self.inner.read().unwrap();
586        Ok(get_events(
587            &inner.stripped_room_state,
588            room_id,
589            &event_type,
590            RawAnySyncOrStrippedState::Stripped,
591        )
592        .or_else(|| {
593            get_events(&inner.room_state, room_id, &event_type, RawAnySyncOrStrippedState::Sync)
594        })
595        .unwrap_or_default())
596    }
597
598    async fn get_state_events_for_keys(
599        &self,
600        room_id: &RoomId,
601        event_type: StateEventType,
602        state_keys: &[&str],
603    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
604        let inner = self.inner.read().unwrap();
605
606        if let Some(stripped_state_events) =
607            inner.stripped_room_state.get(room_id).and_then(|events| events.get(&event_type))
608        {
609            Ok(state_keys
610                .iter()
611                .filter_map(|k| {
612                    stripped_state_events
613                        .get(*k)
614                        .map(|e| RawAnySyncOrStrippedState::Stripped(e.clone()))
615                })
616                .collect())
617        } else if let Some(sync_state_events) =
618            inner.room_state.get(room_id).and_then(|events| events.get(&event_type))
619        {
620            Ok(state_keys
621                .iter()
622                .filter_map(|k| {
623                    sync_state_events.get(*k).map(|e| RawAnySyncOrStrippedState::Sync(e.clone()))
624                })
625                .collect())
626        } else {
627            Ok(Vec::new())
628        }
629    }
630
631    async fn get_profile(
632        &self,
633        room_id: &RoomId,
634        user_id: &UserId,
635    ) -> Result<Option<MinimalRoomMemberEvent>> {
636        Ok(self
637            .inner
638            .read()
639            .unwrap()
640            .profiles
641            .get(room_id)
642            .and_then(|room_profiles| room_profiles.get(user_id))
643            .cloned())
644    }
645
646    async fn get_profiles<'a>(
647        &self,
648        room_id: &RoomId,
649        user_ids: &'a [OwnedUserId],
650    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
651        if user_ids.is_empty() {
652            return Ok(BTreeMap::new());
653        }
654
655        let profiles = &self.inner.read().unwrap().profiles;
656        let Some(room_profiles) = profiles.get(room_id) else {
657            return Ok(BTreeMap::new());
658        };
659
660        Ok(user_ids
661            .iter()
662            .filter_map(|user_id| room_profiles.get(user_id).map(|p| (&**user_id, p.clone())))
663            .collect())
664    }
665
666    #[instrument(skip(self, memberships))]
667    async fn get_user_ids(
668        &self,
669        room_id: &RoomId,
670        memberships: RoomMemberships,
671    ) -> Result<Vec<OwnedUserId>> {
672        /// Get the user IDs for the given room with the given memberships and
673        /// stripped state.
674        ///
675        /// If `memberships` is empty, returns all user IDs in the room with the
676        /// given stripped state.
677        fn get_user_ids_inner(
678            members: &HashMap<OwnedRoomId, HashMap<OwnedUserId, MembershipState>>,
679            room_id: &RoomId,
680            memberships: RoomMemberships,
681        ) -> Vec<OwnedUserId> {
682            members
683                .get(room_id)
684                .map(|members| {
685                    members
686                        .iter()
687                        .filter_map(|(user_id, membership)| {
688                            memberships.matches(membership).then_some(user_id)
689                        })
690                        .cloned()
691                        .collect()
692                })
693                .unwrap_or_default()
694        }
695        let inner = self.inner.read().unwrap();
696        let v = get_user_ids_inner(&inner.stripped_members, room_id, memberships);
697        if !v.is_empty() {
698            return Ok(v);
699        }
700        Ok(get_user_ids_inner(&inner.members, room_id, memberships))
701    }
702
703    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
704        let memory_store_inner = self.inner.read().unwrap();
705        let room_infos = &memory_store_inner.room_info;
706
707        Ok(match room_load_settings {
708            RoomLoadSettings::All => room_infos.values().cloned().collect(),
709
710            RoomLoadSettings::One(room_id) => match room_infos.get(room_id) {
711                Some(room_info) => vec![room_info.clone()],
712                None => vec![],
713            },
714        })
715    }
716
717    async fn get_users_with_display_name(
718        &self,
719        room_id: &RoomId,
720        display_name: &DisplayName,
721    ) -> Result<BTreeSet<OwnedUserId>> {
722        Ok(self
723            .inner
724            .read()
725            .unwrap()
726            .display_names
727            .get(room_id)
728            .and_then(|room_names| room_names.get(display_name).cloned())
729            .unwrap_or_default())
730    }
731
732    async fn get_users_with_display_names<'a>(
733        &self,
734        room_id: &RoomId,
735        display_names: &'a [DisplayName],
736    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
737        if display_names.is_empty() {
738            return Ok(HashMap::new());
739        }
740
741        let inner = self.inner.read().unwrap();
742        let Some(room_names) = inner.display_names.get(room_id) else {
743            return Ok(HashMap::new());
744        };
745
746        Ok(display_names.iter().filter_map(|n| room_names.get(n).map(|d| (n, d.clone()))).collect())
747    }
748
749    async fn get_account_data_event(
750        &self,
751        event_type: GlobalAccountDataEventType,
752    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
753        Ok(self.inner.read().unwrap().account_data.get(&event_type).cloned())
754    }
755
756    async fn get_room_account_data_event(
757        &self,
758        room_id: &RoomId,
759        event_type: RoomAccountDataEventType,
760    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
761        Ok(self
762            .inner
763            .read()
764            .unwrap()
765            .room_account_data
766            .get(room_id)
767            .and_then(|m| m.get(&event_type))
768            .cloned())
769    }
770
771    async fn get_user_room_receipt_event(
772        &self,
773        room_id: &RoomId,
774        receipt_type: ReceiptType,
775        thread: ReceiptThread,
776        user_id: &UserId,
777    ) -> Result<Option<(OwnedEventId, Receipt)>> {
778        Ok(self.get_user_room_receipt_event_impl(room_id, receipt_type, thread, user_id))
779    }
780
781    async fn get_event_room_receipt_events(
782        &self,
783        room_id: &RoomId,
784        receipt_type: ReceiptType,
785        thread: ReceiptThread,
786        event_id: &EventId,
787    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
788        Ok(self
789            .get_event_room_receipt_events_impl(room_id, receipt_type, thread, event_id)
790            .unwrap_or_default())
791    }
792
793    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
794        Ok(self.inner.read().unwrap().custom.get(key).cloned())
795    }
796
797    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
798        Ok(self.inner.write().unwrap().custom.insert(key.to_vec(), value))
799    }
800
801    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
802        Ok(self.inner.write().unwrap().custom.remove(key))
803    }
804
805    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
806        let mut inner = self.inner.write().unwrap();
807
808        inner.profiles.remove(room_id);
809        inner.display_names.remove(room_id);
810        inner.members.remove(room_id);
811        inner.room_info.remove(room_id);
812        inner.room_state.remove(room_id);
813        inner.room_account_data.remove(room_id);
814        inner.stripped_room_state.remove(room_id);
815        inner.stripped_members.remove(room_id);
816        inner.room_user_receipts.remove(room_id);
817        inner.room_event_receipts.remove(room_id);
818        inner.send_queue_events.remove(room_id);
819        inner.dependent_send_queue_events.remove(room_id);
820        inner.thread_subscriptions.remove(room_id);
821
822        Ok(())
823    }
824
825    async fn save_send_queue_request(
826        &self,
827        room_id: &RoomId,
828        transaction_id: OwnedTransactionId,
829        created_at: MilliSecondsSinceUnixEpoch,
830        kind: QueuedRequestKind,
831        priority: usize,
832    ) -> Result<(), Self::Error> {
833        self.inner
834            .write()
835            .unwrap()
836            .send_queue_events
837            .entry(room_id.to_owned())
838            .or_default()
839            .push(QueuedRequest { kind, transaction_id, error: None, priority, created_at });
840        Ok(())
841    }
842
843    async fn update_send_queue_request(
844        &self,
845        room_id: &RoomId,
846        transaction_id: &TransactionId,
847        kind: QueuedRequestKind,
848    ) -> Result<bool, Self::Error> {
849        if let Some(entry) = self
850            .inner
851            .write()
852            .unwrap()
853            .send_queue_events
854            .entry(room_id.to_owned())
855            .or_default()
856            .iter_mut()
857            .find(|item| item.transaction_id == transaction_id)
858        {
859            entry.kind = kind;
860            entry.error = None;
861            Ok(true)
862        } else {
863            Ok(false)
864        }
865    }
866
867    async fn remove_send_queue_request(
868        &self,
869        room_id: &RoomId,
870        transaction_id: &TransactionId,
871    ) -> Result<bool, Self::Error> {
872        let mut inner = self.inner.write().unwrap();
873        let q = &mut inner.send_queue_events;
874
875        let entry = q.get_mut(room_id);
876        if let Some(entry) = entry {
877            // Find the event by id in its room queue, and remove it if present.
878            if let Some(pos) = entry.iter().position(|item| item.transaction_id == transaction_id) {
879                entry.remove(pos);
880                // And if this was the last event before removal, remove the entire room entry.
881                if entry.is_empty() {
882                    q.remove(room_id);
883                }
884                return Ok(true);
885            }
886        }
887
888        Ok(false)
889    }
890
891    async fn load_send_queue_requests(
892        &self,
893        room_id: &RoomId,
894    ) -> Result<Vec<QueuedRequest>, Self::Error> {
895        let mut ret = self
896            .inner
897            .write()
898            .unwrap()
899            .send_queue_events
900            .entry(room_id.to_owned())
901            .or_default()
902            .clone();
903        // Inverted order of priority, use stable sort to keep insertion order.
904        ret.sort_by_key(|item| Reverse(item.priority));
905        Ok(ret)
906    }
907
908    async fn update_send_queue_request_status(
909        &self,
910        room_id: &RoomId,
911        transaction_id: &TransactionId,
912        error: Option<QueueWedgeError>,
913    ) -> Result<(), Self::Error> {
914        if let Some(entry) = self
915            .inner
916            .write()
917            .unwrap()
918            .send_queue_events
919            .entry(room_id.to_owned())
920            .or_default()
921            .iter_mut()
922            .find(|item| item.transaction_id == transaction_id)
923        {
924            entry.error = error;
925        }
926        Ok(())
927    }
928
929    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
930        Ok(self.inner.read().unwrap().send_queue_events.keys().cloned().collect())
931    }
932
933    async fn save_dependent_queued_request(
934        &self,
935        room: &RoomId,
936        parent_transaction_id: &TransactionId,
937        own_transaction_id: ChildTransactionId,
938        created_at: MilliSecondsSinceUnixEpoch,
939        content: DependentQueuedRequestKind,
940    ) -> Result<(), Self::Error> {
941        self.inner
942            .write()
943            .unwrap()
944            .dependent_send_queue_events
945            .entry(room.to_owned())
946            .or_default()
947            .push(DependentQueuedRequest {
948                kind: content,
949                parent_transaction_id: parent_transaction_id.to_owned(),
950                own_transaction_id,
951                parent_key: None,
952                created_at,
953            });
954        Ok(())
955    }
956
957    async fn mark_dependent_queued_requests_as_ready(
958        &self,
959        room: &RoomId,
960        parent_txn_id: &TransactionId,
961        sent_parent_key: SentRequestKey,
962    ) -> Result<usize, Self::Error> {
963        let mut inner = self.inner.write().unwrap();
964        let dependents = inner.dependent_send_queue_events.entry(room.to_owned()).or_default();
965        let mut num_updated = 0;
966        for d in dependents.iter_mut().filter(|item| item.parent_transaction_id == parent_txn_id) {
967            d.parent_key = Some(sent_parent_key.clone());
968            num_updated += 1;
969        }
970        Ok(num_updated)
971    }
972
973    async fn update_dependent_queued_request(
974        &self,
975        room: &RoomId,
976        own_transaction_id: &ChildTransactionId,
977        new_content: DependentQueuedRequestKind,
978    ) -> Result<bool, Self::Error> {
979        let mut inner = self.inner.write().unwrap();
980        let dependents = inner.dependent_send_queue_events.entry(room.to_owned()).or_default();
981        for d in dependents.iter_mut() {
982            if d.own_transaction_id == *own_transaction_id {
983                d.kind = new_content;
984                return Ok(true);
985            }
986        }
987        Ok(false)
988    }
989
990    async fn remove_dependent_queued_request(
991        &self,
992        room: &RoomId,
993        txn_id: &ChildTransactionId,
994    ) -> Result<bool, Self::Error> {
995        let mut inner = self.inner.write().unwrap();
996        let dependents = inner.dependent_send_queue_events.entry(room.to_owned()).or_default();
997        if let Some(pos) = dependents.iter().position(|item| item.own_transaction_id == *txn_id) {
998            dependents.remove(pos);
999            Ok(true)
1000        } else {
1001            Ok(false)
1002        }
1003    }
1004
1005    async fn load_dependent_queued_requests(
1006        &self,
1007        room: &RoomId,
1008    ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1009        Ok(self
1010            .inner
1011            .read()
1012            .unwrap()
1013            .dependent_send_queue_events
1014            .get(room)
1015            .cloned()
1016            .unwrap_or_default())
1017    }
1018
1019    async fn upsert_thread_subscriptions(
1020        &self,
1021        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1022    ) -> Result<(), Self::Error> {
1023        let mut inner = self.inner.write().unwrap();
1024
1025        for (room_id, thread_id, mut new) in updates {
1026            let room_subs = inner.thread_subscriptions.entry(room_id.to_owned()).or_default();
1027
1028            if let Some(previous) = room_subs.get(thread_id) {
1029                if *previous == new {
1030                    continue;
1031                }
1032                if !compare_thread_subscription_bump_stamps(
1033                    previous.bump_stamp,
1034                    &mut new.bump_stamp,
1035                ) {
1036                    continue;
1037                }
1038            }
1039
1040            room_subs.insert(thread_id.to_owned(), new);
1041        }
1042
1043        Ok(())
1044    }
1045
1046    async fn load_thread_subscription(
1047        &self,
1048        room: &RoomId,
1049        thread_id: &EventId,
1050    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1051        let inner = self.inner.read().unwrap();
1052        Ok(inner
1053            .thread_subscriptions
1054            .get(room)
1055            .and_then(|subscriptions| subscriptions.get(thread_id))
1056            .copied())
1057    }
1058
1059    async fn remove_thread_subscription(
1060        &self,
1061        room: &RoomId,
1062        thread_id: &EventId,
1063    ) -> Result<(), Self::Error> {
1064        let mut inner = self.inner.write().unwrap();
1065
1066        let Some(room_subs) = inner.thread_subscriptions.get_mut(room) else {
1067            return Ok(());
1068        };
1069
1070        room_subs.remove(thread_id);
1071
1072        if room_subs.is_empty() {
1073            // If there are no more subscriptions for this room, remove the room entry.
1074            inner.thread_subscriptions.remove(room);
1075        }
1076
1077        Ok(())
1078    }
1079
    /// This implementation performs no work and always returns `Ok(())`.
    async fn optimize(&self) -> Result<(), Self::Error> {
        Ok(())
    }
1083
    /// This implementation does not compute a size and always returns
    /// `Ok(None)`.
    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
        Ok(None)
    }
1087}
1088
#[cfg(test)]
mod tests {
    use super::{MemoryStore, Result, StateStore};

    /// Builds a fresh, empty [`MemoryStore`] to run a test against.
    ///
    /// NOTE(review): presumably looked up by name from the expansion of
    /// `statestore_integration_tests!` below — confirm before renaming.
    async fn get_store() -> Result<impl StateStore> {
        Ok(MemoryStore::new())
    }

    // The macro is defined outside this file; presumably it expands to the
    // shared state-store test suite (TODO confirm).
    statestore_integration_tests!();
}