1use std::{
16 cmp::Reverse,
17 collections::{BTreeMap, BTreeSet, HashMap},
18 sync::RwLock,
19};
20
21use async_trait::async_trait;
22use growable_bloom_filter::GrowableBloom;
23use matrix_sdk_common::{ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, ttl::TtlValue};
24use ruma::{
25 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
26 OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
27 api::client::discovery::get_capabilities::v3::Capabilities,
28 canonical_json::{RedactedBecause, redact},
29 events::{
30 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
31 AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
32 presence::PresenceEvent,
33 receipt::{Receipt, ReceiptThread, ReceiptType},
34 room::member::{MembershipState, StrippedRoomMemberEvent, SyncRoomMemberEvent},
35 },
36 serde::Raw,
37 time::Instant,
38};
39use tracing::{debug, instrument, warn};
40
41use super::{
42 DependentQueuedRequest, DependentQueuedRequestKind, QueuedRequestKind, Result, RoomInfo,
43 RoomLoadSettings, StateChanges, StateStore, StoreError, SupportedVersionsResponse,
44 WellKnownResponse,
45 send_queue::{ChildTransactionId, QueuedRequest, SentRequestKey},
46 traits::ComposerDraft,
47};
48use crate::{
49 MinimalRoomMemberEvent, RoomMemberships, StateStoreDataKey, StateStoreDataValue,
50 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
51 store::{
52 QueueWedgeError, StoredThreadSubscription,
53 traits::{ThreadSubscriptionCatchupToken, compare_thread_subscription_bump_stamps},
54 },
55};
56
57#[derive(Debug, Default)]
58#[allow(clippy::type_complexity)]
59struct MemoryStoreInner {
60 recently_visited_rooms: HashMap<OwnedUserId, Vec<OwnedRoomId>>,
61 composer_drafts: HashMap<(OwnedRoomId, Option<OwnedEventId>), ComposerDraft>,
62 user_avatar_url: HashMap<OwnedUserId, OwnedMxcUri>,
63 sync_token: Option<String>,
64 supported_versions: Option<TtlValue<SupportedVersionsResponse>>,
65 well_known: Option<TtlValue<Option<WellKnownResponse>>>,
66 filters: HashMap<String, String>,
67 utd_hook_manager_data: Option<GrowableBloom>,
68 one_time_key_uploaded_error: bool,
69 account_data: HashMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
70 profiles: HashMap<OwnedRoomId, HashMap<OwnedUserId, MinimalRoomMemberEvent>>,
71 display_names: HashMap<OwnedRoomId, HashMap<DisplayName, BTreeSet<OwnedUserId>>>,
72 members: HashMap<OwnedRoomId, HashMap<OwnedUserId, MembershipState>>,
73 room_info: HashMap<OwnedRoomId, RoomInfo>,
74 room_state:
75 HashMap<OwnedRoomId, HashMap<StateEventType, HashMap<String, Raw<AnySyncStateEvent>>>>,
76 room_account_data:
77 HashMap<OwnedRoomId, HashMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,
78 stripped_room_state:
79 HashMap<OwnedRoomId, HashMap<StateEventType, HashMap<String, Raw<AnyStrippedStateEvent>>>>,
80 stripped_members: HashMap<OwnedRoomId, HashMap<OwnedUserId, MembershipState>>,
81 presence: HashMap<OwnedUserId, Raw<PresenceEvent>>,
82 room_user_receipts: HashMap<
83 OwnedRoomId,
84 HashMap<(String, Option<String>), HashMap<OwnedUserId, (OwnedEventId, Receipt)>>,
85 >,
86 room_event_receipts: HashMap<
87 OwnedRoomId,
88 HashMap<(String, Option<String>), HashMap<OwnedEventId, HashMap<OwnedUserId, Receipt>>>,
89 >,
90 custom: HashMap<Vec<u8>, Vec<u8>>,
91 send_queue_events: BTreeMap<OwnedRoomId, Vec<QueuedRequest>>,
92 dependent_send_queue_events: BTreeMap<OwnedRoomId, Vec<DependentQueuedRequest>>,
93 seen_knock_requests: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, OwnedUserId>>,
94 thread_subscriptions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, StoredThreadSubscription>>,
95 thread_subscriptions_catchup_tokens: Option<Vec<ThreadSubscriptionCatchupToken>>,
96 homeserver_capabilities: Option<TtlValue<Capabilities>>,
97}
98
99#[derive(Debug, Default)]
103pub struct MemoryStore {
104 inner: RwLock<MemoryStoreInner>,
105}
106
107impl MemoryStore {
108 pub fn new() -> Self {
110 Self::default()
111 }
112
113 fn get_user_room_receipt_event_impl(
114 &self,
115 room_id: &RoomId,
116 receipt_type: ReceiptType,
117 thread: ReceiptThread,
118 user_id: &UserId,
119 ) -> Option<(OwnedEventId, Receipt)> {
120 self.inner
121 .read()
122 .unwrap()
123 .room_user_receipts
124 .get(room_id)?
125 .get(&(receipt_type.to_string(), thread.as_str().map(ToOwned::to_owned)))?
126 .get(user_id)
127 .cloned()
128 }
129
130 fn get_event_room_receipt_events_impl(
131 &self,
132 room_id: &RoomId,
133 receipt_type: ReceiptType,
134 thread: ReceiptThread,
135 event_id: &EventId,
136 ) -> Option<Vec<(OwnedUserId, Receipt)>> {
137 Some(
138 self.inner
139 .read()
140 .unwrap()
141 .room_event_receipts
142 .get(room_id)?
143 .get(&(receipt_type.to_string(), thread.as_str().map(ToOwned::to_owned)))?
144 .get(event_id)?
145 .iter()
146 .map(|(key, value)| (key.clone(), value.clone()))
147 .collect(),
148 )
149 }
150}
151
152#[cfg_attr(target_family = "wasm", async_trait(?Send))]
153#[cfg_attr(not(target_family = "wasm"), async_trait)]
154impl StateStore for MemoryStore {
155 type Error = StoreError;
156
157 async fn close(&self) -> Result<(), Self::Error> {
158 Ok(())
159 }
160
161 async fn reopen(&self) -> Result<(), Self::Error> {
162 Ok(())
163 }
164
165 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
166 let inner = self.inner.read().unwrap();
167
168 Ok(match key {
169 StateStoreDataKey::SyncToken => {
170 inner.sync_token.clone().map(StateStoreDataValue::SyncToken)
171 }
172 StateStoreDataKey::SupportedVersions => {
173 inner.supported_versions.clone().map(StateStoreDataValue::SupportedVersions)
174 }
175 StateStoreDataKey::WellKnown => {
176 inner.well_known.clone().map(StateStoreDataValue::WellKnown)
177 }
178 StateStoreDataKey::Filter(filter_name) => {
179 inner.filters.get(filter_name).cloned().map(StateStoreDataValue::Filter)
180 }
181 StateStoreDataKey::UserAvatarUrl(user_id) => {
182 inner.user_avatar_url.get(user_id).cloned().map(StateStoreDataValue::UserAvatarUrl)
183 }
184 StateStoreDataKey::RecentlyVisitedRooms(user_id) => inner
185 .recently_visited_rooms
186 .get(user_id)
187 .cloned()
188 .map(StateStoreDataValue::RecentlyVisitedRooms),
189 StateStoreDataKey::UtdHookManagerData => {
190 inner.utd_hook_manager_data.clone().map(StateStoreDataValue::UtdHookManagerData)
191 }
192 StateStoreDataKey::OneTimeKeyAlreadyUploaded => inner
193 .one_time_key_uploaded_error
194 .then_some(StateStoreDataValue::OneTimeKeyAlreadyUploaded),
195 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
196 let key = (room_id.to_owned(), thread_root.map(ToOwned::to_owned));
197 inner.composer_drafts.get(&key).cloned().map(StateStoreDataValue::ComposerDraft)
198 }
199 StateStoreDataKey::SeenKnockRequests(room_id) => inner
200 .seen_knock_requests
201 .get(room_id)
202 .cloned()
203 .map(StateStoreDataValue::SeenKnockRequests),
204 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => inner
205 .thread_subscriptions_catchup_tokens
206 .clone()
207 .map(StateStoreDataValue::ThreadSubscriptionsCatchupTokens),
208 StateStoreDataKey::HomeserverCapabilities => inner
209 .homeserver_capabilities
210 .clone()
211 .map(StateStoreDataValue::HomeserverCapabilities),
212 })
213 }
214
215 async fn set_kv_data(
216 &self,
217 key: StateStoreDataKey<'_>,
218 value: StateStoreDataValue,
219 ) -> Result<()> {
220 let mut inner = self.inner.write().unwrap();
221 match key {
222 StateStoreDataKey::SyncToken => {
223 inner.sync_token =
224 Some(value.into_sync_token().expect("Session data not a sync token"))
225 }
226 StateStoreDataKey::Filter(filter_name) => {
227 inner.filters.insert(
228 filter_name.to_owned(),
229 value.into_filter().expect("Session data not a filter"),
230 );
231 }
232 StateStoreDataKey::UserAvatarUrl(user_id) => {
233 inner.user_avatar_url.insert(
234 user_id.to_owned(),
235 value.into_user_avatar_url().expect("Session data not a user avatar url"),
236 );
237 }
238 StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
239 inner.recently_visited_rooms.insert(
240 user_id.to_owned(),
241 value
242 .into_recently_visited_rooms()
243 .expect("Session data not a list of recently visited rooms"),
244 );
245 }
246 StateStoreDataKey::UtdHookManagerData => {
247 inner.utd_hook_manager_data = Some(
248 value
249 .into_utd_hook_manager_data()
250 .expect("Session data not the hook manager data"),
251 );
252 }
253 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
254 inner.one_time_key_uploaded_error = true;
255 }
256 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
257 inner.composer_drafts.insert(
258 (room_id.to_owned(), thread_root.map(ToOwned::to_owned)),
259 value.into_composer_draft().expect("Session data not a composer draft"),
260 );
261 }
262 StateStoreDataKey::SupportedVersions => {
263 inner.supported_versions = Some(
264 value
265 .into_supported_versions()
266 .expect("Session data not containing supported versions"),
267 );
268 }
269 StateStoreDataKey::WellKnown => {
270 inner.well_known =
271 Some(value.into_well_known().expect("Session data not containing well-known"));
272 }
273 StateStoreDataKey::SeenKnockRequests(room_id) => {
274 inner.seen_knock_requests.insert(
275 room_id.to_owned(),
276 value
277 .into_seen_knock_requests()
278 .expect("Session data is not a set of seen join request ids"),
279 );
280 }
281 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
282 inner.thread_subscriptions_catchup_tokens =
283 Some(value.into_thread_subscriptions_catchup_tokens().expect(
284 "Session data is not a list of thread subscription catchup tokens",
285 ));
286 }
287 StateStoreDataKey::HomeserverCapabilities => {
288 inner.homeserver_capabilities = Some(
289 value
290 .into_homeserver_capabilities()
291 .expect("Session data is not a homeserver capabilities"),
292 );
293 }
294 }
295
296 Ok(())
297 }
298
299 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
300 let mut inner = self.inner.write().unwrap();
301 match key {
302 StateStoreDataKey::SyncToken => inner.sync_token = None,
303 StateStoreDataKey::SupportedVersions => inner.supported_versions = None,
304 StateStoreDataKey::WellKnown => inner.well_known = None,
305 StateStoreDataKey::Filter(filter_name) => {
306 inner.filters.remove(filter_name);
307 }
308 StateStoreDataKey::UserAvatarUrl(user_id) => {
309 inner.user_avatar_url.remove(user_id);
310 }
311 StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
312 inner.recently_visited_rooms.remove(user_id);
313 }
314 StateStoreDataKey::UtdHookManagerData => inner.utd_hook_manager_data = None,
315 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
316 inner.one_time_key_uploaded_error = false
317 }
318 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
319 let key = (room_id.to_owned(), thread_root.map(ToOwned::to_owned));
320 inner.composer_drafts.remove(&key);
321 }
322 StateStoreDataKey::SeenKnockRequests(room_id) => {
323 inner.seen_knock_requests.remove(room_id);
324 }
325 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
326 inner.thread_subscriptions_catchup_tokens = None;
327 }
328 StateStoreDataKey::HomeserverCapabilities => inner.homeserver_capabilities = None,
329 }
330 Ok(())
331 }
332
333 #[instrument(skip(self, changes))]
334 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
335 let now = Instant::now();
336
337 let mut inner = self.inner.write().unwrap();
338
339 if let Some(s) = &changes.sync_token {
340 inner.sync_token = Some(s.to_owned());
341 }
342
343 for (room, users) in &changes.profiles_to_delete {
344 let Some(room_profiles) = inner.profiles.get_mut(room) else {
345 continue;
346 };
347 for user in users {
348 room_profiles.remove(user);
349 }
350 }
351
352 for (room, users) in &changes.profiles {
353 for (user_id, profile) in users {
354 inner
355 .profiles
356 .entry(room.clone())
357 .or_default()
358 .insert(user_id.clone(), profile.clone());
359 }
360 }
361
362 for (room, map) in &changes.ambiguity_maps {
363 for (display_name, display_names) in map {
364 inner
365 .display_names
366 .entry(room.clone())
367 .or_default()
368 .insert(display_name.clone(), display_names.clone());
369 }
370 }
371
372 for (event_type, event) in &changes.account_data {
373 inner.account_data.insert(event_type.clone(), event.clone());
374 }
375
376 for (room, events) in &changes.room_account_data {
377 for (event_type, event) in events {
378 inner
379 .room_account_data
380 .entry(room.clone())
381 .or_default()
382 .insert(event_type.clone(), event.clone());
383 }
384 }
385
386 for (room, event_types) in &changes.state {
387 for (event_type, events) in event_types {
388 for (state_key, raw_event) in events {
389 inner
390 .room_state
391 .entry(room.clone())
392 .or_default()
393 .entry(event_type.clone())
394 .or_default()
395 .insert(state_key.to_owned(), raw_event.clone());
396 inner.stripped_room_state.remove(room);
397
398 if *event_type == StateEventType::RoomMember {
399 let event =
400 match raw_event.deserialize_as_unchecked::<SyncRoomMemberEvent>() {
401 Ok(ev) => ev,
402 Err(e) => {
403 let event_id: Option<String> =
404 raw_event.get_field("event_id").ok().flatten();
405 debug!(event_id, "Failed to deserialize member event: {e}");
406 continue;
407 }
408 };
409
410 inner.stripped_members.remove(room);
411
412 inner
413 .members
414 .entry(room.clone())
415 .or_default()
416 .insert(event.state_key().to_owned(), event.membership().clone());
417 }
418 }
419 }
420 }
421
422 for (room_id, info) in &changes.room_infos {
423 inner.room_info.insert(room_id.clone(), info.clone());
424 }
425
426 for (sender, event) in &changes.presence {
427 inner.presence.insert(sender.clone(), event.clone());
428 }
429
430 for (room, event_types) in &changes.stripped_state {
431 for (event_type, events) in event_types {
432 for (state_key, raw_event) in events {
433 inner
434 .stripped_room_state
435 .entry(room.clone())
436 .or_default()
437 .entry(event_type.clone())
438 .or_default()
439 .insert(state_key.to_owned(), raw_event.clone());
440
441 if *event_type == StateEventType::RoomMember {
442 let event =
443 match raw_event.deserialize_as_unchecked::<StrippedRoomMemberEvent>() {
444 Ok(ev) => ev,
445 Err(e) => {
446 let event_id: Option<String> =
447 raw_event.get_field("event_id").ok().flatten();
448 debug!(
449 event_id,
450 "Failed to deserialize stripped member event: {e}"
451 );
452 continue;
453 }
454 };
455
456 inner
457 .stripped_members
458 .entry(room.clone())
459 .or_default()
460 .insert(event.state_key, event.content.membership.clone());
461 }
462 }
463 }
464 }
465
466 for (room, content) in &changes.receipts {
467 for (event_id, receipts) in &content.0 {
468 for (receipt_type, receipts) in receipts {
469 for (user_id, receipt) in receipts {
470 let thread = receipt.thread.as_str().map(ToOwned::to_owned);
471 if let Some((old_event, _)) = inner
473 .room_user_receipts
474 .entry(room.clone())
475 .or_default()
476 .entry((receipt_type.to_string(), thread.clone()))
477 .or_default()
478 .insert(user_id.clone(), (event_id.clone(), receipt.clone()))
479 {
480 if let Some(receipt_map) = inner.room_event_receipts.get_mut(room)
482 && let Some(event_map) =
483 receipt_map.get_mut(&(receipt_type.to_string(), thread.clone()))
484 && let Some(user_map) = event_map.get_mut(&old_event)
485 {
486 user_map.remove(user_id);
487 }
488 }
489
490 inner
492 .room_event_receipts
493 .entry(room.clone())
494 .or_default()
495 .entry((receipt_type.to_string(), thread))
496 .or_default()
497 .entry(event_id.clone())
498 .or_default()
499 .insert(user_id.clone(), receipt.clone());
500 }
501 }
502 }
503 }
504
505 let make_redaction_rules = |room_info: &HashMap<OwnedRoomId, RoomInfo>, room_id| {
506 room_info.get(room_id).map(|info| info.room_version_rules_or_default()).unwrap_or_else(|| {
507 warn!(
508 ?room_id,
509 "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
510 );
511 ROOM_VERSION_RULES_FALLBACK
512 }).redaction
513 };
514
515 let inner = &mut *inner;
516 for (room_id, redactions) in &changes.redactions {
517 let mut redaction_rules = None;
518
519 if let Some(room) = inner.room_state.get_mut(room_id) {
520 for ref_room_mu in room.values_mut() {
521 for raw_evt in ref_room_mu.values_mut() {
522 if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id")
523 && let Some(redaction) = redactions.get(&event_id)
524 {
525 let redacted = redact(
526 raw_evt.deserialize_as::<CanonicalJsonObject>()?,
527 redaction_rules.get_or_insert_with(|| {
528 make_redaction_rules(&inner.room_info, room_id)
529 }),
530 Some(RedactedBecause::from_raw_event(redaction)?),
531 )
532 .map_err(StoreError::Redaction)?;
533 *raw_evt = Raw::new(&redacted)?.cast_unchecked();
534 }
535 }
536 }
537 }
538 }
539
540 debug!("Saved changes in {:?}", now.elapsed());
541
542 Ok(())
543 }
544
545 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
546 Ok(self.inner.read().unwrap().presence.get(user_id).cloned())
547 }
548
549 async fn get_presence_events(
550 &self,
551 user_ids: &[OwnedUserId],
552 ) -> Result<Vec<Raw<PresenceEvent>>> {
553 let presence = &self.inner.read().unwrap().presence;
554 Ok(user_ids.iter().filter_map(|user_id| presence.get(user_id).cloned()).collect())
555 }
556
557 async fn get_state_event(
558 &self,
559 room_id: &RoomId,
560 event_type: StateEventType,
561 state_key: &str,
562 ) -> Result<Option<RawAnySyncOrStrippedState>> {
563 Ok(self
564 .get_state_events_for_keys(room_id, event_type, &[state_key])
565 .await?
566 .into_iter()
567 .next())
568 }
569
570 async fn get_state_events(
571 &self,
572 room_id: &RoomId,
573 event_type: StateEventType,
574 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
575 fn get_events<T>(
576 state_map: &HashMap<OwnedRoomId, HashMap<StateEventType, HashMap<String, Raw<T>>>>,
577 room_id: &RoomId,
578 event_type: &StateEventType,
579 to_enum: fn(Raw<T>) -> RawAnySyncOrStrippedState,
580 ) -> Option<Vec<RawAnySyncOrStrippedState>> {
581 let state_events = state_map.get(room_id)?.get(event_type)?;
582 Some(state_events.values().cloned().map(to_enum).collect())
583 }
584
585 let inner = self.inner.read().unwrap();
586 Ok(get_events(
587 &inner.stripped_room_state,
588 room_id,
589 &event_type,
590 RawAnySyncOrStrippedState::Stripped,
591 )
592 .or_else(|| {
593 get_events(&inner.room_state, room_id, &event_type, RawAnySyncOrStrippedState::Sync)
594 })
595 .unwrap_or_default())
596 }
597
598 async fn get_state_events_for_keys(
599 &self,
600 room_id: &RoomId,
601 event_type: StateEventType,
602 state_keys: &[&str],
603 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
604 let inner = self.inner.read().unwrap();
605
606 if let Some(stripped_state_events) =
607 inner.stripped_room_state.get(room_id).and_then(|events| events.get(&event_type))
608 {
609 Ok(state_keys
610 .iter()
611 .filter_map(|k| {
612 stripped_state_events
613 .get(*k)
614 .map(|e| RawAnySyncOrStrippedState::Stripped(e.clone()))
615 })
616 .collect())
617 } else if let Some(sync_state_events) =
618 inner.room_state.get(room_id).and_then(|events| events.get(&event_type))
619 {
620 Ok(state_keys
621 .iter()
622 .filter_map(|k| {
623 sync_state_events.get(*k).map(|e| RawAnySyncOrStrippedState::Sync(e.clone()))
624 })
625 .collect())
626 } else {
627 Ok(Vec::new())
628 }
629 }
630
631 async fn get_profile(
632 &self,
633 room_id: &RoomId,
634 user_id: &UserId,
635 ) -> Result<Option<MinimalRoomMemberEvent>> {
636 Ok(self
637 .inner
638 .read()
639 .unwrap()
640 .profiles
641 .get(room_id)
642 .and_then(|room_profiles| room_profiles.get(user_id))
643 .cloned())
644 }
645
646 async fn get_profiles<'a>(
647 &self,
648 room_id: &RoomId,
649 user_ids: &'a [OwnedUserId],
650 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
651 if user_ids.is_empty() {
652 return Ok(BTreeMap::new());
653 }
654
655 let profiles = &self.inner.read().unwrap().profiles;
656 let Some(room_profiles) = profiles.get(room_id) else {
657 return Ok(BTreeMap::new());
658 };
659
660 Ok(user_ids
661 .iter()
662 .filter_map(|user_id| room_profiles.get(user_id).map(|p| (&**user_id, p.clone())))
663 .collect())
664 }
665
666 #[instrument(skip(self, memberships))]
667 async fn get_user_ids(
668 &self,
669 room_id: &RoomId,
670 memberships: RoomMemberships,
671 ) -> Result<Vec<OwnedUserId>> {
672 fn get_user_ids_inner(
678 members: &HashMap<OwnedRoomId, HashMap<OwnedUserId, MembershipState>>,
679 room_id: &RoomId,
680 memberships: RoomMemberships,
681 ) -> Vec<OwnedUserId> {
682 members
683 .get(room_id)
684 .map(|members| {
685 members
686 .iter()
687 .filter_map(|(user_id, membership)| {
688 memberships.matches(membership).then_some(user_id)
689 })
690 .cloned()
691 .collect()
692 })
693 .unwrap_or_default()
694 }
695 let inner = self.inner.read().unwrap();
696 let v = get_user_ids_inner(&inner.stripped_members, room_id, memberships);
697 if !v.is_empty() {
698 return Ok(v);
699 }
700 Ok(get_user_ids_inner(&inner.members, room_id, memberships))
701 }
702
703 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
704 let memory_store_inner = self.inner.read().unwrap();
705 let room_infos = &memory_store_inner.room_info;
706
707 Ok(match room_load_settings {
708 RoomLoadSettings::All => room_infos.values().cloned().collect(),
709
710 RoomLoadSettings::One(room_id) => match room_infos.get(room_id) {
711 Some(room_info) => vec![room_info.clone()],
712 None => vec![],
713 },
714 })
715 }
716
717 async fn get_users_with_display_name(
718 &self,
719 room_id: &RoomId,
720 display_name: &DisplayName,
721 ) -> Result<BTreeSet<OwnedUserId>> {
722 Ok(self
723 .inner
724 .read()
725 .unwrap()
726 .display_names
727 .get(room_id)
728 .and_then(|room_names| room_names.get(display_name).cloned())
729 .unwrap_or_default())
730 }
731
732 async fn get_users_with_display_names<'a>(
733 &self,
734 room_id: &RoomId,
735 display_names: &'a [DisplayName],
736 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
737 if display_names.is_empty() {
738 return Ok(HashMap::new());
739 }
740
741 let inner = self.inner.read().unwrap();
742 let Some(room_names) = inner.display_names.get(room_id) else {
743 return Ok(HashMap::new());
744 };
745
746 Ok(display_names.iter().filter_map(|n| room_names.get(n).map(|d| (n, d.clone()))).collect())
747 }
748
749 async fn get_account_data_event(
750 &self,
751 event_type: GlobalAccountDataEventType,
752 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
753 Ok(self.inner.read().unwrap().account_data.get(&event_type).cloned())
754 }
755
756 async fn get_room_account_data_event(
757 &self,
758 room_id: &RoomId,
759 event_type: RoomAccountDataEventType,
760 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
761 Ok(self
762 .inner
763 .read()
764 .unwrap()
765 .room_account_data
766 .get(room_id)
767 .and_then(|m| m.get(&event_type))
768 .cloned())
769 }
770
771 async fn get_user_room_receipt_event(
772 &self,
773 room_id: &RoomId,
774 receipt_type: ReceiptType,
775 thread: ReceiptThread,
776 user_id: &UserId,
777 ) -> Result<Option<(OwnedEventId, Receipt)>> {
778 Ok(self.get_user_room_receipt_event_impl(room_id, receipt_type, thread, user_id))
779 }
780
781 async fn get_event_room_receipt_events(
782 &self,
783 room_id: &RoomId,
784 receipt_type: ReceiptType,
785 thread: ReceiptThread,
786 event_id: &EventId,
787 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
788 Ok(self
789 .get_event_room_receipt_events_impl(room_id, receipt_type, thread, event_id)
790 .unwrap_or_default())
791 }
792
793 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
794 Ok(self.inner.read().unwrap().custom.get(key).cloned())
795 }
796
797 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
798 Ok(self.inner.write().unwrap().custom.insert(key.to_vec(), value))
799 }
800
801 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
802 Ok(self.inner.write().unwrap().custom.remove(key))
803 }
804
805 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
806 let mut inner = self.inner.write().unwrap();
807
808 inner.profiles.remove(room_id);
809 inner.display_names.remove(room_id);
810 inner.members.remove(room_id);
811 inner.room_info.remove(room_id);
812 inner.room_state.remove(room_id);
813 inner.room_account_data.remove(room_id);
814 inner.stripped_room_state.remove(room_id);
815 inner.stripped_members.remove(room_id);
816 inner.room_user_receipts.remove(room_id);
817 inner.room_event_receipts.remove(room_id);
818 inner.send_queue_events.remove(room_id);
819 inner.dependent_send_queue_events.remove(room_id);
820 inner.thread_subscriptions.remove(room_id);
821
822 Ok(())
823 }
824
825 async fn save_send_queue_request(
826 &self,
827 room_id: &RoomId,
828 transaction_id: OwnedTransactionId,
829 created_at: MilliSecondsSinceUnixEpoch,
830 kind: QueuedRequestKind,
831 priority: usize,
832 ) -> Result<(), Self::Error> {
833 self.inner
834 .write()
835 .unwrap()
836 .send_queue_events
837 .entry(room_id.to_owned())
838 .or_default()
839 .push(QueuedRequest { kind, transaction_id, error: None, priority, created_at });
840 Ok(())
841 }
842
843 async fn update_send_queue_request(
844 &self,
845 room_id: &RoomId,
846 transaction_id: &TransactionId,
847 kind: QueuedRequestKind,
848 ) -> Result<bool, Self::Error> {
849 if let Some(entry) = self
850 .inner
851 .write()
852 .unwrap()
853 .send_queue_events
854 .entry(room_id.to_owned())
855 .or_default()
856 .iter_mut()
857 .find(|item| item.transaction_id == transaction_id)
858 {
859 entry.kind = kind;
860 entry.error = None;
861 Ok(true)
862 } else {
863 Ok(false)
864 }
865 }
866
867 async fn remove_send_queue_request(
868 &self,
869 room_id: &RoomId,
870 transaction_id: &TransactionId,
871 ) -> Result<bool, Self::Error> {
872 let mut inner = self.inner.write().unwrap();
873 let q = &mut inner.send_queue_events;
874
875 let entry = q.get_mut(room_id);
876 if let Some(entry) = entry {
877 if let Some(pos) = entry.iter().position(|item| item.transaction_id == transaction_id) {
879 entry.remove(pos);
880 if entry.is_empty() {
882 q.remove(room_id);
883 }
884 return Ok(true);
885 }
886 }
887
888 Ok(false)
889 }
890
891 async fn load_send_queue_requests(
892 &self,
893 room_id: &RoomId,
894 ) -> Result<Vec<QueuedRequest>, Self::Error> {
895 let mut ret = self
896 .inner
897 .write()
898 .unwrap()
899 .send_queue_events
900 .entry(room_id.to_owned())
901 .or_default()
902 .clone();
903 ret.sort_by_key(|item| Reverse(item.priority));
905 Ok(ret)
906 }
907
908 async fn update_send_queue_request_status(
909 &self,
910 room_id: &RoomId,
911 transaction_id: &TransactionId,
912 error: Option<QueueWedgeError>,
913 ) -> Result<(), Self::Error> {
914 if let Some(entry) = self
915 .inner
916 .write()
917 .unwrap()
918 .send_queue_events
919 .entry(room_id.to_owned())
920 .or_default()
921 .iter_mut()
922 .find(|item| item.transaction_id == transaction_id)
923 {
924 entry.error = error;
925 }
926 Ok(())
927 }
928
929 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
930 Ok(self.inner.read().unwrap().send_queue_events.keys().cloned().collect())
931 }
932
933 async fn save_dependent_queued_request(
934 &self,
935 room: &RoomId,
936 parent_transaction_id: &TransactionId,
937 own_transaction_id: ChildTransactionId,
938 created_at: MilliSecondsSinceUnixEpoch,
939 content: DependentQueuedRequestKind,
940 ) -> Result<(), Self::Error> {
941 self.inner
942 .write()
943 .unwrap()
944 .dependent_send_queue_events
945 .entry(room.to_owned())
946 .or_default()
947 .push(DependentQueuedRequest {
948 kind: content,
949 parent_transaction_id: parent_transaction_id.to_owned(),
950 own_transaction_id,
951 parent_key: None,
952 created_at,
953 });
954 Ok(())
955 }
956
957 async fn mark_dependent_queued_requests_as_ready(
958 &self,
959 room: &RoomId,
960 parent_txn_id: &TransactionId,
961 sent_parent_key: SentRequestKey,
962 ) -> Result<usize, Self::Error> {
963 let mut inner = self.inner.write().unwrap();
964 let dependents = inner.dependent_send_queue_events.entry(room.to_owned()).or_default();
965 let mut num_updated = 0;
966 for d in dependents.iter_mut().filter(|item| item.parent_transaction_id == parent_txn_id) {
967 d.parent_key = Some(sent_parent_key.clone());
968 num_updated += 1;
969 }
970 Ok(num_updated)
971 }
972
973 async fn update_dependent_queued_request(
974 &self,
975 room: &RoomId,
976 own_transaction_id: &ChildTransactionId,
977 new_content: DependentQueuedRequestKind,
978 ) -> Result<bool, Self::Error> {
979 let mut inner = self.inner.write().unwrap();
980 let dependents = inner.dependent_send_queue_events.entry(room.to_owned()).or_default();
981 for d in dependents.iter_mut() {
982 if d.own_transaction_id == *own_transaction_id {
983 d.kind = new_content;
984 return Ok(true);
985 }
986 }
987 Ok(false)
988 }
989
990 async fn remove_dependent_queued_request(
991 &self,
992 room: &RoomId,
993 txn_id: &ChildTransactionId,
994 ) -> Result<bool, Self::Error> {
995 let mut inner = self.inner.write().unwrap();
996 let dependents = inner.dependent_send_queue_events.entry(room.to_owned()).or_default();
997 if let Some(pos) = dependents.iter().position(|item| item.own_transaction_id == *txn_id) {
998 dependents.remove(pos);
999 Ok(true)
1000 } else {
1001 Ok(false)
1002 }
1003 }
1004
1005 async fn load_dependent_queued_requests(
1006 &self,
1007 room: &RoomId,
1008 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1009 Ok(self
1010 .inner
1011 .read()
1012 .unwrap()
1013 .dependent_send_queue_events
1014 .get(room)
1015 .cloned()
1016 .unwrap_or_default())
1017 }
1018
1019 async fn upsert_thread_subscriptions(
1020 &self,
1021 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1022 ) -> Result<(), Self::Error> {
1023 let mut inner = self.inner.write().unwrap();
1024
1025 for (room_id, thread_id, mut new) in updates {
1026 let room_subs = inner.thread_subscriptions.entry(room_id.to_owned()).or_default();
1027
1028 if let Some(previous) = room_subs.get(thread_id) {
1029 if *previous == new {
1030 continue;
1031 }
1032 if !compare_thread_subscription_bump_stamps(
1033 previous.bump_stamp,
1034 &mut new.bump_stamp,
1035 ) {
1036 continue;
1037 }
1038 }
1039
1040 room_subs.insert(thread_id.to_owned(), new);
1041 }
1042
1043 Ok(())
1044 }
1045
1046 async fn load_thread_subscription(
1047 &self,
1048 room: &RoomId,
1049 thread_id: &EventId,
1050 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1051 let inner = self.inner.read().unwrap();
1052 Ok(inner
1053 .thread_subscriptions
1054 .get(room)
1055 .and_then(|subscriptions| subscriptions.get(thread_id))
1056 .copied())
1057 }
1058
1059 async fn remove_thread_subscription(
1060 &self,
1061 room: &RoomId,
1062 thread_id: &EventId,
1063 ) -> Result<(), Self::Error> {
1064 let mut inner = self.inner.write().unwrap();
1065
1066 let Some(room_subs) = inner.thread_subscriptions.get_mut(room) else {
1067 return Ok(());
1068 };
1069
1070 room_subs.remove(thread_id);
1071
1072 if room_subs.is_empty() {
1073 inner.thread_subscriptions.remove(room);
1075 }
1076
1077 Ok(())
1078 }
1079
1080 async fn optimize(&self) -> Result<(), Self::Error> {
1081 Ok(())
1082 }
1083
1084 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1085 Ok(None)
1086 }
1087}
1088
1089#[cfg(test)]
1090mod tests {
1091 use super::{MemoryStore, Result, StateStore};
1092
1093 async fn get_store() -> Result<impl StateStore> {
1094 Ok(MemoryStore::new())
1095 }
1096
1097 statestore_integration_tests!();
1098}