// matrix_sdk/search_index/mod.rs

// Copyright 2025 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
//! The search index is the matrix-sdk abstraction layer over the
//! matrix-sdk-search crate. It provides a [`SearchIndex`], which wraps one
//! [`RoomIndex`] per room.
18
19use std::{collections::hash_map::HashMap, path::PathBuf, sync::Arc};
20
21use futures_util::future::join_all;
22use matrix_sdk_base::deserialized_responses::TimelineEvent;
23use matrix_sdk_search::{
24    error::IndexError,
25    index::{RoomIndex, RoomIndexOperation, builder::RoomIndexBuilder},
26};
27use ruma::{
28    EventId, OwnedEventId, OwnedRoomId, RoomId,
29    events::{
30        AnySyncMessageLikeEvent, AnySyncTimelineEvent,
31        room::{
32            message::{OriginalSyncRoomMessageEvent, Relation, SyncRoomMessageEvent},
33            redaction::SyncRoomRedactionEvent,
34        },
35    },
36    room_version_rules::RedactionRules,
37};
38use tokio::sync::{Mutex, MutexGuard};
39use tracing::{debug, warn};
40
41use crate::event_cache::RoomEventCache;
42
/// Password used to encrypt an on-disk index
/// ([`SearchIndexStoreKind::EncryptedDirectory`]).
type Password = String;

/// Where, and how, [`RoomIndex`]es are stored.
///
/// `Debug` is implemented by hand so that the password of
/// [`SearchIndexStoreKind::EncryptedDirectory`] never ends up in logs or
/// panic messages.
#[derive(Clone)]
pub enum SearchIndexStoreKind {
    /// Store unencrypted in file system folder
    UnencryptedDirectory(PathBuf),
    /// Store encrypted in file system folder
    EncryptedDirectory(PathBuf, Password),
    /// Store in memory
    InMemory,
}

