matrix_sdk_base/event_cache/store/
memory_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    num::NonZeroUsize,
18    sync::{Arc, RwLock as StdRwLock},
19};
20
21use async_trait::async_trait;
22use matrix_sdk_common::{
23    linked_chunk::{
24        relational::RelationalLinkedChunk, ChunkIdentifier, ChunkIdentifierGenerator, Position,
25        RawChunk, Update,
26    },
27    ring_buffer::RingBuffer,
28    store_locks::memory_store_helper::try_take_leased_lock,
29};
30use ruma::{
31    time::{Instant, SystemTime},
32    EventId, MxcUri, OwnedEventId, OwnedMxcUri, RoomId,
33};
34
35use super::{
36    media::{EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService},
37    EventCacheStore, EventCacheStoreError, Result,
38};
39use crate::{
40    event_cache::{Event, Gap},
41    media::{MediaRequestParameters, UniqueKey as _},
42};
43
/// In-memory, non-persistent implementation of the `EventCacheStore`.
///
/// Default if no other is configured at startup.
///
/// The store derives `Clone`; the state held in `inner` sits behind an `Arc`,
/// so clones of a store share that state.
#[derive(Debug, Clone)]
pub struct MemoryStore {
    /// The mutable state of the store (media, leases, events, media
    /// bookkeeping), guarded by a standard-library read/write lock.
    inner: Arc<StdRwLock<MemoryStoreInner>>,
    /// The service the media-related `EventCacheStore` methods delegate to;
    /// it is handed this store whenever it is called.
    media_service: MediaService,
}
52
/// The state of a [`MemoryStore`], always accessed through the store's lock.
#[derive(Debug)]
struct MemoryStoreInner {
    /// The cached media contents, created with a capacity of
    /// `NUMBER_OF_MEDIAS`. Contents that are read are removed and pushed again
    /// so that they end up at the back of the buffer.
    media: RingBuffer<MediaContent>,
    /// Bookkeeping for `try_take_leased_lock`; only ever touched through the
    /// shared in-memory lock helper.
    // NOTE(review): presumably lock key -> (holder, lease expiry) — confirm
    // against `memory_store_helper::try_take_leased_lock`.
    leases: HashMap<String, (String, Instant)>,
    /// The linked chunks of events and gaps, updated and queried per room.
    events: RelationalLinkedChunk<Event, Gap>,
    /// The media retention policy stored by `set_media_retention_policy_inner`;
    /// `None` until one has been set.
    media_retention_policy: Option<MediaRetentionPolicy>,
    /// The time of the last media cache cleanup; initialised to the creation
    /// time of the store and refreshed by `clean_up_media_cache_inner`.
    last_media_cleanup_time: SystemTime,
}
61
/// A media content in the `MemoryStore`.
#[derive(Debug)]
struct MediaContent {
    /// The URI of the content.
    ///
    /// Used for the `*_for_uri` lookups and removals.
    uri: OwnedMxcUri,

    /// The unique key of the content.
    ///
    /// This is the value of `MediaRequestParameters::unique_key()` for the
    /// request the content was stored (or re-keyed) with.
    key: String,

    /// The bytes of the content.
    data: Vec<u8>,

    /// Whether we should ignore the [`MediaRetentionPolicy`] for this content.
    ///
    /// Contents with this flag set are never removed by the cache cleanup and
    /// are not counted towards the maximum cache size.
    ignore_policy: bool,

