matrix_sdk_search/index/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15/// A module for building a [`RoomIndex`]
16pub mod builder;
17
18use std::{collections::HashSet, fmt};
19
20use ruma::{
21    EventId, OwnedEventId, OwnedRoomId, RoomId, events::room::message::OriginalSyncRoomMessageEvent,
22};
23use tantivy::{
24    Index, IndexReader, TantivyDocument, collector::TopDocs, directory::error::OpenDirectoryError,
25    query::QueryParser, schema::Value,
26};
27use tracing::{debug, error, warn};
28
29use crate::{
30    OpStamp, TANTIVY_INDEX_MEMORY_BUDGET,
31    error::IndexError,
32    schema::{MatrixSearchIndexSchema, RoomMessageSchema},
33    writer::SearchIndexWriter,
34};
35
/// An operation that can be executed against a [`RoomIndex`].
///
/// Operations are applied through [`RoomIndex::execute`] or
/// [`RoomIndex::bulk_execute`].
#[derive(Debug, Clone)]
pub enum RoomIndexOperation {
    /// Add this event to the index.
    Add(OriginalSyncRoomMessageEvent),
    /// Remove all documents in the index where
    /// `MatrixSearchIndexSchema::deletion_key()` matches this event id.
    Remove(OwnedEventId),
    /// Replace all documents in the index where
    /// `MatrixSearchIndexSchema::deletion_key()` matches this event id with
    /// the new event.
    Edit(OwnedEventId, OriginalSyncRoomMessageEvent),
    /// Do nothing.
    Noop,
}
51
/// A struct that holds all data pertaining to a particular room's
/// message index.
pub struct RoomIndex {
    /// The underlying tantivy index that writers and readers are created
    /// from.
    index: Index,
    /// Schema used to build documents and to look up the primary-key and
    /// deletion-key fields.
    schema: RoomMessageSchema,
    /// Query parser built over the schema's default search fields.
    query_parser: QueryParser,
    /// The room this index was created for.
    room_id: OwnedRoomId,
    /// Ids of events handed to an add since the last successful commit.
    uncommitted_adds: HashSet<OwnedEventId>,
    /// Ids of events scheduled for removal since the last successful commit.
    uncommitted_removes: HashSet<OwnedEventId>,
}
62
63impl fmt::Debug for RoomIndex {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        f.debug_struct("RoomIndex")
66            .field("schema", &self.schema)
67            .field("room_id", &self.room_id)
68            .finish()
69    }
70}
71
72impl RoomIndex {
73    pub(crate) fn new_with(index: Index, schema: RoomMessageSchema, room_id: &RoomId) -> RoomIndex {
74        let query_parser = QueryParser::for_index(&index, schema.default_search_fields());
75        Self {
76            index,
77            schema,
78            query_parser,
79            room_id: room_id.to_owned(),
80            uncommitted_adds: HashSet::new(),
81            uncommitted_removes: HashSet::new(),
82        }
83    }
84
85    /// Get a [`SearchIndexWriter`] for this index.
86    fn get_writer(&self) -> Result<SearchIndexWriter, IndexError> {
87        let writer = self.index.writer(TANTIVY_INDEX_MEMORY_BUDGET)?;
88        Ok(SearchIndexWriter::new(writer, self.schema.clone()))
89    }
90
91    /// Get a [`IndexReader`] for this index.
92    fn get_reader(&self) -> Result<IndexReader, IndexError> {
93        Ok(self.index.reader_builder().try_into()?)
94    }
95
96    /// Commit added events to [`RoomIndex`]. The changes are not reflected in
97    /// the search results until the serchers are reloaded.
98    ///
99    /// Use [`RoomIndex::commit_and_reload`] for this purpose.
100    fn commit(&mut self, writer: &mut SearchIndexWriter) -> Result<OpStamp, IndexError> {
101        let last_commit_opstamp = writer.commit()?; // TODO: This is blocking. Handle it.
102        self.uncommitted_adds.clear();
103        self.uncommitted_removes.clear();
104        Ok(last_commit_opstamp)
105    }
106
107    /// Commit added events to [`RoomIndex`] and
108    /// update searchers so that they reflect the state of the last
109    /// `.commit()`.
110    ///
111    /// Every commit should be rapidly reflected on your `IndexReader` and you
112    /// should not need to call `reload()` at all.
113    ///
114    /// This automatic reload can take 10s of milliseconds to kick in however,
115    /// and in unit tests it can be nice to deterministically force the
116    /// reload of searchers.
117    fn commit_and_reload(&mut self, writer: &mut SearchIndexWriter) -> Result<OpStamp, IndexError> {
118        debug!(
119            "RoomIndex: committing and reloading: uncommitted: {:?}, {:?}",
120            self.uncommitted_adds, self.uncommitted_removes
121        );
122        let last_commit_opstamp = self.commit(writer)?;
123        self.get_reader()?.reload()?;
124        Ok(last_commit_opstamp)
125    }
126
127    /// Search the [`RoomIndex`] for some query. Returns a list of
128    /// results with a maximum given length. If `pagination_offset` is
129    /// set then the results will start there, i.e.
130    ///
131    /// if `max_number_of_results = 3` and `pagination_offset = 10`
132    /// (and there are a surplus of results)
133    /// then this will return results `11, 12, 13`
134    pub fn search(
135        &self,
136        query: &str,
137        max_number_of_results: usize,
138        pagination_offset: Option<usize>,
139    ) -> Result<Vec<OwnedEventId>, IndexError> {
140        let query = self.query_parser.parse_query(query)?;
141        let searcher = self.get_reader()?.searcher();
142
143        let offset = pagination_offset.unwrap_or(0);
144
145        let results = searcher
146            .search(&query, &TopDocs::with_limit(max_number_of_results).and_offset(offset))?;
147        let mut ret: Vec<OwnedEventId> = Vec::new();
148        let pk = self.schema.primary_key();
149
150        for (_score, doc_address) in results {
151            let retrieved_doc: TantivyDocument = searcher.doc(doc_address)?;
152            match retrieved_doc.get_first(pk).and_then(|maybe_value| maybe_value.as_str()) {
153                Some(value) => match OwnedEventId::try_from(value) {
154                    Ok(event_id) => ret.push(event_id),
155                    Err(err) => error!("error while parsing event_id from search result: {err:?}"),
156                },
157                _ => error!("unexpected value type while searching documents"),
158            }
159        }
160
161        Ok(ret)
162    }
163
164    fn get_events_to_be_removed(
165        &self,
166        event_id: &EventId,
167    ) -> Result<Vec<OwnedEventId>, IndexError> {
168        self.search(
169            format!("{}:\"{event_id}\"", self.schema.get_field_name(self.schema.deletion_key()))
170                .as_str(),
171            10000,
172            None,
173        )
174    }
175
176    fn add(
177        &mut self,
178        writer: &mut SearchIndexWriter,
179        event: OriginalSyncRoomMessageEvent,
180    ) -> Result<(), IndexError> {
181        if !self.contains(&event.event_id) {
182            writer.add(self.schema.make_doc(event.clone())?)?;
183        }
184        self.uncommitted_removes.remove(&event.event_id);
185        self.uncommitted_adds.insert(event.event_id);
186        Ok(())
187    }
188
189    fn remove(
190        &mut self,
191        writer: &mut SearchIndexWriter,
192        event_id: OwnedEventId,
193    ) -> Result<(), IndexError> {
194        let events = self.get_events_to_be_removed(&event_id)?;
195
196        writer.remove(&event_id);
197
198        for event in events.into_iter() {
199            self.uncommitted_adds.remove(&event);
200            self.uncommitted_removes.insert(event);
201        }
202
203        Ok(())
204    }
205
206    fn execute_impl(
207        &mut self,
208        writer: &mut SearchIndexWriter,
209        operation: &RoomIndexOperation,
210    ) -> Result<(), IndexError> {
211        debug!("INDEX: executing {operation:?}");
212        match operation.clone() {
213            RoomIndexOperation::Add(event) => {
214                self.add(writer, event)?;
215            }
216            RoomIndexOperation::Remove(event_id) => {
217                self.remove(writer, event_id)?;
218            }
219            RoomIndexOperation::Edit(remove_event_id, event) => {
220                self.remove(writer, remove_event_id)?;
221                self.add(writer, event)?;
222            }
223            RoomIndexOperation::Noop => {}
224        }
225        Ok(())
226    }
227
228    /// Execute [`RoomIndexOperation`] with retry
229    fn execute_with_retry(
230        &mut self,
231        writer: &mut SearchIndexWriter,
232        operation: &RoomIndexOperation,
233        retries: usize,
234    ) -> Result<(), IndexError> {
235        let mut num_tries = 0;
236
237        while let Err(err) = self.execute_impl(writer, operation) {
238            if num_tries == retries {
239                return Err(err);
240            }
241            match err {
242                // Retry
243                IndexError::TantivyError(_)
244                | IndexError::IndexSchemaError(_)
245                | IndexError::IndexWriteError(_)
246                | IndexError::IO(_) => {
247                    num_tries += 1;
248                }
249                IndexError::OpenDirectoryError(ref e) => match e {
250                    // Retry
251                    OpenDirectoryError::IoError { io_error: _, directory_path: _ } => {
252                        num_tries += 1;
253                    }
254                    // Bubble
255                    OpenDirectoryError::DoesNotExist(_)
256                    | OpenDirectoryError::FailedToCreateTempDir(_)
257                    | OpenDirectoryError::NotADirectory(_) => return Err(err),
258                },
259                // Bubble
260                IndexError::QueryParserError(_) => return Err(err),
261                // Ignore
262                IndexError::CannotIndexRedactedMessage
263                | IndexError::EmptyMessage
264                | IndexError::MessageTypeNotSupported => break,
265            }
266            debug!("Failed to execute operation in room index (try {num_tries}): {err}");
267        }
268        Ok(())
269    }
270
271    /// Execute [`RoomIndexOperation`]
272    ///
273    /// If an error occurs, retry 5 times if possible.
274    ///
275    /// This which will add/remove/edit an event in the index based on the
276    /// operation.
277    ///
278    /// Prefer [`RoomIndex::bulk_execute`] for multiple operations.
279    pub fn execute(&mut self, operation: RoomIndexOperation) -> Result<(), IndexError> {
280        let mut writer = self.get_writer()?;
281        self.execute_with_retry(&mut writer, &operation, 5)?;
282        self.commit_and_reload(&mut writer)?;
283        Ok(())
284    }
285
286    /// Bulk execute [`RoomIndexOperation`]s
287    ///
288    /// If an error occurs in the batch it retries 5 times if possible.
289    ///
290    /// This which will add/remove/edit an events in the index based on the
291    /// operations.
292    pub fn bulk_execute(&mut self, operations: Vec<RoomIndexOperation>) -> Result<(), IndexError> {
293        let mut writer = self.get_writer()?;
294        let mut operations = operations.into_iter();
295        let mut next_operation = operations.next();
296
297        while let Some(ref operation) = next_operation {
298            self.execute_with_retry(&mut writer, operation, 5)?;
299            next_operation = operations.next();
300        }
301
302        self.commit_and_reload(&mut writer)?;
303
304        Ok(())
305    }
306
307    fn contains(&self, event_id: &EventId) -> bool {
308        let search_result = self.search(
309            format!("{}:\"{event_id}\"", self.schema.get_field_name(self.schema.primary_key()))
310                .as_str(),
311            1,
312            None,
313        );
314        match search_result {
315            Ok(results) => {
316                !self.uncommitted_removes.contains(event_id)
317                    && (!results.is_empty() || self.uncommitted_adds.contains(event_id))
318            }
319            Err(err) => {
320                warn!("Failed to check if event has been indexed, assuming it has: {err}");
321                true
322            }
323        }
324    }
325}
326
327#[cfg(test)]
328mod tests {
329    use std::{collections::HashSet, error::Error};
330
331    use matrix_sdk_test::event_factory::EventFactory;
332    use ruma::{
333        EventId, event_id,
334        events::{
335            AnySyncMessageLikeEvent,
336            room::message::{OriginalSyncRoomMessageEvent, RoomMessageEventContentWithoutRelation},
337        },
338        room_id, user_id,
339    };
340
341    use crate::{
342        error::IndexError,
343        index::{RoomIndex, RoomIndexOperation, builder::RoomIndexBuilder},
344    };
345
346    /// Helper function to add a regular message to the index
347    ///
348    /// # Panic
349    /// Panics when event is not a [`OriginalSyncRoomMessageEvent`] with no
350    /// relations.
351    fn index_message(
352        index: &mut RoomIndex,
353        event: AnySyncMessageLikeEvent,
354    ) -> Result<(), IndexError> {
355        if let AnySyncMessageLikeEvent::RoomMessage(ev) = event
356            && let Some(ev) = ev.as_original()
357            && ev.content.relates_to.is_none()
358        {
359            return index.execute(RoomIndexOperation::Add(ev.clone()));
360        }
361        panic!("Event was not a relationless OriginalSyncRoomMessageEvent.")
362    }
363
364    /// Helper function to remove events to the index
365    fn index_remove(index: &mut RoomIndex, event_id: &EventId) -> Result<(), IndexError> {
366        index.execute(RoomIndexOperation::Remove(event_id.to_owned()))
367    }
368
369    /// Helper function to edit events in index
370    ///
371    /// Edit event with `event_id` into new [`OriginalSyncRoomMessageEvent`]
372    fn index_edit(
373        index: &mut RoomIndex,
374        event_id: &EventId,
375        new: OriginalSyncRoomMessageEvent,
376    ) -> Result<(), IndexError> {
377        index.execute(RoomIndexOperation::Edit(event_id.to_owned(), new))
378    }
379
380    #[test]
381    fn test_add_event() {
382        let room_id = room_id!("!room_id:localhost");
383        let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
384
385        let event = EventFactory::new()
386            .text_msg("event message")
387            .event_id(event_id!("$event_id:localhost"))
388            .room(room_id)
389            .sender(user_id!("@user_id:localhost"))
390            .into_any_sync_message_like_event();
391
392        index_message(&mut index, event).expect("failed to add event: {res:?}");
393    }
394
395    #[test]
396    fn test_search_populated_index() -> Result<(), Box<dyn Error>> {
397        let room_id = room_id!("!room_id:localhost");
398        let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
399
400        let event_id_1 = event_id!("$event_id_1:localhost");
401        let event_id_2 = event_id!("$event_id_2:localhost");
402        let event_id_3 = event_id!("$event_id_3:localhost");
403        let user_id = user_id!("@user_id:localhost");
404        let f = EventFactory::new().room(room_id).sender(user_id);
405
406        index_message(
407            &mut index,
408            f.text_msg("This is a sentence")
409                .event_id(event_id_1)
410                .into_any_sync_message_like_event(),
411        )?;
412
413        index_message(
414            &mut index,
415            f.text_msg("All new words").event_id(event_id_2).into_any_sync_message_like_event(),
416        )?;
417
418        index_message(
419            &mut index,
420            f.text_msg("A similar sentence")
421                .event_id(event_id_3)
422                .into_any_sync_message_like_event(),
423        )?;
424
425        let result = index.search("sentence", 10, None).expect("search failed with: {result:?}");
426        let result: HashSet<_> = result.iter().collect();
427
428        let true_value = [event_id_1.to_owned(), event_id_3.to_owned()];
429        let true_value: HashSet<_> = true_value.iter().collect();
430
431        assert_eq!(result, true_value, "search result not correct: {result:?}");
432
433        Ok(())
434    }
435
436    #[test]
437    fn test_search_empty_index() -> Result<(), Box<dyn Error>> {
438        let room_id = room_id!("!room_id:localhost");
439        let index = RoomIndexBuilder::new_in_memory(room_id).build();
440
441        let result = index.search("sentence", 10, None).expect("search failed with: {result:?}");
442
443        assert!(result.is_empty(), "search result not empty: {result:?}");
444
445        Ok(())
446    }
447
448    #[test]
449    fn test_index_contains_false() {
450        let room_id = room_id!("!room_id:localhost");
451        let index = RoomIndexBuilder::new_in_memory(room_id).build();
452
453        let event_id = event_id!("$event_id:localhost");
454
455        assert!(!index.contains(event_id), "Index should not contain event");
456    }
457
458    #[test]
459    fn test_index_contains_true() -> Result<(), Box<dyn Error>> {
460        let room_id = room_id!("!room_id:localhost");
461        let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
462
463        let event_id = event_id!("$event_id:localhost");
464        let event = EventFactory::new()
465            .text_msg("This is a sentence")
466            .event_id(event_id)
467            .room(room_id)
468            .sender(user_id!("@user_id:localhost"))
469            .into_any_sync_message_like_event();
470
471        index_message(&mut index, event)?;
472
473        assert!(index.contains(event_id), "Index should contain event");
474
475        Ok(())
476    }
477
478    #[test]
479    fn test_index_add_idempotency() -> Result<(), Box<dyn Error>> {
480        let room_id = room_id!("!room_id:localhost");
481        let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
482
483        let event_id = event_id!("$event_id:localhost");
484        let event = EventFactory::new()
485            .text_msg("This is a sentence")
486            .event_id(event_id)
487            .room(room_id)
488            .sender(user_id!("@user_id:localhost"))
489            .into_any_sync_message_like_event();
490
491        index_message(&mut index, event.clone())?;
492
493        assert!(index.contains(event_id), "Index should contain event");
494
495        // indexing again should do nothing
496        index_message(&mut index, event)?;
497
498        assert!(index.contains(event_id), "Index should still contain event");
499
500        let result = index.search("sentence", 10, None).expect("search failed with: {result:?}");
501
502        assert_eq!(result.len(), 1, "Index should have ignored second indexing");
503
504        Ok(())
505    }
506
507    #[test]
508    fn test_remove_event() -> Result<(), Box<dyn Error>> {
509        let room_id = room_id!("!room_id:localhost");
510        let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
511
512        let event_id = event_id!("$event_id:localhost");
513        let user_id = user_id!("@user_id:localhost");
514        let f = EventFactory::new().room(room_id).sender(user_id);
515
516        let event =
517            f.text_msg("This is a sentence").event_id(event_id).into_any_sync_message_like_event();
518
519        index_message(&mut index, event)?;
520
521        assert!(index.contains(event_id), "Index should contain event");
522
523        index_remove(&mut index, event_id)?;
524
525        assert!(!index.contains(event_id), "Index should not contain event");
526
527        Ok(())
528    }
529
530    #[test]
531    fn test_edit_removes_old_and_adds_new_event() -> Result<(), Box<dyn Error>> {
532        let room_id = room_id!("!room_id:localhost");
533        let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
534
535        let old_event_id = event_id!("$old_event_id:localhost");
536        let user_id = user_id!("@user_id:localhost");
537        let f = EventFactory::new().room(room_id).sender(user_id);
538
539        let old_event = f
540            .text_msg("This is a sentence")
541            .event_id(old_event_id)
542            .into_any_sync_message_like_event();
543
544        index_message(&mut index, old_event)?;
545
546        assert!(index.contains(old_event_id), "Index should contain event");
547
548        let new_event_id = event_id!("$new_event_id:localhost");
549        let edit = f
550            .text_msg("This is a brand new sentence!")
551            .edit(
552                old_event_id,
553                RoomMessageEventContentWithoutRelation::text_plain("This is a brand new sentence!"),
554            )
555            .event_id(new_event_id)
556            .into_original_sync_room_message_event();
557
558        index_edit(&mut index, old_event_id, edit)?;
559
560        assert!(!index.contains(old_event_id), "Index should not contain old event");
561        assert!(index.contains(new_event_id), "Index should contain edited event");
562
563        Ok(())
564    }
565}