impl std::fmt::Debug for SearchIndexStoreKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnencryptedDirectory(path) => {
                f.debug_tuple("UnencryptedDirectory").field(path).finish()
            }
            // The secret is replaced by a placeholder: this type is embedded in
            // other `Debug` types, so a derived impl would leak it.
            Self::EncryptedDirectory(path, _password) => f
                .debug_tuple("EncryptedDirectory")
                .field(path)
                .field(&format_args!("<redacted>"))
                .finish(),
            Self::InMemory => f.write_str("InMemory"),
        }
    }
}
55
56/// Object that handles inteeraction with [`RoomIndex`]'s for search
57#[derive(Clone, Debug)]
58pub struct SearchIndex {
59    /// HashMap that links each joined room to its RoomIndex
60    room_indexes: Arc<Mutex<HashMap<OwnedRoomId, RoomIndex>>>,
61
62    /// Base directory that stores the directories for each RoomIndex
63    search_index_store_kind: SearchIndexStoreKind,
64}
65
66impl SearchIndex {
67    /// Create a new [`SearchIndex`]
68    pub fn new(
69        room_indexes: Arc<Mutex<HashMap<OwnedRoomId, RoomIndex>>>,
70        search_index_store_kind: SearchIndexStoreKind,
71    ) -> Self {
72        Self { room_indexes, search_index_store_kind }
73    }
74
75    /// Acquire [`SearchIndexGuard`] for this [`SearchIndex`].
76    pub async fn lock(&self) -> SearchIndexGuard<'_> {
77        SearchIndexGuard {
78            index_map: self.room_indexes.lock().await,
79            search_index_store_kind: &self.search_index_store_kind,
80        }
81    }
82}
83
84/// Object that represents an acquired [`SearchIndex`].
85#[derive(Debug)]
86pub struct SearchIndexGuard<'a> {
87    /// Guard around the [`RoomIndex`] map
88    index_map: MutexGuard<'a, HashMap<OwnedRoomId, RoomIndex>>,
89
90    /// Base directory that stores the directories for each RoomIndex
91    search_index_store_kind: &'a SearchIndexStoreKind,
92}
93
94impl SearchIndexGuard<'_> {
95    fn create_index(&self, room_id: &RoomId) -> Result<RoomIndex, IndexError> {
96        let index = match self.search_index_store_kind {
97            SearchIndexStoreKind::UnencryptedDirectory(path) => {
98                RoomIndexBuilder::new_on_disk(path.to_path_buf(), room_id).unencrypted().build()?
99            }
100            SearchIndexStoreKind::EncryptedDirectory(path, password) => {
101                RoomIndexBuilder::new_on_disk(path.to_path_buf(), room_id)
102                    .encrypted(password)
103                    .build()?
104            }
105            SearchIndexStoreKind::InMemory => RoomIndexBuilder::new_in_memory(room_id).build(),
106        };
107        Ok(index)
108    }
109
110    /// Handle a [`RoomIndexOperation`] in the [`RoomIndex`] of a given
111    /// [`RoomId`]
112    ///
113    /// This which will add/remove/edit an event in the index based on the
114    /// event type.
115    ///
116    /// Prefer [`SearchIndexGuard::bulk_execute`] for multiple operations.
117    pub(crate) fn execute(
118        &mut self,
119        operation: RoomIndexOperation,
120        room_id: &RoomId,
121    ) -> Result<(), IndexError> {
122        if !self.index_map.contains_key(room_id) {
123            let index = self.create_index(room_id)?;
124            self.index_map.insert(room_id.to_owned(), index);
125        }
126
127        let index = self.index_map.get_mut(room_id).expect("index should exist");
128
129        index.execute(operation)
130    }
131
132    /// Handle a [`RoomIndexOperation`] in the [`RoomIndex`] of a given
133    /// [`RoomId`]
134    ///
135    /// This which will add/remove/edit an event in the index based on the
136    /// event type.
137    pub(crate) fn bulk_execute(
138        &mut self,
139        operations: Vec<RoomIndexOperation>,
140        room_id: &RoomId,
141    ) -> Result<(), IndexError> {
142        if !self.index_map.contains_key(room_id) {
143            let index = self.create_index(room_id)?;
144            self.index_map.insert(room_id.to_owned(), index);
145        }
146
147        let index = self.index_map.get_mut(room_id).expect("index should exist");
148
149        index.bulk_execute(operations)
150    }
151
152    /// Search a [`Room`]'s index for the query and return at most
153    /// max_number_of_results results.
154    pub(crate) fn search(
155        &mut self,
156        query: &str,
157        max_number_of_results: usize,
158        pagination_offset: Option<usize>,
159        room_id: &RoomId,
160    ) -> Result<Vec<OwnedEventId>, IndexError> {
161        if !self.index_map.contains_key(room_id) {
162            let index = self.create_index(room_id)?;
163            self.index_map.insert(room_id.to_owned(), index);
164        }
165
166        let index = self.index_map.get_mut(room_id).expect("index should exist");
167
168        index.search(query, max_number_of_results, pagination_offset)
169    }
170
171    /// Given a [`TimelineEvent`] this function will derive a
172    /// [`RoomIndexOperation`], if it should be handled, and execute it;
173    /// returning the result.
174    ///
175    /// Prefer [`SearchIndexGuard::bulk_handle_timeline_event`] for multiple
176    /// events.
177    pub async fn handle_timeline_event(
178        &mut self,
179        event: TimelineEvent,
180        room_cache: &RoomEventCache,
181        room_id: &RoomId,
182        redaction_rules: &RedactionRules,
183    ) -> Result<(), IndexError> {
184        if let Some(index_operation) =
185            parse_timeline_event(room_cache, event, redaction_rules).await
186        {
187            self.execute(index_operation, room_id)
188        } else {
189            Ok(())
190        }
191    }
192
193    /// Run [`SearchIndexGuard::handle_timeline_event`] for multiple
194    /// [`TimelineEvent`].
195    pub async fn bulk_handle_timeline_event<T>(
196        &mut self,
197        events: T,
198        room_cache: &RoomEventCache,
199        room_id: &RoomId,
200        redaction_rules: &RedactionRules,
201    ) -> Result<(), IndexError>
202    where
203        T: Iterator<Item = TimelineEvent>,
204    {
205        let futures = events.map(|ev| parse_timeline_event(room_cache, ev, redaction_rules));
206
207        let operations: Vec<_> = join_all(futures).await.into_iter().flatten().collect();
208
209        self.bulk_execute(operations, room_id)
210    }
211}
212
213/// Given an event id this function returns the most recent edit on said event
214/// or the event itself if there are no edits.
215async fn get_most_recent_edit(
216    cache: &RoomEventCache,
217    original: &EventId,
218) -> Option<OriginalSyncRoomMessageEvent> {
219    use ruma::events::{AnySyncTimelineEvent, relation::RelationType};
220
221    let Some((original_ev, related)) =
222        cache.find_event_with_relations(original, Some(vec![RelationType::Replacement])).await
223    else {
224        debug!("Couldn't find relations for {}", original);
225        return None;
226    };
227
228    match related.last().unwrap_or(&original_ev).raw().deserialize() {
229        Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(latest))) => {
230            latest.as_original().cloned()
231        }
232        _ => None,
233    }
234}
235
236/// If the given [`OriginalSyncRoomMessageEvent`] is an edit we make an
237/// [`RoomIndexOperation::Edit`] with the new most recent version of the
238/// original.
239async fn handle_possible_edit(
240    event: &OriginalSyncRoomMessageEvent,
241    cache: &RoomEventCache,
242) -> Option<RoomIndexOperation> {
243    if let Some(Relation::Replacement(replacement_data)) = &event.content.relates_to {
244        if let Some(recent) = get_most_recent_edit(cache, &replacement_data.event_id).await {
245            return Some(RoomIndexOperation::Edit(replacement_data.event_id.clone(), recent));
246        } else {
247            return Some(RoomIndexOperation::Noop);
248        }
249    }
250    None
251}
252
253/// Return a [`RoomIndexOperation::Edit`] or [`RoomIndexOperation::Add`]
254/// depending on the message.
255async fn handle_room_message(
256    event: SyncRoomMessageEvent,
257    cache: &RoomEventCache,
258) -> Option<RoomIndexOperation> {
259    if let Some(event) = event.as_original() {
260        return handle_possible_edit(event, cache).await.or(get_most_recent_edit(
261            cache,
262            &event.event_id,
263        )
264        .await
265        .map(RoomIndexOperation::Add));
266    }
267    None
268}
269
270/// Return a [`RoomIndexOperation::Edit`] or [`RoomIndexOperation::Remove`]
271/// depending on the message.
272async fn handle_room_redaction(
273    event: SyncRoomRedactionEvent,
274    cache: &RoomEventCache,
275    rules: &RedactionRules,
276) -> Option<RoomIndexOperation> {
277    if let Some(redacted_event_id) = event.redacts(rules)
278        && let Some(redacted_event) = cache.find_event(redacted_event_id).await
279        && let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
280            redacted_event,
281        ))) = redacted_event.raw().deserialize()
282        && let Some(redacted_event) = redacted_event.as_original()
283    {
284        return handle_possible_edit(redacted_event, cache)
285            .await
286            .or(Some(RoomIndexOperation::Remove(redacted_event.event_id.clone())));
287    }
288    None
289}
290
291/// Prepare a [`TimelineEvent`] into a [`RoomIndexOperation`] for search
292/// indexing.
293async fn parse_timeline_event(
294    cache: &RoomEventCache,
295    event: TimelineEvent,
296    redaction_rules: &RedactionRules,
297) -> Option<RoomIndexOperation> {
298    use ruma::events::AnySyncTimelineEvent;
299
300    if event.kind.is_utd() {
301        return None;
302    }
303
304    match event.raw().deserialize() {
305        Ok(event) => match event {
306            AnySyncTimelineEvent::MessageLike(event) => match event {
307                AnySyncMessageLikeEvent::RoomMessage(event) => {
308                    handle_room_message(event, cache).await
309                }
310                AnySyncMessageLikeEvent::RoomRedaction(event) => {
311                    handle_room_redaction(event, cache, redaction_rules).await
312                }
313                _ => None,
314            },
315            AnySyncTimelineEvent::State(_) => None,
316        },
317
318        Err(e) => {
319            warn!("failed to parse event: {e:?}");
320            None
321        }
322    }
323}
324
#[cfg(test)]
mod tests {
    // End-to-end tests: events arrive through a mocked sync and are then
    // looked up with `Room::search`.
    use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
    use ruma::{
        event_id, events::room::message::RoomMessageEventContentWithoutRelation, room_id, user_id,
    };

