1pub mod builder;
17
18use std::{collections::HashSet, fmt};
19
20use ruma::{
21 EventId, OwnedEventId, OwnedRoomId, RoomId, events::room::message::OriginalSyncRoomMessageEvent,
22};
23use tantivy::{
24 Index, IndexReader, TantivyDocument, collector::TopDocs, directory::error::OpenDirectoryError,
25 query::QueryParser, schema::Value,
26};
27use tracing::{debug, error, warn};
28
29use crate::{
30 OpStamp, TANTIVY_INDEX_MEMORY_BUDGET,
31 error::IndexError,
32 schema::{MatrixSearchIndexSchema, RoomMessageSchema},
33 writer::SearchIndexWriter,
34};
35
/// An operation that can be applied to a [`RoomIndex`] through
/// [`RoomIndex::execute`] or [`RoomIndex::bulk_execute`].
#[derive(Debug, Clone)]
pub enum RoomIndexOperation {
    /// Index the given message event.
    Add(OriginalSyncRoomMessageEvent),
    /// Remove the documents associated with the given event id.
    Remove(OwnedEventId),
    /// Remove the documents associated with the given event id, then index
    /// the given message event.
    Edit(OwnedEventId, OriginalSyncRoomMessageEvent),
    /// Do nothing.
    Noop,
}
51
/// A search index over the messages of a single room.
pub struct RoomIndex {
    /// The underlying tantivy index.
    index: Index,
    /// Schema used to build documents and to name the fields that are queried.
    schema: RoomMessageSchema,
    /// Query parser built from the schema's default search fields.
    query_parser: QueryParser,
    /// Id of the room this index was created for.
    room_id: OwnedRoomId,
    /// Ids of events added since the last commit.
    uncommitted_adds: HashSet<OwnedEventId>,
    /// Ids of events removed since the last commit.
    uncommitted_removes: HashSet<OwnedEventId>,
}
62
63impl fmt::Debug for RoomIndex {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 f.debug_struct("RoomIndex")
66 .field("schema", &self.schema)
67 .field("room_id", &self.room_id)
68 .finish()
69 }
70}
71
72impl RoomIndex {
73 pub(crate) fn new_with(index: Index, schema: RoomMessageSchema, room_id: &RoomId) -> RoomIndex {
74 let query_parser = QueryParser::for_index(&index, schema.default_search_fields());
75 Self {
76 index,
77 schema,
78 query_parser,
79 room_id: room_id.to_owned(),
80 uncommitted_adds: HashSet::new(),
81 uncommitted_removes: HashSet::new(),
82 }
83 }
84
85 fn get_writer(&self) -> Result<SearchIndexWriter, IndexError> {
87 let writer = self.index.writer(TANTIVY_INDEX_MEMORY_BUDGET)?;
88 Ok(SearchIndexWriter::new(writer, self.schema.clone()))
89 }
90
91 fn get_reader(&self) -> Result<IndexReader, IndexError> {
93 Ok(self.index.reader_builder().try_into()?)
94 }
95
96 fn commit(&mut self, writer: &mut SearchIndexWriter) -> Result<OpStamp, IndexError> {
101 let last_commit_opstamp = writer.commit()?; self.uncommitted_adds.clear();
103 self.uncommitted_removes.clear();
104 Ok(last_commit_opstamp)
105 }
106
107 fn commit_and_reload(&mut self, writer: &mut SearchIndexWriter) -> Result<OpStamp, IndexError> {
118 debug!(
119 "RoomIndex: committing and reloading: uncommitted: {:?}, {:?}",
120 self.uncommitted_adds, self.uncommitted_removes
121 );
122 let last_commit_opstamp = self.commit(writer)?;
123 self.get_reader()?.reload()?;
124 Ok(last_commit_opstamp)
125 }
126
127 pub fn search(
135 &self,
136 query: &str,
137 max_number_of_results: usize,
138 pagination_offset: Option<usize>,
139 ) -> Result<Vec<OwnedEventId>, IndexError> {
140 let query = self.query_parser.parse_query(query)?;
141 let searcher = self.get_reader()?.searcher();
142
143 let offset = pagination_offset.unwrap_or(0);
144
145 let results = searcher
146 .search(&query, &TopDocs::with_limit(max_number_of_results).and_offset(offset))?;
147 let mut ret: Vec<OwnedEventId> = Vec::new();
148 let pk = self.schema.primary_key();
149
150 for (_score, doc_address) in results {
151 let retrieved_doc: TantivyDocument = searcher.doc(doc_address)?;
152 match retrieved_doc.get_first(pk).and_then(|maybe_value| maybe_value.as_str()) {
153 Some(value) => match OwnedEventId::try_from(value) {
154 Ok(event_id) => ret.push(event_id),
155 Err(err) => error!("error while parsing event_id from search result: {err:?}"),
156 },
157 _ => error!("unexpected value type while searching documents"),
158 }
159 }
160
161 Ok(ret)
162 }
163
164 fn get_events_to_be_removed(
165 &self,
166 event_id: &EventId,
167 ) -> Result<Vec<OwnedEventId>, IndexError> {
168 self.search(
169 format!("{}:\"{event_id}\"", self.schema.get_field_name(self.schema.deletion_key()))
170 .as_str(),
171 10000,
172 None,
173 )
174 }
175
176 fn add(
177 &mut self,
178 writer: &mut SearchIndexWriter,
179 event: OriginalSyncRoomMessageEvent,
180 ) -> Result<(), IndexError> {
181 if !self.contains(&event.event_id) {
182 writer.add(self.schema.make_doc(event.clone())?)?;
183 }
184 self.uncommitted_removes.remove(&event.event_id);
185 self.uncommitted_adds.insert(event.event_id);
186 Ok(())
187 }
188
189 fn remove(
190 &mut self,
191 writer: &mut SearchIndexWriter,
192 event_id: OwnedEventId,
193 ) -> Result<(), IndexError> {
194 let events = self.get_events_to_be_removed(&event_id)?;
195
196 writer.remove(&event_id);
197
198 for event in events.into_iter() {
199 self.uncommitted_adds.remove(&event);
200 self.uncommitted_removes.insert(event);
201 }
202
203 Ok(())
204 }
205
206 fn execute_impl(
207 &mut self,
208 writer: &mut SearchIndexWriter,
209 operation: &RoomIndexOperation,
210 ) -> Result<(), IndexError> {
211 debug!("INDEX: executing {operation:?}");
212 match operation.clone() {
213 RoomIndexOperation::Add(event) => {
214 self.add(writer, event)?;
215 }
216 RoomIndexOperation::Remove(event_id) => {
217 self.remove(writer, event_id)?;
218 }
219 RoomIndexOperation::Edit(remove_event_id, event) => {
220 self.remove(writer, remove_event_id)?;
221 self.add(writer, event)?;
222 }
223 RoomIndexOperation::Noop => {}
224 }
225 Ok(())
226 }
227
228 fn execute_with_retry(
230 &mut self,
231 writer: &mut SearchIndexWriter,
232 operation: &RoomIndexOperation,
233 retries: usize,
234 ) -> Result<(), IndexError> {
235 let mut num_tries = 0;
236
237 while let Err(err) = self.execute_impl(writer, operation) {
238 if num_tries == retries {
239 return Err(err);
240 }
241 match err {
242 IndexError::TantivyError(_)
244 | IndexError::IndexSchemaError(_)
245 | IndexError::IndexWriteError(_)
246 | IndexError::IO(_) => {
247 num_tries += 1;
248 }
249 IndexError::OpenDirectoryError(ref e) => match e {
250 OpenDirectoryError::IoError { io_error: _, directory_path: _ } => {
252 num_tries += 1;
253 }
254 OpenDirectoryError::DoesNotExist(_)
256 | OpenDirectoryError::FailedToCreateTempDir(_)
257 | OpenDirectoryError::NotADirectory(_) => return Err(err),
258 },
259 IndexError::QueryParserError(_) => return Err(err),
261 IndexError::CannotIndexRedactedMessage
263 | IndexError::EmptyMessage
264 | IndexError::MessageTypeNotSupported => break,
265 }
266 debug!("Failed to execute operation in room index (try {num_tries}): {err}");
267 }
268 Ok(())
269 }
270
271 pub fn execute(&mut self, operation: RoomIndexOperation) -> Result<(), IndexError> {
280 let mut writer = self.get_writer()?;
281 self.execute_with_retry(&mut writer, &operation, 5)?;
282 self.commit_and_reload(&mut writer)?;
283 Ok(())
284 }
285
286 pub fn bulk_execute(&mut self, operations: Vec<RoomIndexOperation>) -> Result<(), IndexError> {
293 let mut writer = self.get_writer()?;
294 let mut operations = operations.into_iter();
295 let mut next_operation = operations.next();
296
297 while let Some(ref operation) = next_operation {
298 self.execute_with_retry(&mut writer, operation, 5)?;
299 next_operation = operations.next();
300 }
301
302 self.commit_and_reload(&mut writer)?;
303
304 Ok(())
305 }
306
307 fn contains(&self, event_id: &EventId) -> bool {
308 let search_result = self.search(
309 format!("{}:\"{event_id}\"", self.schema.get_field_name(self.schema.primary_key()))
310 .as_str(),
311 1,
312 None,
313 );
314 match search_result {
315 Ok(results) => {
316 !self.uncommitted_removes.contains(event_id)
317 && (!results.is_empty() || self.uncommitted_adds.contains(event_id))
318 }
319 Err(err) => {
320 warn!("Failed to check if event has been indexed, assuming it has: {err}");
321 true
322 }
323 }
324 }
325}
326
327#[cfg(test)]
328mod tests {
329 use std::{collections::HashSet, error::Error};
330
331 use matrix_sdk_test::event_factory::EventFactory;
332 use ruma::{
333 EventId, event_id,
334 events::{
335 AnySyncMessageLikeEvent,
336 room::message::{OriginalSyncRoomMessageEvent, RoomMessageEventContentWithoutRelation},
337 },
338 room_id, user_id,
339 };
340
341 use crate::{
342 error::IndexError,
343 index::{RoomIndex, RoomIndexOperation, builder::RoomIndexBuilder},
344 };
345
346 fn index_message(
352 index: &mut RoomIndex,
353 event: AnySyncMessageLikeEvent,
354 ) -> Result<(), IndexError> {
355 if let AnySyncMessageLikeEvent::RoomMessage(ev) = event
356 && let Some(ev) = ev.as_original()
357 && ev.content.relates_to.is_none()
358 {
359 return index.execute(RoomIndexOperation::Add(ev.clone()));
360 }
361 panic!("Event was not a relationless OriginalSyncRoomMessageEvent.")
362 }
363
364 fn index_remove(index: &mut RoomIndex, event_id: &EventId) -> Result<(), IndexError> {
366 index.execute(RoomIndexOperation::Remove(event_id.to_owned()))
367 }
368
369 fn index_edit(
373 index: &mut RoomIndex,
374 event_id: &EventId,
375 new: OriginalSyncRoomMessageEvent,
376 ) -> Result<(), IndexError> {
377 index.execute(RoomIndexOperation::Edit(event_id.to_owned(), new))
378 }
379
380 #[test]
381 fn test_add_event() {
382 let room_id = room_id!("!room_id:localhost");
383 let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
384
385 let event = EventFactory::new()
386 .text_msg("event message")
387 .event_id(event_id!("$event_id:localhost"))
388 .room(room_id)
389 .sender(user_id!("@user_id:localhost"))
390 .into_any_sync_message_like_event();
391
392 index_message(&mut index, event).expect("failed to add event: {res:?}");
393 }
394
395 #[test]
396 fn test_search_populated_index() -> Result<(), Box<dyn Error>> {
397 let room_id = room_id!("!room_id:localhost");
398 let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
399
400 let event_id_1 = event_id!("$event_id_1:localhost");
401 let event_id_2 = event_id!("$event_id_2:localhost");
402 let event_id_3 = event_id!("$event_id_3:localhost");
403 let user_id = user_id!("@user_id:localhost");
404 let f = EventFactory::new().room(room_id).sender(user_id);
405
406 index_message(
407 &mut index,
408 f.text_msg("This is a sentence")
409 .event_id(event_id_1)
410 .into_any_sync_message_like_event(),
411 )?;
412
413 index_message(
414 &mut index,
415 f.text_msg("All new words").event_id(event_id_2).into_any_sync_message_like_event(),
416 )?;
417
418 index_message(
419 &mut index,
420 f.text_msg("A similar sentence")
421 .event_id(event_id_3)
422 .into_any_sync_message_like_event(),
423 )?;
424
425 let result = index.search("sentence", 10, None).expect("search failed with: {result:?}");
426 let result: HashSet<_> = result.iter().collect();
427
428 let true_value = [event_id_1.to_owned(), event_id_3.to_owned()];
429 let true_value: HashSet<_> = true_value.iter().collect();
430
431 assert_eq!(result, true_value, "search result not correct: {result:?}");
432
433 Ok(())
434 }
435
436 #[test]
437 fn test_search_empty_index() -> Result<(), Box<dyn Error>> {
438 let room_id = room_id!("!room_id:localhost");
439 let index = RoomIndexBuilder::new_in_memory(room_id).build();
440
441 let result = index.search("sentence", 10, None).expect("search failed with: {result:?}");
442
443 assert!(result.is_empty(), "search result not empty: {result:?}");
444
445 Ok(())
446 }
447
448 #[test]
449 fn test_index_contains_false() {
450 let room_id = room_id!("!room_id:localhost");
451 let index = RoomIndexBuilder::new_in_memory(room_id).build();
452
453 let event_id = event_id!("$event_id:localhost");
454
455 assert!(!index.contains(event_id), "Index should not contain event");
456 }
457
458 #[test]
459 fn test_index_contains_true() -> Result<(), Box<dyn Error>> {
460 let room_id = room_id!("!room_id:localhost");
461 let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
462
463 let event_id = event_id!("$event_id:localhost");
464 let event = EventFactory::new()
465 .text_msg("This is a sentence")
466 .event_id(event_id)
467 .room(room_id)
468 .sender(user_id!("@user_id:localhost"))
469 .into_any_sync_message_like_event();
470
471 index_message(&mut index, event)?;
472
473 assert!(index.contains(event_id), "Index should contain event");
474
475 Ok(())
476 }
477
478 #[test]
479 fn test_index_add_idempotency() -> Result<(), Box<dyn Error>> {
480 let room_id = room_id!("!room_id:localhost");
481 let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
482
483 let event_id = event_id!("$event_id:localhost");
484 let event = EventFactory::new()
485 .text_msg("This is a sentence")
486 .event_id(event_id)
487 .room(room_id)
488 .sender(user_id!("@user_id:localhost"))
489 .into_any_sync_message_like_event();
490
491 index_message(&mut index, event.clone())?;
492
493 assert!(index.contains(event_id), "Index should contain event");
494
495 index_message(&mut index, event)?;
497
498 assert!(index.contains(event_id), "Index should still contain event");
499
500 let result = index.search("sentence", 10, None).expect("search failed with: {result:?}");
501
502 assert_eq!(result.len(), 1, "Index should have ignored second indexing");
503
504 Ok(())
505 }
506
507 #[test]
508 fn test_remove_event() -> Result<(), Box<dyn Error>> {
509 let room_id = room_id!("!room_id:localhost");
510 let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
511
512 let event_id = event_id!("$event_id:localhost");
513 let user_id = user_id!("@user_id:localhost");
514 let f = EventFactory::new().room(room_id).sender(user_id);
515
516 let event =
517 f.text_msg("This is a sentence").event_id(event_id).into_any_sync_message_like_event();
518
519 index_message(&mut index, event)?;
520
521 assert!(index.contains(event_id), "Index should contain event");
522
523 index_remove(&mut index, event_id)?;
524
525 assert!(!index.contains(event_id), "Index should not contain event");
526
527 Ok(())
528 }
529
530 #[test]
531 fn test_edit_removes_old_and_adds_new_event() -> Result<(), Box<dyn Error>> {
532 let room_id = room_id!("!room_id:localhost");
533 let mut index = RoomIndexBuilder::new_in_memory(room_id).build();
534
535 let old_event_id = event_id!("$old_event_id:localhost");
536 let user_id = user_id!("@user_id:localhost");
537 let f = EventFactory::new().room(room_id).sender(user_id);
538
539 let old_event = f
540 .text_msg("This is a sentence")
541 .event_id(old_event_id)
542 .into_any_sync_message_like_event();
543
544 index_message(&mut index, old_event)?;
545
546 assert!(index.contains(old_event_id), "Index should contain event");
547
548 let new_event_id = event_id!("$new_event_id:localhost");
549 let edit = f
550 .text_msg("This is a brand new sentence!")
551 .edit(
552 old_event_id,
553 RoomMessageEventContentWithoutRelation::text_plain("This is a brand new sentence!"),
554 )
555 .event_id(new_event_id)
556 .into_original_sync_room_message_event();
557
558 index_edit(&mut index, old_event_id, edit)?;
559
560 assert!(!index.contains(old_event_id), "Index should not contain old event");
561 assert!(index.contains(new_event_id), "Index should contain edited event");
562
563 Ok(())
564 }
565}