1pub mod builder;
17
18use std::{collections::HashSet, fmt};
19
20use ruma::{
21 EventId, OwnedEventId, OwnedRoomId, RoomId, events::room::message::OriginalSyncRoomMessageEvent,
22};
23use tantivy::{
24 Index, IndexReader, TantivyDocument, collector::TopDocs, directory::error::OpenDirectoryError,
25 query::QueryParser, schema::Value,
26};
27use tracing::{debug, error, warn};
28
29use crate::{
30 OpStamp, TANTIVY_INDEX_MEMORY_BUDGET,
31 error::IndexError,
32 schema::{MatrixSearchIndexSchema, RoomMessageSchema},
33 writer::SearchIndexWriter,
34};
35
36#[derive(Debug, Clone)]
38pub enum RoomIndexOperation {
39 Add(OriginalSyncRoomMessageEvent),
41 Remove(OwnedEventId),
44 Edit(OwnedEventId, OriginalSyncRoomMessageEvent),
48 Noop,
50}
51
52pub struct RoomIndex {
55 index: Index,
56 schema: RoomMessageSchema,
57 query_parser: QueryParser,
58 room_id: OwnedRoomId,
59 uncommitted_adds: HashSet<OwnedEventId>,
60 uncommitted_removes: HashSet<OwnedEventId>,
61}
62
63impl fmt::Debug for RoomIndex {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 f.debug_struct("RoomIndex")
66 .field("schema", &self.schema)
67 .field("room_id", &self.room_id)
68 .finish()
69 }
70}
71
72impl RoomIndex {
73 pub(crate) fn new_with(index: Index, schema: RoomMessageSchema, room_id: &RoomId) -> RoomIndex {
74 let query_parser = QueryParser::for_index(&index, schema.default_search_fields());
75 Self {
76 index,
77 schema,
78 query_parser,
79 room_id: room_id.to_owned(),
80 uncommitted_adds: HashSet::new(),
81 uncommitted_removes: HashSet::new(),
82 }
83 }
84
85 fn get_writer(&self) -> Result<SearchIndexWriter, IndexError> {
87 let writer = self.index.writer(TANTIVY_INDEX_MEMORY_BUDGET)?;
88 Ok(SearchIndexWriter::new(writer, self.schema.clone()))
89 }
90
91 fn get_reader(&self) -> Result<IndexReader, IndexError> {
93 Ok(self.index.reader_builder().try_into()?)
94 }
95
96 fn commit(&mut self, writer: &mut SearchIndexWriter) -> Result<OpStamp, IndexError> {
101 let last_commit_opstamp = writer.commit()?; self.uncommitted_adds.clear();
103 self.uncommitted_removes.clear();
104 Ok(last_commit_opstamp)
105 }
106
107 fn commit_and_reload(&mut self, writer: &mut SearchIndexWriter) -> Result<OpStamp, IndexError> {
118 debug!(
119 "RoomIndex: committing and reloading: uncommitted: {:?}, {:?}",
120 self.uncommitted_adds, self.uncommitted_removes
121 );
122 let last_commit_opstamp = self.commit(writer)?;
123 self.get_reader()?.reload()?;
124 Ok(last_commit_opstamp)
125 }
126
127 pub fn search(
135 &self,
136 query: &str,
137 max_number_of_results: usize,
138 pagination_offset: Option<usize>,
139 ) -> Result<Vec<(f32, OwnedEventId)>, IndexError> {
140 let query = self.query_parser.parse_query(query)?;
141 let searcher = self.get_reader()?.searcher();
142
143 let offset = pagination_offset.unwrap_or(0);
144
145 let results = searcher.search(
146 &query,
147 &TopDocs::with_limit(max_number_of_results).and_offset(offset).order_by_score(),
148 )?;
149 let mut ret: Vec<(f32, OwnedEventId)> = Vec::new();
150 let pk = self.schema.primary_key();
151
152 for (score, doc_address) in results {
153 let retrieved_doc: TantivyDocument = searcher.doc(doc_address)?;
154 match retrieved_doc.get_first(pk).and_then(|maybe_value| maybe_value.as_str()) {
155 Some(value) => match OwnedEventId::try_from(value) {
156 Ok(event_id) => ret.push((score, event_id)),
157 Err(err) => error!("error while parsing event_id from search result: {err:?}"),
158 },
159 _ => error!("unexpected value type while searching documents"),
160 }
161 }
162
163 Ok(ret)
164 }
165
166 fn get_events_to_be_removed(
167 &self,
168 event_id: &EventId,
169 ) -> Result<Vec<OwnedEventId>, IndexError> {
170 Ok(self
171 .search(
172 format!(
173 "{}:\"{event_id}\"",
174 self.schema.get_field_name(self.schema.deletion_key())
175 )
176 .as_str(),
177 10000,
178 None,
179 )?
180 .into_iter()
181 .map(|(_, id)| id)
182 .collect())
183 }
184
185 fn add(
186 &mut self,
187 writer: &mut SearchIndexWriter,
188 event: OriginalSyncRoomMessageEvent,
189 ) -> Result<(), IndexError> {
190 if !self.contains(&event.event_id) {
191 writer.add(self.schema.make_doc(event.clone())?)?;
192 }
193 self.uncommitted_removes.remove(&event.event_id);
194 self.uncommitted_adds.insert(event.event_id);
195 Ok(())
196 }
197
198 fn remove(
199 &mut self,
200 writer: &mut SearchIndexWriter,
201 event_id: OwnedEventId,
202 ) -> Result<(), IndexError> {
203 let events = self.get_events_to_be_removed(&event_id)?;
204
205 writer.remove(&event_id);
206
207 for event in events.into_iter() {
208 self.uncommitted_adds.remove(&event);
209 self.uncommitted_removes.insert(event);
210 }
211
212 Ok(())
213 }
214
215 fn execute_impl(
216 &mut self,
217 writer: &mut SearchIndexWriter,
218 operation: &RoomIndexOperation,
219 ) -> Result<(), IndexError> {
220 debug!("INDEX: executing {operation:?}");
221 match operation.clone() {
222 RoomIndexOperation::Add(event) => {
223 self.add(writer, event)?;
224 }
225 RoomIndexOperation::Remove(event_id) => {
226 self.remove(writer, event_id)?;
227 }
228 RoomIndexOperation::Edit(remove_event_id, event) => {
229 self.remove(writer, remove_event_id)?;
230 self.add(writer, event)?;
231 }
232 RoomIndexOperation::Noop => {}
233 }
234 Ok(())
235 }
236
237 fn execute_with_retry(
239 &mut self,
240 writer: &mut SearchIndexWriter,
241 operation: &RoomIndexOperation,
242 retries: usize,
243 ) -> Result<(), IndexError> {
244 let mut num_tries = 0;
245
246 while let Err(err) = self.execute_impl(writer, operation) {
247 if num_tries == retries {
248 return Err(err);
249 }
250 match err {
251 IndexError::TantivyError(_)
253 | IndexError::IndexSchemaError(_)
254 | IndexError::IndexWriteError(_)
255 | IndexError::IO(_) => {
256 num_tries += 1;
257 }
258 IndexError::OpenDirectoryError(ref e) => match e {
259 OpenDirectoryError::IoError { io_error: _, directory_path: _ } => {
261 num_tries += 1;
262 }
263 OpenDirectoryError::DoesNotExist(_)
265 | OpenDirectoryError::FailedToCreateTempDir(_)
266 | OpenDirectoryError::NotADirectory(_) => return Err(err),
267 },
268 IndexError::QueryParserError(_) => return Err(err),
270 IndexError::CannotIndexRedactedMessage
272 | IndexError::EmptyMessage
273 | IndexError::MessageTypeNotSupported => break,
274 }
275 debug!("Failed to execute operation in room index (try {num_tries}): {err}");
276 }
277 Ok(())
278 }
279
280 pub fn execute(&mut self, operation: RoomIndexOperation) -> Result<(), IndexError> {
289 let mut writer = self.get_writer()?;
290 self.execute_with_retry(&mut writer, &operation, 5)?;
291 self.commit_and_reload(&mut writer)?;
292 Ok(())
293 }
294
295 pub fn bulk_execute(&mut self, operations: Vec<RoomIndexOperation>) -> Result<(), IndexError> {
302 let mut writer = self.get_writer()?;
303 let mut operations = operations.into_iter();
304 let mut next_operation = operations.next();
305
306 while let Some(ref operation) = next_operation {
307 self.execute_with_retry(&mut writer, operation, 5)?;
308 next_operation = operations.next();
309 }
310
311 self.commit_and_reload(&mut writer)?;
312
313 Ok(())
314 }
315
316 fn contains(&self, event_id: &EventId) -> bool {
317 let search_result = self.search(
318 format!("{}:\"{event_id}\"", self.schema.get_field_name(self.schema.primary_key()))
319 .as_str(),
320 1,
321 None,
322 );
323 match search_result {
324 Ok(results) => {
325 !self.uncommitted_removes.contains(event_id)
326 && (!results.is_empty() || self.uncommitted_adds.contains(event_id))
327 }
328 Err(err) => {
329 warn!("Failed to check if event has been indexed, assuming it has: {err}");
330 true
331 }
332 }
333 }
334}
335
336#[cfg(test)]
337mod tests {
338 use std::{collections::HashSet, error::Error};
339
340 use matrix_sdk_test::event_factory::EventFactory;
341 use ruma::{
342 EventId, event_id,
343 events::{
344 AnySyncMessageLikeEvent,
345 room::message::{OriginalSyncRoomMessageEvent, RoomMessageEventContentWithoutRelation},
346 },
347 room_id, user_id,
348 };
349
350 use crate::{
351 error::IndexError,
352 index::{RoomIndex, RoomIndexOperation, builder::RoomIndexBuilder},
353 };
354
355 fn index_message(
361 index: &mut RoomIndex,
362 event: AnySyncMessageLikeEvent,
363 ) -> Result<(), IndexError> {
364 if let AnySyncMessageLikeEvent::RoomMessage(ev) = event
365 && let Some(ev) = ev.as_original()
366 && ev.content.relates_to.is_none()
367 {
368 return index.execute(RoomIndexOperation::Add(ev.clone()));
369 }
370 panic!("Event was not a relationless OriginalSyncRoomMessageEvent.")
371 }
372
373 fn index_remove(index: &mut RoomIndex, event_id: &EventId) -> Result<(), IndexError> {
375 index.execute(RoomIndexOperation::Remove(event_id.to_owned()))
376 }
377
378 fn index_edit(
382 index: &mut RoomIndex,
383 event_id: &EventId,
384 new: OriginalSyncRoomMessageEvent,
385 ) -> Result<(), IndexError> {
386 index.execute(RoomIndexOperation::Edit(event_id.to_owned(), new))
387 }
388
/// Adding one message to a fresh in-memory index succeeds.
#[test]
fn test_add_event() {
    let room_id = room_id!("!room_id:localhost");
    let mut index = RoomIndexBuilder::new_in_memory(room_id).build();

    let event = EventFactory::new()
        .text_msg("event message")
        .event_id(event_id!("$event_id:localhost"))
        .room(room_id)
        .sender(user_id!("@user_id:localhost"))
        .into_any_sync_message_like_event();

    // `expect` takes a plain message, not a format string; the error itself
    // is appended to the panic output automatically.
    index_message(&mut index, event).expect("failed to add event");
}
403
/// Searching for a word returns exactly the events whose text contains it.
#[test]
fn test_search_populated_index() -> Result<(), Box<dyn Error>> {
    let room_id = room_id!("!room_id:localhost");
    let mut index = RoomIndexBuilder::new_in_memory(room_id).build();

    let event_id_1 = event_id!("$event_id_1:localhost");
    let event_id_2 = event_id!("$event_id_2:localhost");
    let event_id_3 = event_id!("$event_id_3:localhost");
    let user_id = user_id!("@user_id:localhost");
    let f = EventFactory::new().room(room_id).sender(user_id);

    index_message(
        &mut index,
        f.text_msg("This is a sentence")
            .event_id(event_id_1)
            .into_any_sync_message_like_event(),
    )?;

    index_message(
        &mut index,
        f.text_msg("All new words").event_id(event_id_2).into_any_sync_message_like_event(),
    )?;

    index_message(
        &mut index,
        f.text_msg("A similar sentence")
            .event_id(event_id_3)
            .into_any_sync_message_like_event(),
    )?;

    // Propagate a search failure like the other fallible steps; the previous
    // `expect` message contained a placeholder that was never interpolated.
    let result = index.search("sentence", 10, None)?;
    let result: HashSet<_> = result.iter().map(|(_, id)| id).collect();

    let true_value = [event_id_1.to_owned(), event_id_3.to_owned()];
    let true_value: HashSet<_> = true_value.iter().collect();

    assert_eq!(result, true_value, "search result not correct: {result:?}");

    Ok(())
}
444
/// Searching an index without any documents yields no results.
#[test]
fn test_search_empty_index() -> Result<(), Box<dyn Error>> {
    let room_id = room_id!("!room_id:localhost");
    let index = RoomIndexBuilder::new_in_memory(room_id).build();

    // Propagate a search failure through the test's `Result`; the previous
    // `expect` message contained a placeholder that was never interpolated.
    let result = index.search("sentence", 10, None)?;

    assert!(result.is_empty(), "search result not empty: {result:?}");

    Ok(())
}
456
/// A fresh index does not report an arbitrary event as present.
#[test]
fn test_index_contains_false() {
    let room = room_id!("!room_id:localhost");
    let empty_index = RoomIndexBuilder::new_in_memory(room).build();
    let unknown_event = event_id!("$event_id:localhost");

    let found = empty_index.contains(unknown_event);

    assert!(!found, "Index should not contain event");
}
466
/// An event is reported as present once it has been indexed.
#[test]
fn test_index_contains_true() -> Result<(), Box<dyn Error>> {
    let room = room_id!("!room_id:localhost");
    let sender = user_id!("@user_id:localhost");
    let indexed_id = event_id!("$event_id:localhost");

    let mut index = RoomIndexBuilder::new_in_memory(room).build();

    let message = EventFactory::new()
        .room(room)
        .sender(sender)
        .text_msg("This is a sentence")
        .event_id(indexed_id)
        .into_any_sync_message_like_event();
    index_message(&mut index, message)?;

    let found = index.contains(indexed_id);
    assert!(found, "Index should contain event");

    Ok(())
}
486
/// Indexing the same event twice leaves exactly one searchable document.
#[test]
fn test_index_add_idempotency() -> Result<(), Box<dyn Error>> {
    let room_id = room_id!("!room_id:localhost");
    let mut index = RoomIndexBuilder::new_in_memory(room_id).build();

    let event_id = event_id!("$event_id:localhost");
    let event = EventFactory::new()
        .text_msg("This is a sentence")
        .event_id(event_id)
        .room(room_id)
        .sender(user_id!("@user_id:localhost"))
        .into_any_sync_message_like_event();

    index_message(&mut index, event.clone())?;

    assert!(index.contains(event_id), "Index should contain event");

    // Second indexing of the very same event.
    index_message(&mut index, event)?;

    assert!(index.contains(event_id), "Index should still contain event");

    // Propagate a search failure through the test's `Result`; the previous
    // `expect` message contained a placeholder that was never interpolated.
    let result = index.search("sentence", 10, None)?;

    assert_eq!(result.len(), 1, "Index should have ignored second indexing");

    Ok(())
}
515
/// An indexed event is no longer reported as present after it was removed.
#[test]
fn test_remove_event() -> Result<(), Box<dyn Error>> {
    let room = room_id!("!room_id:localhost");
    let sender = user_id!("@user_id:localhost");
    let target = event_id!("$event_id:localhost");

    let mut index = RoomIndexBuilder::new_in_memory(room).build();
    let factory = EventFactory::new().room(room).sender(sender);

    let message = factory
        .text_msg("This is a sentence")
        .event_id(target)
        .into_any_sync_message_like_event();
    index_message(&mut index, message)?;

    let present_after_add = index.contains(target);
    assert!(present_after_add, "Index should contain event");

    index_remove(&mut index, target)?;

    let present_after_remove = index.contains(target);
    assert!(!present_after_remove, "Index should not contain event");

    Ok(())
}
538
/// An edit operation drops the original event and indexes the edit event.
#[test]
fn test_edit_removes_old_and_adds_new_event() -> Result<(), Box<dyn Error>> {
    let room = room_id!("!room_id:localhost");
    let sender = user_id!("@user_id:localhost");
    let original_id = event_id!("$old_event_id:localhost");
    let replacement_id = event_id!("$new_event_id:localhost");

    let mut index = RoomIndexBuilder::new_in_memory(room).build();
    let factory = EventFactory::new().room(room).sender(sender);

    // Index the original message first.
    let original = factory
        .text_msg("This is a sentence")
        .event_id(original_id)
        .into_any_sync_message_like_event();
    index_message(&mut index, original)?;

    assert!(index.contains(original_id), "Index should contain event");

    // Build the edit event that replaces the original's content.
    let new_content =
        RoomMessageEventContentWithoutRelation::text_plain("This is a brand new sentence!");
    let replacement = factory
        .text_msg("This is a brand new sentence!")
        .edit(original_id, new_content)
        .event_id(replacement_id)
        .into_original_sync_room_message_event();

    index_edit(&mut index, original_id, replacement)?;

    assert!(!index.contains(original_id), "Index should not contain old event");
    assert!(index.contains(replacement_id), "Index should contain edited event");

    Ok(())
}
574}