    use crate::test_utils::mocks::MatrixMockServer;

    #[cfg(feature = "experimental-search")]
    #[async_test]
    async fn test_sync_message_is_indexed() {
        let mock_server = MatrixMockServer::new().await;
        let client = mock_server.client_builder().build().await;

        client.event_cache().subscribe().unwrap();

        let room_id = room_id!("!room_id:localhost");
        let event_id = event_id!("$event_id:localhost");
        let user_id = user_id!("@user_id:localhost");

        let event_factory = EventFactory::new();
        let room = mock_server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(room_id).add_timeline_bulk(vec![
                    event_factory
                        .text_msg("this is a sentence")
                        .event_id(event_id)
                        .sender(user_id)
                        .into_raw_sync(),
                ]),
            )
            .await;

        let response = room.search("this", 5, None).await.expect("search should succeed");

        assert_eq!(response.len(), 1, "unexpected number of responses: {response:?}");
        assert_eq!(response[0], event_id, "event id doesn't match: {response:?}");
    }

    #[cfg(feature = "experimental-search")]
    #[async_test]
    async fn test_search_index_edit_ordering() {
        let room_id = room_id!("!room_id:localhost");
        let dummy_id = event_id!("$dummy");
        let edit1_id = event_id!("$edit1");
        let edit2_id = event_id!("$edit2");
        let edit3_id = event_id!("$edit3");
        let original_id = event_id!("$original");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let room = server.sync_joined_room(&client, room_id).await;

        let f = EventFactory::new().room(room_id).sender(user_id!("@user_id:localhost"));

        // Indexable dummy message required because RoomIndex is initialised lazily.
        let dummy = f.text_msg("dummy").event_id(dummy_id).into_raw();

        let original = f.text_msg("This is a message").event_id(original_id).into_raw();

        let edit1 = f
            .text_msg("* A new message")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("A new message"))
            .event_id(edit1_id)
            .into_raw();

        let edit2 = f
            .text_msg("* An even newer message")
            .edit(
                original_id,
                RoomMessageEventContentWithoutRelation::text_plain("An even newer message"),
            )
            .event_id(edit2_id)
            .into_raw();

        let edit3 = f
            .text_msg("* The newest message")
            .edit(
                original_id,
                RoomMessageEventContentWithoutRelation::text_plain("The newest message"),
            )
            .event_id(edit3_id)
            .into_raw();

        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(room_id)
                    .add_timeline_event(dummy.clone())
                    .add_timeline_event(edit1.clone())
                    .add_timeline_event(edit2.clone()),
            )
            .await;

        let results = room.search("message", 3, None).await.unwrap();

        assert_eq!(results.len(), 0, "Search should return 0 results, got {results:?}");

        // Adding the original after some pending edits should add the latest edit
        // instead of the original.
        server
            .sync_room(&client, JoinedRoomBuilder::new(room_id).add_timeline_event(original))
            .await;

        let results = room.search("message", 3, None).await.unwrap();

        assert_eq!(results.len(), 1, "Search should return 1 result, got {results:?}");
        assert_eq!(results[0], edit2_id, "Search should return latest edit, got {:?}", results[0]);

        // Editing the original after it exists and there has been another edit should
        // delete the previous edits and add this one
        server.sync_room(&client, JoinedRoomBuilder::new(room_id).add_timeline_event(edit3)).await;

        let results = room.search("message", 3, None).await.unwrap();

        assert_eq!(results.len(), 1, "Search should return 1 result, got {results:?}");
        assert_eq!(results[0], edit3_id, "Search should return latest edit, got {:?}", results[0]);
    }
}