Skip to main content

matrix_sdk_search/index/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15/// A module for building a [`RoomIndex`]
16pub mod builder;
17
18use std::{collections::HashSet, fmt};
19
20use ruma::{
21    EventId, OwnedEventId, OwnedRoomId, RoomId, events::room::message::OriginalSyncRoomMessageEvent,
22};
23use tantivy::{
24    Index, IndexReader, TantivyDocument, collector::TopDocs, directory::error::OpenDirectoryError,
25    query::QueryParser, schema::Value,
26};
27use tracing::{debug, error, warn};
28
29use crate::{
30    OpStamp, TANTIVY_INDEX_MEMORY_BUDGET,
31    error::IndexError,
32    schema::{MatrixSearchIndexSchema, RoomMessageSchema},
33    writer::SearchIndexWriter,
34};
35
36/// A struct to represent the operations on a [`RoomIndex`]
37#[derive(Debug, Clone)]
38pub enum RoomIndexOperation {
39    /// Add this event to the index.
40    Add(OriginalSyncRoomMessageEvent),
41    /// Remove all documents in the index where
42    /// `MatrixSearchIndexSchema::deletion_key()` matches this event id.
43    Remove(OwnedEventId),
44    /// Replace all documents in the index where
45    /// `MatrixSearchIndexSchema::deletion_key()` matches this event id with
46    /// the new event.
47    Edit(OwnedEventId, OriginalSyncRoomMessageEvent),
48    /// Do nothing.
49    Noop,
50}
51
/// A struct that holds all data pertaining to a particular room's
/// message index.
///
/// Changes are written through a short-lived writer. Event ids touched by
/// that writer are tracked in the `uncommitted_*` sets until the next
/// commit, so that lookups during a batch account for not-yet-committed
/// changes.
pub struct RoomIndex {
    /// The tantivy index holding this room's documents.
    index: Index,
    /// Schema used to build documents and to resolve field names in queries.
    schema: RoomMessageSchema,
    /// Parser for free-text queries over the schema's default search fields.
    query_parser: QueryParser,
    /// The room whose messages this index covers.
    room_id: OwnedRoomId,
    /// Event ids added through the current writer but not committed yet.
    uncommitted_adds: HashSet<OwnedEventId>,
    /// Event ids removed through the current writer but not committed yet.
    uncommitted_removes: HashSet<OwnedEventId>,
}
62
63impl fmt::Debug for RoomIndex {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        f.debug_struct("RoomIndex")
66            .field("schema", &self.schema)
67            .field("room_id", &self.room_id)
68            .finish()
69    }
70}
71
72impl RoomIndex {
73    pub(crate) fn new_with(index: Index, schema: RoomMessageSchema, room_id: &RoomId) -> RoomIndex {
74        let query_parser = QueryParser::for_index(&index, schema.default_search_fields());
75        Self {
76            index,
77            schema,
78            query_parser,
79            room_id: room_id.to_owned(),
80            uncommitted_adds: HashSet::new(),
81            uncommitted_removes: HashSet::new(),
82        }
83    }
84
85    /// Get a [`SearchIndexWriter`] for this index.
86    fn get_writer(&self) -> Result<SearchIndexWriter, IndexError> {
87        let writer = self.index.writer(TANTIVY_INDEX_MEMORY_BUDGET)?;
88        Ok(SearchIndexWriter::new(writer, self.schema.clone()))
89    }
90
91    /// Get a [`IndexReader`] for this index.
92    fn get_reader(&self) -> Result<IndexReader, IndexError> {
93        Ok(self.index.reader_builder().try_into()?)
94    }
95
96    /// Commit added events to [`RoomIndex`]. The changes are not reflected in
97    /// the search results until the serchers are reloaded.
98    ///
99    /// Use [`RoomIndex::commit_and_reload`] for this purpose.
100    fn commit(&mut self, writer: &mut SearchIndexWriter) -> Result<OpStamp, IndexError> {
101        let last_commit_opstamp = writer.commit()?; // TODO: This is blocking. Handle it.
102        self.uncommitted_adds.clear();
103        self.uncommitted_removes.clear();
104        Ok(last_commit_opstamp)
105    }
106
107    /// Commit added events to [`RoomIndex`] and
108    /// update searchers so that they reflect the state of the last
109    /// `.commit()`.
110    ///
111    /// Every commit should be rapidly reflected on your `IndexReader` and you
112    /// should not need to call `reload()` at all.
113    ///
114    /// This automatic reload can take 10s of milliseconds to kick in however,
115    /// and in unit tests it can be nice to deterministically force the
116    /// reload of searchers.
117    fn commit_and_reload(&mut self, writer: &mut SearchIndexWriter) -> Result<OpStamp, IndexError> {
118        debug!(
119            "RoomIndex: committing and reloading: uncommitted: {:?}, {:?}",
120            self.uncommitted_adds, self.uncommitted_removes
121        );
122        let last_commit_opstamp = self.commit(writer)?;
123        self.get_reader()?.reload()?;
124        Ok(last_commit_opstamp)
125    }
126
127    /// Search the [`RoomIndex`] for some query. Returns a list of
128    /// results with a maximum given length. If `pagination_offset` is
129    /// set then the results will start there, i.e.
130    ///
131    /// if `max_number_of_results = 3` and `pagination_offset = 10`
132    /// (and there are a surplus of results)
133    /// then this will return results `11, 12, 13`
134    pub fn search(
135        &self,
136        query: &str,
137        max_number_of_results: usize,
138        pagination_offset: Option<usize>,
139    ) -> Result<Vec<(f32, OwnedEventId)>, IndexError> {
140        let query = self.query_parser.parse_query(query)?;
141        let searcher = self.get_reader()?.searcher();
142
143        let offset = pagination_offset.unwrap_or(0);
144
145        let results = searcher.search(
146            &query,
147            &TopDocs::with_limit(max_number_of_results).and_offset(offset).order_by_score(),
148        )?;
149        let mut ret: Vec<(f32, OwnedEventId)> = Vec::new();
150        let pk = self.schema.primary_key();
151
152        for (score, doc_address) in results {
153            let retrieved_doc: TantivyDocument = searcher.doc(doc_address)?;
154            match retrieved_doc.get_first(pk).and_then(|maybe_value| maybe_value.as_str()) {
155                Some(value) => match OwnedEventId::try_from(value) {
156                    Ok(event_id) => ret.push((score, event_id)),
157                    Err(err) => error!("error while parsing event_id from search result: {err:?}"),
158                },
159                _ => error!("unexpected value type while searching documents"),
160            }
161        }
162
163        Ok(ret)
164    }
165
166    fn get_events_to_be_removed(
167        &self,
168        event_id: &EventId,
169    ) -> Result<Vec<OwnedEventId>, IndexError> {
170        Ok(self
171            .search(
172                format!(
173                    "{}:\"{event_id}\"",
174                    self.schema.get_field_name(self.schema.deletion_key())
175                )
176                .as_str(),
177                10000,
178                None,
179            )?
180            .into_iter()
181            .map(|(_, id)| id)
182            .collect())
183    }
184
185    fn add(
186        &mut self,
187        writer: &mut SearchIndexWriter,
188        event: OriginalSyncRoomMessageEvent,
189    ) -> Result<(), IndexError> {
190        if !self.contains(&event.event_id) {
191            writer.add(self.schema.make_doc(event.clone())?)?;
192        }
193        self.uncommitted_removes.remove(&event.event_id);
194        self.uncommitted_adds.insert(event.event_id);
195        Ok(())
196    }
197
198    fn remove(
199        &mut self,
200        writer: &mut SearchIndexWriter,
201        event_id: OwnedEventId,
202    ) -> Result<(), IndexError> {
203        let events = self.get_events_to_be_removed(&event_id)?;
204
205        writer.remove(&event_id);
206
207        for event in events.into_iter() {
208            self.uncommitted_adds.remove(&event);
209            self.uncommitted_removes.insert(event);
210        }
211
212        Ok(())
213    }
214
215    fn execute_impl(
216        &mut self,
217        writer: &mut SearchIndexWriter,
218        operation: &RoomIndexOperation,
219    ) -> Result<(), IndexError> {
220        debug!("INDEX: executing {operation:?}");
221        match operation.clone() {
222            RoomIndexOperation::Add(event) => {
223                self.add(writer, event)?;
224            }
225            RoomIndexOperation::Remove(event_id) => {
226                self.remove(writer, event_id)?;
227            }
228            RoomIndexOperation::Edit(remove_event_id, event) => {
229                self.remove(writer, remove_event_id)?;
230                self.add(writer, event)?;
231            }
232            RoomIndexOperation::Noop => {}
233        }
234        Ok(())
235    }
236
237    /// Execute [`RoomIndexOperation`] with retry
238    fn execute_with_retry(
239        &mut self,
240        writer: &mut SearchIndexWriter,
241        operation: &RoomIndexOperation,
242        retries: usize,
243    ) -> Result<(), IndexError> {
244        let mut num_tries = 0;
245
246        while let Err(err) = self.execute_impl(writer, operation) {
247            if num_tries == retries {
248                return Err(err);
249            }
250            match err {
251                // Retry
252                IndexError::TantivyError(_)
253                | IndexError::IndexSchemaError(_)
254                | IndexError::IndexWriteError(_)
255                | IndexError::IO(_) => {
256                    num_tries += 1;
257                }
258                IndexError::OpenDirectoryError(ref e) => match e {
259                    // Retry
260                    OpenDirectoryError::IoError { io_error: _, directory_path: _ } => {
261                        num_tries += 1;
262                    }
263                    // Bubble
264                    OpenDirectoryError::DoesNotExist(_)
265                    | OpenDirectoryError::FailedToCreateTempDir(_)
266                    | OpenDirectoryError::NotADirectory(_) => return Err(err),
267                },
268                // Bubble
269                IndexError::QueryParserError(_) => return Err(err),
270                // Ignore
271                IndexError::CannotIndexRedactedMessage
272                | IndexError::EmptyMessage
273                | IndexError::MessageTypeNotSupported => break,
274            }
275            debug!("Failed to execute operation in room index (try {num_tries}): {err}");
276        }
277        Ok(())
278    }
279
280    /// Execute [`RoomIndexOperation`]
281    ///
282    /// If an error occurs, retry 5 times if possible.
283    ///
284    /// This which will add/remove/edit an event in the index based on the
285    /// operation.
286    ///
287    /// Prefer [`RoomIndex::bulk_execute`] for multiple operations.
288    pub fn execute(&mut self, operation: RoomIndexOperation) -> Result<(), IndexError> {
289        let mut writer = self.get_writer()?;
290        self.execute_with_retry(&mut writer, &operation, 5)?;
291        self.commit_and_reload(&mut writer)?;
292        Ok(())
293    }
294
295    /// Bulk execute [`RoomIndexOperation`]s
296    ///
297    /// If an error occurs in the batch it retries 5 times if possible.
298    ///
299    /// This which will add/remove/edit an events in the index based on the
300    /// operations.
301    pub fn bulk_execute(&mut self, operations: Vec<RoomIndexOperation>) -> Result<(), IndexError> {
302        let mut writer = self.get_writer()?;
303        let mut operations = operations.into_iter();
304        let mut next_operation = operations.next();
305
306        while let Some(ref operation) = next_operation {
307            self.execute_with_retry(&mut writer, operation, 5)?;
308            next_operation = operations.next();
309        }
310
311        self.commit_and_reload(&mut writer)?;
312
313        Ok(())
314    }
315
316    fn contains(&self, event_id: &EventId) -> bool {
317        let search_result = self.search(
318            format!("{}:\"{event_id}\"", self.schema.get_field_name(self.schema.primary_key()))
319                .as_str(),
320            1,
321            None,
322        );
323        match search_result {
324            Ok(results) => {
325                !self.uncommitted_removes.contains(event_id)
326                    && (!results.is_empty() || self.uncommitted_adds.contains(event_id))
327            }
328            Err(err) => {
329                warn!("Failed to check if event has been indexed, assuming it has: {err}");
330                true
331            }
332        }
333    }
334}
335
336#[cfg(test)]
337mod tests {
338    use std::{collections::HashSet, error::Error};
339
340    use matrix_sdk_test::event_factory::EventFactory;
341    use ruma::{
342        EventId, event_id,
343        events::{
344            AnySyncMessageLikeEvent,
345            room::message::{OriginalSyncRoomMessageEvent, RoomMessageEventContentWithoutRelation},
346        },
347        room_id, user_id,
348    };
349
350    use crate::{
351        error::IndexError,
352        index::{RoomIndex, RoomIndexOperation, builder::RoomIndexBuilder},
353    };
354
355    /// Helper function to add a regular message to the index
356    ///
357    /// # Panic
358    /// Panics when event is not a [`OriginalSyncRoomMessageEvent`] with no
359    /// relations.
360    fn index_message(
361        index: &mut RoomIndex,
362        event: AnySyncMessageLikeEvent,
363    ) -> Result<(), IndexError> {
364        if let AnySyncMessageLikeEvent::RoomMessage(ev) = event
365            && let Some(ev) = ev.as_original()
366            && ev.content.relates_to.is_none()
367        {
368            return index.execute(RoomIndexOperation::Add(ev.clone()));
369        }
370        panic!("Event was not a relationless OriginalSyncRoomMessageEvent.")
371    }
372
373    /// Helper function to remove events to the index
374    fn index_remove(index: &mut RoomIndex, event_id: &EventId) -> Result<(), IndexError> {
375        index.execute(RoomIndexOperation::Remove(event_id.to_owned()))
376    }
377
378    /// Helper function to edit events in index
379    ///
380    /// Edit event with `event_id` into new [`OriginalSyncRoomMessageEvent`]
381    fn index_edit(
382        index: &mut RoomIndex,
383        event_id: &EventId,
384        new: OriginalSyncRoomMessageEvent,
385    ) -> Result<(), IndexError> {
386        index.execute(RoomIndexOperation::Edit(event_id.to_owned(), new))
387    }
388
389    #[test]
390    fn test_add_event() {
391        let room_id = room_id!("!room_id:localhost");
392        let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
393
394        let event = EventFactory::new()
395            .text_msg("event message")
396            .event_id(event_id!("$event_id:localhost"))
397            .room(room_id)
398            .sender(user_id!("@user_id:localhost"))
399            .into_any_sync_message_like_event();
400
401        index_message(&mut index, event).expect("failed to add event: {res:?}");
402    }
403
404    #[test]
405    fn test_search_populated_index() -> Result<(), Box<dyn Error>> {
406        let room_id = room_id!("!room_id:localhost");
407        let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
408
409        let event_id_1 = event_id!("$event_id_1:localhost");
410        let event_id_2 = event_id!("$event_id_2:localhost");
411        let event_id_3 = event_id!("$event_id_3:localhost");
412        let user_id = user_id!("@user_id:localhost");
413        let f = EventFactory::new().room(room_id).sender(user_id);
414
415        index_message(
416            &mut index,
417            f.text_msg("This is a sentence")
418                .event_id(event_id_1)
419                .into_any_sync_message_like_event(),
420        )?;
421
422        index_message(
423            &mut index,
424            f.text_msg("All new words").event_id(event_id_2).into_any_sync_message_like_event(),
425        )?;
426
427        index_message(
428            &mut index,
429            f.text_msg("A similar sentence")
430                .event_id(event_id_3)
431                .into_any_sync_message_like_event(),
432        )?;
433
434        let result = index.search("sentence", 10, None).expect("search failed with: {result:?}");
435        let result: HashSet<_> = result.iter().map(|(_, id)| id).collect();
436
437        let true_value = [event_id_1.to_owned(), event_id_3.to_owned()];
438        let true_value: HashSet<_> = true_value.iter().collect();
439
440        assert_eq!(result, true_value, "search result not correct: {result:?}");
441
442        Ok(())
443    }
444
445    #[test]
446    fn test_search_empty_index() -> Result<(), Box<dyn Error>> {
447        let room_id = room_id!("!room_id:localhost");
448        let index = RoomIndexBuilder::new_in_memory(room_id).build();
449
450        let result = index.search("sentence", 10, None).expect("search failed with: {result:?}");
451
452        assert!(result.is_empty(), "search result not empty: {result:?}");
453
454        Ok(())
455    }
456
457    #[test]
458    fn test_index_contains_false() {
459        let room_id = room_id!("!room_id:localhost");
460        let index = RoomIndexBuilder::new_in_memory(room_id).build();
461
462        let event_id = event_id!("$event_id:localhost");
463
464        assert!(!index.contains(event_id), "Index should not contain event");
465    }
466
467    #[test]
468    fn test_index_contains_true() -> Result<(), Box<dyn Error>> {
469        let room_id = room_id!("!room_id:localhost");
470        let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
471
472        let event_id = event_id!("$event_id:localhost");
473        let event = EventFactory::new()
474            .text_msg("This is a sentence")
475            .event_id(event_id)
476            .room(room_id)
477            .sender(user_id!("@user_id:localhost"))
478            .into_any_sync_message_like_event();
479
480        index_message(&mut index, event)?;
481
482        assert!(index.contains(event_id), "Index should contain event");
483
484        Ok(())
485    }
486
487    #[test]
488    fn test_index_add_idempotency() -> Result<(), Box<dyn Error>> {
489        let room_id = room_id!("!room_id:localhost");
490        let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
491
492        let event_id = event_id!("$event_id:localhost");
493        let event = EventFactory::new()
494            .text_msg("This is a sentence")
495            .event_id(event_id)
496            .room(room_id)
497            .sender(user_id!("@user_id:localhost"))
498            .into_any_sync_message_like_event();
499
500        index_message(&mut index, event.clone())?;
501
502        assert!(index.contains(event_id), "Index should contain event");
503
504        // indexing again should do nothing
505        index_message(&mut index, event)?;
506
507        assert!(index.contains(event_id), "Index should still contain event");
508
509        let result = index.search("sentence", 10, None).expect("search failed with: {result:?}");
510
511        assert_eq!(result.len(), 1, "Index should have ignored second indexing");
512
513        Ok(())
514    }
515
516    #[test]
517    fn test_remove_event() -> Result<(), Box<dyn Error>> {
518        let room_id = room_id!("!room_id:localhost");
519        let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
520
521        let event_id = event_id!("$event_id:localhost");
522        let user_id = user_id!("@user_id:localhost");
523        let f = EventFactory::new().room(room_id).sender(user_id);
524
525        let event =
526            f.text_msg("This is a sentence").event_id(event_id).into_any_sync_message_like_event();
527
528        index_message(&mut index, event)?;
529
530        assert!(index.contains(event_id), "Index should contain event");
531
532        index_remove(&mut index, event_id)?;
533
534        assert!(!index.contains(event_id), "Index should not contain event");
535
536        Ok(())
537    }
538
539    #[test]
540    fn test_edit_removes_old_and_adds_new_event() -> Result<(), Box<dyn Error>> {
541        let room_id = room_id!("!room_id:localhost");
542        let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
543
544        let old_event_id = event_id!("$old_event_id:localhost");
545        let user_id = user_id!("@user_id:localhost");
546        let f = EventFactory::new().room(room_id).sender(user_id);
547
548        let old_event = f
549            .text_msg("This is a sentence")
550            .event_id(old_event_id)
551            .into_any_sync_message_like_event();
552
553        index_message(&mut index, old_event)?;
554
555        assert!(index.contains(old_event_id), "Index should contain event");
556
557        let new_event_id = event_id!("$new_event_id:localhost");
558        let edit = f
559            .text_msg("This is a brand new sentence!")
560            .edit(
561                old_event_id,
562                RoomMessageEventContentWithoutRelation::text_plain("This is a brand new sentence!"),
563            )
564            .event_id(new_event_id)
565            .into_original_sync_room_message_event();
566
567        index_edit(&mut index, old_event_id, edit)?;
568
569        assert!(!index.contains(old_event_id), "Index should not contain old event");
570        assert!(index.contains(new_event_id), "Index should contain edited event");
571
572        Ok(())
573    }
574}