matrix_sdk_search/
index.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt, fs, path::Path, sync::Arc};
16
17use ruma::{EventId, OwnedEventId, OwnedRoomId, RoomId, events::AnySyncMessageLikeEvent};
18use tantivy::{
19    Index, IndexReader, TantivyDocument,
20    collector::TopDocs,
21    directory::{MmapDirectory, error::OpenDirectoryError},
22    query::QueryParser,
23    schema::Value,
24};
25use tracing::{error, warn};
26
27use crate::{
28    OpStamp, TANTIVY_INDEX_MEMORY_BUDGET,
29    error::IndexError,
30    schema::{MatrixSearchIndexSchema, RoomMessageSchema},
31    writer::SearchIndexWriter,
32};
33
34/// An enum representing the operations that can be applied to a [`RoomIndex`].
35pub(crate) enum RoomIndexOperation {
    /// Add the wrapped document to the index.
36    Add(TantivyDocument),
37}
38
39/// A struct that holds all data pertaining to a particular room's
40/// message index.
41pub struct RoomIndex {
    /// Schema used to turn events into index operations and to look up the
    /// primary-key field when reading search results.
42    schema: RoomMessageSchema,
    /// Writer through which documents are added and committed.
43    writer: SearchIndexWriter,
    /// Reader that provides the searchers used to run queries.
44    reader: IndexReader,
    /// Parser for query strings, built over the schema's default search
    /// fields.
45    query_parser: QueryParser,
    /// ID of the room this index belongs to.
46    room_id: OwnedRoomId,
47}
48
49impl fmt::Debug for RoomIndex {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        f.debug_struct("RoomIndex")
52            .field("schema", &self.schema)
53            .field("room_id", &self.room_id)
54            .finish()
55    }
56}
57
58impl RoomIndex {
59    fn new_with(
60        index: Index,
61        schema: RoomMessageSchema,
62        room_id: &RoomId,
63    ) -> Result<RoomIndex, IndexError> {
64        let writer = index.writer(TANTIVY_INDEX_MEMORY_BUDGET)?;
65        let reader = index.reader_builder().try_into()?;
66
67        let query_parser = QueryParser::for_index(&index, schema.default_search_fields());
68        Ok(Self {
69            schema,
70            writer: writer.into(),
71            reader,
72            query_parser,
73            room_id: room_id.to_owned(),
74        })
75    }
76
77    /// Create new [`RoomIndex`] which stores the index in path/room_id
78    pub fn new(path: &Path, room_id: &RoomId) -> Result<RoomIndex, IndexError> {
79        let path = path.join(room_id.as_str());
80        let schema = RoomMessageSchema::new();
81        fs::create_dir_all(path.clone())?;
82        let index = Index::create_in_dir(path, schema.as_tantivy_schema())?;
83        RoomIndex::new_with(index, schema, room_id)
84    }
85
86    /// Create new [`RoomIndex`] which stores the index in memory.
87    /// Intended for testing.
88    pub fn new_in_memory(room_id: &RoomId) -> Result<RoomIndex, IndexError> {
89        let schema = RoomMessageSchema::new();
90        let index = Index::create_in_ram(schema.as_tantivy_schema());
91        RoomIndex::new_with(index, schema, room_id)
92    }
93
94    /// Open index at path/room_id if it exists else
95    /// create new [`RoomIndex`] which stores the index in path/room_id
96    pub fn open_or_create(path: &Path, room_id: &RoomId) -> Result<RoomIndex, IndexError> {
97        let path = path.join(room_id.as_str());
98        let mmap_dir = match MmapDirectory::open(path) {
99            Ok(dir) => Ok(dir),
100            Err(err) => match err {
101                OpenDirectoryError::DoesNotExist(path) => {
102                    fs::create_dir_all(path.clone()).map_err(|err| {
103                        OpenDirectoryError::IoError {
104                            io_error: Arc::new(err),
105                            directory_path: path.to_path_buf(),
106                        }
107                    })?;
108                    MmapDirectory::open(path)
109                }
110                _ => Err(err),
111            },
112        }?;
113        let schema = RoomMessageSchema::new();
114        let index = Index::open_or_create(mmap_dir, schema.as_tantivy_schema())?;
115        RoomIndex::new_with(index, schema, room_id)
116    }
117
118    /// Open index at path/room_id. Fails if it doesn't exist.
119    pub fn open(path: &Path, room_id: &RoomId) -> Result<RoomIndex, IndexError> {
120        let path = path.join(room_id.as_str());
121        let index_path = MmapDirectory::open(path)?;
122        let index = Index::open(index_path)?;
123        let schema: RoomMessageSchema = index.schema().try_into()?;
124        RoomIndex::new_with(index, schema, room_id)
125    }
126
127    /// Handle [`AnySyncMessageLikeEvent`]
128    ///
129    /// This which will add/remove/edit an event in the index based on the
130    /// event type.
131    pub fn handle_event(&mut self, event: AnySyncMessageLikeEvent) -> Result<(), IndexError> {
132        let event_id = event.event_id().to_owned();
133
134        match self.schema.handle_event(event)? {
135            RoomIndexOperation::Add(document) => {
136                if !self.contains(&event_id) {
137                    self.writer.add_document(document)?;
138                }
139            }
140        }
141        Ok(())
142    }
143
144    /// Commit added events to [`RoomIndex`]
145    pub fn commit(&mut self) -> Result<OpStamp, IndexError> {
146        let last_commit_opstamp = self.writer.commit()?; // TODO: This is blocking. Handle it.
147        Ok(last_commit_opstamp)
148    }
149
150    /// Commit added events to [`RoomIndex`] and
151    /// update searchers so that they reflect the state of the last
152    /// `.commit()`.
153    ///
154    /// Every commit should be rapidly reflected on your `IndexReader` and you
155    /// should not need to call `reload()` at all.
156    ///
157    /// This automatic reload can take 10s of milliseconds to kick in however,
158    /// and in unit tests it can be nice to deterministically force the
159    /// reload of searchers.
160    pub fn commit_and_reload(&mut self) -> Result<OpStamp, IndexError> {
161        let last_commit_opstamp = self.writer.commit()?; // TODO: This is blocking. Handle it.
162        self.reader.reload()?;
163        Ok(last_commit_opstamp)
164    }
165
166    /// Search the [`RoomIndex`] for some query. Returns a list of
167    /// results with a maximum given length.
168    pub fn search(
169        &self,
170        query: &str,
171        max_number_of_results: usize,
172    ) -> Result<Vec<OwnedEventId>, IndexError> {
173        let query = self.query_parser.parse_query(query)?;
174        let searcher = self.reader.searcher();
175
176        let results = searcher.search(&query, &TopDocs::with_limit(max_number_of_results))?;
177        let mut ret: Vec<OwnedEventId> = Vec::new();
178        let pk = self.schema.primary_key();
179
180        for (_score, doc_address) in results {
181            let retrieved_doc: TantivyDocument = searcher.doc(doc_address)?;
182            match retrieved_doc.get_first(pk).and_then(|maybe_value| maybe_value.as_str()) {
183                Some(value) => match OwnedEventId::try_from(value) {
184                    Ok(event_id) => ret.push(event_id),
185                    Err(err) => error!("error while parsing event_id from search result: {err:?}"),
186                },
187                _ => error!("unexpected value type while searching documents"),
188            }
189        }
190
191        Ok(ret)
192    }
193
194    fn contains(&self, event_id: &EventId) -> bool {
195        let search_result = self.search(format!("event_id:\"{event_id}\"").as_str(), 1);
196        match search_result {
197            Ok(results) => !results.is_empty(),
198            Err(err) => {
199                warn!("Failed to check if event has been indexed, assuming it has: {err}");
200                true
201            }
202        }
203    }
204}
205
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, error::Error};

    use matrix_sdk_test::event_factory::EventFactory;
    use ruma::{event_id, room_id, user_id};

    use crate::index::RoomIndex;

    // NOTE: `expect` takes a plain string and never interpolates `{..}`
    // placeholders; it already appends the error's `Debug` output on failure,
    // so the messages below only describe what failed.

    /// An in-memory index can be created.
    #[test]
    fn test_make_index_in_memory() {
        let room_id = room_id!("!room_id:localhost");

        RoomIndex::new_in_memory(room_id).expect("failed to make index in ram");
    }

    /// A text message event can be handed to the index without error.
    #[test]
    fn test_handle_event() {
        let room_id = room_id!("!room_id:localhost");
        let mut index = RoomIndex::new_in_memory(room_id).expect("failed to make index in ram");

        let event = EventFactory::new()
            .text_msg("event message")
            .event_id(event_id!("$event_id:localhost"))
            .room(room_id)
            .sender(user_id!("@user_id:localhost"))
            .into_any_sync_message_like_event();

        index.handle_event(event).expect("failed to add event");
    }

    /// Searching returns exactly the events whose text matches the query.
    #[test]
    fn test_search_populated_index() -> Result<(), Box<dyn Error>> {
        let room_id = room_id!("!room_id:localhost");
        let mut index = RoomIndex::new_in_memory(room_id).expect("failed to make index in ram");

        let event_id_1 = event_id!("$event_id_1:localhost");
        let event_id_2 = event_id!("$event_id_2:localhost");
        let event_id_3 = event_id!("$event_id_3:localhost");

        index.handle_event(
            EventFactory::new()
                .text_msg("This is a sentence")
                .event_id(event_id_1)
                .room(room_id)
                .sender(user_id!("@user_id:localhost"))
                .into_any_sync_message_like_event(),
        )?;

        index.handle_event(
            EventFactory::new()
                .text_msg("All new words")
                .event_id(event_id_2)
                .room(room_id)
                .sender(user_id!("@user_id:localhost"))
                .into_any_sync_message_like_event(),
        )?;

        index.handle_event(
            EventFactory::new()
                .text_msg("A similar sentence")
                .event_id(event_id_3)
                .room(room_id)
                .sender(user_id!("@user_id:localhost"))
                .into_any_sync_message_like_event(),
        )?;

        index.commit_and_reload()?;

        let result = index.search("sentence", 10).expect("search failed");
        let result: HashSet<_> = result.iter().collect();

        let true_value = [event_id_1.to_owned(), event_id_3.to_owned()];
        let true_value: HashSet<_> = true_value.iter().collect();

        assert_eq!(result, true_value, "search result not correct: {result:?}");

        Ok(())
    }

    /// Searching an empty index returns no results.
    #[test]
    fn test_search_empty_index() -> Result<(), Box<dyn Error>> {
        let room_id = room_id!("!room_id:localhost");
        let mut index = RoomIndex::new_in_memory(room_id).expect("failed to make index in ram");

        index.commit_and_reload()?;

        let result = index.search("sentence", 10).expect("search failed");

        assert!(result.is_empty(), "search result not empty: {result:?}");

        Ok(())
    }

    /// `contains` is false for an event that was never indexed.
    #[test]
    fn test_index_contains_false() -> Result<(), Box<dyn Error>> {
        let room_id = room_id!("!room_id:localhost");
        let mut index = RoomIndex::new_in_memory(room_id).expect("failed to make index in ram");

        let event_id = event_id!("$event_id:localhost");

        index.commit_and_reload()?;

        assert!(!index.contains(event_id), "Index should not contain event");

        Ok(())
    }

    /// `contains` is true once an event has been indexed and committed.
    #[test]
    fn test_index_contains_true() -> Result<(), Box<dyn Error>> {
        let room_id = room_id!("!room_id:localhost");
        let mut index = RoomIndex::new_in_memory(room_id).expect("failed to make index in ram");

        let event_id = event_id!("$event_id:localhost");
        let event = EventFactory::new()
            .text_msg("This is a sentence")
            .event_id(event_id)
            .room(room_id)
            .sender(user_id!("@user_id:localhost"))
            .into_any_sync_message_like_event();

        index.handle_event(event)?;

        index.commit_and_reload()?;

        assert!(index.contains(event_id), "Index should contain event");

        Ok(())
    }

    /// Indexing an already committed event a second time does not duplicate it.
    #[test]
    fn test_indexing_idempotency() -> Result<(), Box<dyn Error>> {
        let room_id = room_id!("!room_id:localhost");
        let mut index = RoomIndex::new_in_memory(room_id).expect("failed to make index in ram");

        let event_id = event_id!("$event_id:localhost");
        let event = EventFactory::new()
            .text_msg("This is a sentence")
            .event_id(event_id)
            .room(room_id)
            .sender(user_id!("@user_id:localhost"))
            .into_any_sync_message_like_event();

        index.handle_event(event.clone())?;

        index.commit_and_reload()?;

        assert!(index.contains(event_id), "Index should contain event");

        // indexing again should do nothing
        index.handle_event(event)?;

        index.commit_and_reload()?;

        assert!(index.contains(event_id), "Index should still contain event");

        let result = index.search("sentence", 10).expect("search failed");

        assert_eq!(result.len(), 1, "Index should have ignored second indexing");

        Ok(())
    }
}