    /// The time of the last access of the content.
    ///
    /// Set when the content is added and updated every time it is read.
    last_access: SystemTime,
}
80
/// The capacity the media `RingBuffer` of a new `MemoryStore` is created
/// with.
// NOTE(review): presumably the maximum number of media contents held at once —
// confirm against `RingBuffer`'s behaviour when full.
const NUMBER_OF_MEDIAS: NonZeroUsize = NonZeroUsize::new(20).unwrap();
82
83impl Default for MemoryStore {
84    fn default() -> Self {
85        // Given that the store is empty, we won't need to clean it up right away.
86        let last_media_cleanup_time = SystemTime::now();
87        let media_service = MediaService::new();
88        media_service.restore(None, Some(last_media_cleanup_time));
89
90        Self {
91            inner: Arc::new(StdRwLock::new(MemoryStoreInner {
92                media: RingBuffer::new(NUMBER_OF_MEDIAS),
93                leases: Default::default(),
94                events: RelationalLinkedChunk::new(),
95                media_retention_policy: None,
96                last_media_cleanup_time,
97            })),
98            media_service,
99        }
100    }
101}
102
103impl MemoryStore {
104    /// Create a new empty MemoryStore
105    pub fn new() -> Self {
106        Self::default()
107    }
108}
109
110#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
111#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
112impl EventCacheStore for MemoryStore {
113    type Error = EventCacheStoreError;
114
115    async fn try_take_leased_lock(
116        &self,
117        lease_duration_ms: u32,
118        key: &str,
119        holder: &str,
120    ) -> Result<bool, Self::Error> {
121        let mut inner = self.inner.write().unwrap();
122
123        Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
124    }
125
126    async fn handle_linked_chunk_updates(
127        &self,
128        room_id: &RoomId,
129        updates: Vec<Update<Event, Gap>>,
130    ) -> Result<(), Self::Error> {
131        let mut inner = self.inner.write().unwrap();
132        inner.events.apply_updates(room_id, updates);
133
134        Ok(())
135    }
136
137    async fn load_all_chunks(
138        &self,
139        room_id: &RoomId,
140    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
141        let inner = self.inner.read().unwrap();
142        inner
143            .events
144            .load_all_chunks(room_id)
145            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
146    }
147
148    async fn load_last_chunk(
149        &self,
150        room_id: &RoomId,
151    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
152        let inner = self.inner.read().unwrap();
153        inner
154            .events
155            .load_last_chunk(room_id)
156            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
157    }
158
159    async fn load_previous_chunk(
160        &self,
161        room_id: &RoomId,
162        before_chunk_identifier: ChunkIdentifier,
163    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
164        let inner = self.inner.read().unwrap();
165        inner
166            .events
167            .load_previous_chunk(room_id, before_chunk_identifier)
168            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
169    }
170
171    async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error> {
172        self.inner.write().unwrap().events.clear();
173        Ok(())
174    }
175
176    async fn filter_duplicated_events(
177        &self,
178        room_id: &RoomId,
179        mut events: Vec<OwnedEventId>,
180    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
181        // Collect all duplicated events.
182        let inner = self.inner.read().unwrap();
183
184        let mut duplicated_events = Vec::new();
185
186        for (event, position) in inner.events.unordered_room_items(room_id) {
187            // If `events` is empty, we can short-circuit.
188            if events.is_empty() {
189                break;
190            }
191
192            if let Some(known_event_id) = event.event_id() {
193                // This event is a duplicate!
194                if let Some(index) =
195                    events.iter().position(|new_event_id| &known_event_id == new_event_id)
196                {
197                    duplicated_events.push((events.remove(index), position));
198                }
199            }
200        }
201
202        Ok(duplicated_events)
203    }
204
205    async fn find_event(
206        &self,
207        room_id: &RoomId,
208        event_id: &EventId,
209    ) -> Result<Option<(Position, Event)>, Self::Error> {
210        let inner = self.inner.read().unwrap();
211
212        let event_and_room = inner.events.items().find_map(|(position, event, this_room_id)| {
213            (room_id == this_room_id && event.event_id()? == event_id)
214                .then_some((position, event.clone()))
215        });
216
217        Ok(event_and_room)
218    }
219
220    async fn add_media_content(
221        &self,
222        request: &MediaRequestParameters,
223        data: Vec<u8>,
224        ignore_policy: IgnoreMediaRetentionPolicy,
225    ) -> Result<()> {
226        self.media_service.add_media_content(self, request, data, ignore_policy).await
227    }
228
229    async fn replace_media_key(
230        &self,
231        from: &MediaRequestParameters,
232        to: &MediaRequestParameters,
233    ) -> Result<(), Self::Error> {
234        let expected_key = from.unique_key();
235
236        let mut inner = self.inner.write().unwrap();
237
238        if let Some(media_content) =
239            inner.media.iter_mut().find(|media_content| media_content.key == expected_key)
240        {
241            media_content.uri = to.uri().to_owned();
242            media_content.key = to.unique_key();
243        }
244
245        Ok(())
246    }
247
248    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
249        self.media_service.get_media_content(self, request).await
250    }
251
252    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
253        let expected_key = request.unique_key();
254
255        let mut inner = self.inner.write().unwrap();
256
257        let Some(index) =
258            inner.media.iter().position(|media_content| media_content.key == expected_key)
259        else {
260            return Ok(());
261        };
262
263        inner.media.remove(index);
264
265        Ok(())
266    }
267
268    async fn get_media_content_for_uri(
269        &self,
270        uri: &MxcUri,
271    ) -> Result<Option<Vec<u8>>, Self::Error> {
272        self.media_service.get_media_content_for_uri(self, uri).await
273    }
274
275    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
276        let mut inner = self.inner.write().unwrap();
277
278        let positions = inner
279            .media
280            .iter()
281            .enumerate()
282            .filter_map(|(position, media_content)| (media_content.uri == uri).then_some(position))
283            .collect::<Vec<_>>();
284
285        // Iterate in reverse-order so that positions stay valid after first removals.
286        for position in positions.into_iter().rev() {
287            inner.media.remove(position);
288        }
289
290        Ok(())
291    }
292
293    async fn set_media_retention_policy(
294        &self,
295        policy: MediaRetentionPolicy,
296    ) -> Result<(), Self::Error> {
297        self.media_service.set_media_retention_policy(self, policy).await
298    }
299
300    fn media_retention_policy(&self) -> MediaRetentionPolicy {
301        self.media_service.media_retention_policy()
302    }
303
304    async fn set_ignore_media_retention_policy(
305        &self,
306        request: &MediaRequestParameters,
307        ignore_policy: IgnoreMediaRetentionPolicy,
308    ) -> Result<(), Self::Error> {
309        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
310    }
311
312    async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
313        self.media_service.clean_up_media_cache(self).await
314    }
315}
316
317#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
318#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
319impl EventCacheStoreMedia for MemoryStore {
320    type Error = EventCacheStoreError;
321
322    async fn media_retention_policy_inner(
323        &self,
324    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
325        Ok(self.inner.read().unwrap().media_retention_policy)
326    }
327
328    async fn set_media_retention_policy_inner(
329        &self,
330        policy: MediaRetentionPolicy,
331    ) -> Result<(), Self::Error> {
332        self.inner.write().unwrap().media_retention_policy = Some(policy);
333        Ok(())
334    }
335
336    async fn add_media_content_inner(
337        &self,
338        request: &MediaRequestParameters,
339        data: Vec<u8>,
340        last_access: SystemTime,
341        policy: MediaRetentionPolicy,
342        ignore_policy: IgnoreMediaRetentionPolicy,
343    ) -> Result<(), Self::Error> {
344        // Avoid duplication. Let's try to remove it first.
345        self.remove_media_content(request).await?;
346
347        let ignore_policy = ignore_policy.is_yes();
348
349        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
350            // Do not store it.
351            return Ok(());
352        };
353
354        // Now, let's add it.
355        let mut inner = self.inner.write().unwrap();
356        inner.media.push(MediaContent {
357            uri: request.uri().to_owned(),
358            key: request.unique_key(),
359            data,
360            ignore_policy,
361            last_access,
362        });
363
364        Ok(())
365    }
366
367    async fn set_ignore_media_retention_policy_inner(
368        &self,
369        request: &MediaRequestParameters,
370        ignore_policy: IgnoreMediaRetentionPolicy,
371    ) -> Result<(), Self::Error> {
372        let mut inner = self.inner.write().unwrap();
373        let expected_key = request.unique_key();
374
375        if let Some(media_content) = inner.media.iter_mut().find(|media| media.key == expected_key)
376        {
377            media_content.ignore_policy = ignore_policy.is_yes();
378        }
379
380        Ok(())
381    }
382
383    async fn get_media_content_inner(
384        &self,
385        request: &MediaRequestParameters,
386        current_time: SystemTime,
387    ) -> Result<Option<Vec<u8>>, Self::Error> {
388        let mut inner = self.inner.write().unwrap();
389        let expected_key = request.unique_key();
390
391        // First get the content out of the buffer, we are going to put it back at the
392        // end.
393        let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
394            return Ok(None);
395        };
396        let Some(mut content) = inner.media.remove(index) else {
397            return Ok(None);
398        };
399
400        // Clone the data.
401        let data = content.data.clone();
402
403        // Update the last access time.
404        content.last_access = current_time;
405
406        // Put it back in the buffer.
407        inner.media.push(content);
408
409        Ok(Some(data))
410    }
411
412    async fn get_media_content_for_uri_inner(
413        &self,
414        expected_uri: &MxcUri,
415        current_time: SystemTime,
416    ) -> Result<Option<Vec<u8>>, Self::Error> {
417        let mut inner = self.inner.write().unwrap();
418
419        // First get the content out of the buffer, we are going to put it back at the
420        // end.
421        let Some(index) = inner.media.iter().position(|media| media.uri == expected_uri) else {
422            return Ok(None);
423        };
424        let Some(mut content) = inner.media.remove(index) else {
425            return Ok(None);
426        };
427
428        // Clone the data.
429        let data = content.data.clone();
430
431        // Update the last access time.
432        content.last_access = current_time;
433
434        // Put it back in the buffer.
435        inner.media.push(content);
436
437        Ok(Some(data))
438    }
439
440    async fn clean_up_media_cache_inner(
441        &self,
442        policy: MediaRetentionPolicy,
443        current_time: SystemTime,
444    ) -> Result<(), Self::Error> {
445        if !policy.has_limitations() {
446            // We can safely skip all the checks.
447            return Ok(());
448        }
449
450        let mut inner = self.inner.write().unwrap();
451
452        // First, check media content that exceed the max filesize.
453        if policy.computed_max_file_size().is_some() {
454            inner.media.retain(|content| {
455                content.ignore_policy || !policy.exceeds_max_file_size(content.data.len() as u64)
456            });
457        }
458
459        // Then, clean up expired media content.
460        if policy.last_access_expiry.is_some() {
461            inner.media.retain(|content| {
462                content.ignore_policy
463                    || !policy.has_content_expired(current_time, content.last_access)
464            });
465        }
466
467        // Finally, if the cache size is too big, remove old items until it fits.
468        if let Some(max_cache_size) = policy.max_cache_size {
469            // Reverse the iterator because in case the cache size is overflowing, we want
470            // to count the number of old items to remove. Items are sorted by last access
471            // and old items are at the start.
472            let (_, items_to_remove) = inner.media.iter().enumerate().rev().fold(
473                (0u64, Vec::with_capacity(NUMBER_OF_MEDIAS.into())),
474                |(mut cache_size, mut items_to_remove), (index, content)| {
475                    if content.ignore_policy {
476                        // Do not count it.
477                        return (cache_size, items_to_remove);
478                    }
479
480                    let remove_item = if items_to_remove.is_empty() {
481                        // We have not reached the max cache size yet.
482                        if let Some(sum) = cache_size.checked_add(content.data.len() as u64) {
483                            cache_size = sum;
484                            // Start removing items if we have exceeded the max cache size.
485                            cache_size > max_cache_size
486                        } else {
487                            // The cache size is overflowing, remove the remaining items, since the
488                            // max cache size cannot be bigger than
489                            // usize::MAX.
490                            true
491                        }
492                    } else {
493                        // We have reached the max cache size already, just remove it.
494                        true
495                    };
496
497                    if remove_item {
498                        items_to_remove.push(index);
499                    }
500
501                    (cache_size, items_to_remove)
502                },
503            );
504
505            // The indexes are already in reverse order so we can just iterate in that order
506            // to remove them starting by the end.
507            for index in items_to_remove {
508                inner.media.remove(index);
509            }
510        }
511
512        inner.last_media_cleanup_time = current_time;
513
514        Ok(())
515    }
516
517    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
518        Ok(Some(self.inner.read().unwrap().last_media_cleanup_time))
519    }
520}
521
// Runs the shared event cache store test suites against `MemoryStore`.
#[cfg(test)]
mod tests {
    use super::{MemoryStore, Result};
    use crate::event_cache_store_media_integration_tests;

    // Builds the store under test.
    // NOTE(review): presumably looked up by name by the test macros invoked
    // below — confirm against the macro definitions.
    async fn get_event_cache_store() -> Result<MemoryStore> {
        Ok(MemoryStore::new())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!(with_media_size_tests);
}