1use std::{fmt, fs, path::Path, sync::Arc};
16
17use ruma::{EventId, OwnedEventId, OwnedRoomId, RoomId, events::AnySyncMessageLikeEvent};
18use tantivy::{
19 Index, IndexReader, TantivyDocument,
20 collector::TopDocs,
21 directory::{MmapDirectory, error::OpenDirectoryError},
22 query::QueryParser,
23 schema::Value,
24};
25use tracing::{error, warn};
26
27use crate::{
28 OpStamp, TANTIVY_INDEX_MEMORY_BUDGET,
29 error::IndexError,
30 schema::{MatrixSearchIndexSchema, RoomMessageSchema},
31 writer::SearchIndexWriter,
32};
33
/// An operation to apply to a [`RoomIndex`].
///
/// Produced by the schema when it handles an event and applied by
/// [`RoomIndex::handle_event`].
pub(crate) enum RoomIndexOperation {
    /// Add the given document to the index.
    Add(TantivyDocument),
}
38
/// A tantivy-backed search index associated with a single room.
pub struct RoomIndex {
    /// Schema used to turn events into index operations and to locate the
    /// primary-key field of stored documents.
    schema: RoomMessageSchema,
    /// Writer through which documents are added and committed.
    writer: SearchIndexWriter,
    /// Reader from which searchers are obtained.
    reader: IndexReader,
    /// Parser for textual queries, built over the schema's default search
    /// fields.
    query_parser: QueryParser,
    /// Identifier of the room this index was built for.
    room_id: OwnedRoomId,
}
48
49impl fmt::Debug for RoomIndex {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 f.debug_struct("RoomIndex")
52 .field("schema", &self.schema)
53 .field("room_id", &self.room_id)
54 .finish()
55 }
56}
57
58impl RoomIndex {
59 fn new_with(
60 index: Index,
61 schema: RoomMessageSchema,
62 room_id: &RoomId,
63 ) -> Result<RoomIndex, IndexError> {
64 let writer = index.writer(TANTIVY_INDEX_MEMORY_BUDGET)?;
65 let reader = index.reader_builder().try_into()?;
66
67 let query_parser = QueryParser::for_index(&index, schema.default_search_fields());
68 Ok(Self {
69 schema,
70 writer: writer.into(),
71 reader,
72 query_parser,
73 room_id: room_id.to_owned(),
74 })
75 }
76
77 pub fn new(path: &Path, room_id: &RoomId) -> Result<RoomIndex, IndexError> {
79 let path = path.join(room_id.as_str());
80 let schema = RoomMessageSchema::new();
81 fs::create_dir_all(path.clone())?;
82 let index = Index::create_in_dir(path, schema.as_tantivy_schema())?;
83 RoomIndex::new_with(index, schema, room_id)
84 }
85
86 pub fn new_in_memory(room_id: &RoomId) -> Result<RoomIndex, IndexError> {
89 let schema = RoomMessageSchema::new();
90 let index = Index::create_in_ram(schema.as_tantivy_schema());
91 RoomIndex::new_with(index, schema, room_id)
92 }
93
94 pub fn open_or_create(path: &Path, room_id: &RoomId) -> Result<RoomIndex, IndexError> {
97 let path = path.join(room_id.as_str());
98 let mmap_dir = match MmapDirectory::open(path) {
99 Ok(dir) => Ok(dir),
100 Err(err) => match err {
101 OpenDirectoryError::DoesNotExist(path) => {
102 fs::create_dir_all(path.clone()).map_err(|err| {
103 OpenDirectoryError::IoError {
104 io_error: Arc::new(err),
105 directory_path: path.to_path_buf(),
106 }
107 })?;
108 MmapDirectory::open(path)
109 }
110 _ => Err(err),
111 },
112 }?;
113 let schema = RoomMessageSchema::new();
114 let index = Index::open_or_create(mmap_dir, schema.as_tantivy_schema())?;
115 RoomIndex::new_with(index, schema, room_id)
116 }
117
118 pub fn open(path: &Path, room_id: &RoomId) -> Result<RoomIndex, IndexError> {
120 let path = path.join(room_id.as_str());
121 let index_path = MmapDirectory::open(path)?;
122 let index = Index::open(index_path)?;
123 let schema: RoomMessageSchema = index.schema().try_into()?;
124 RoomIndex::new_with(index, schema, room_id)
125 }
126
127 pub fn handle_event(&mut self, event: AnySyncMessageLikeEvent) -> Result<(), IndexError> {
132 let event_id = event.event_id().to_owned();
133
134 match self.schema.handle_event(event)? {
135 RoomIndexOperation::Add(document) => {
136 if !self.contains(&event_id) {
137 self.writer.add_document(document)?;
138 }
139 }
140 }
141 Ok(())
142 }
143
144 pub fn commit(&mut self) -> Result<OpStamp, IndexError> {
146 let last_commit_opstamp = self.writer.commit()?; Ok(last_commit_opstamp)
148 }
149
150 pub fn commit_and_reload(&mut self) -> Result<OpStamp, IndexError> {
161 let last_commit_opstamp = self.writer.commit()?; self.reader.reload()?;
163 Ok(last_commit_opstamp)
164 }
165
166 pub fn search(
169 &self,
170 query: &str,
171 max_number_of_results: usize,
172 ) -> Result<Vec<OwnedEventId>, IndexError> {
173 let query = self.query_parser.parse_query(query)?;
174 let searcher = self.reader.searcher();
175
176 let results = searcher.search(&query, &TopDocs::with_limit(max_number_of_results))?;
177 let mut ret: Vec<OwnedEventId> = Vec::new();
178 let pk = self.schema.primary_key();
179
180 for (_score, doc_address) in results {
181 let retrieved_doc: TantivyDocument = searcher.doc(doc_address)?;
182 match retrieved_doc.get_first(pk).and_then(|maybe_value| maybe_value.as_str()) {
183 Some(value) => match OwnedEventId::try_from(value) {
184 Ok(event_id) => ret.push(event_id),
185 Err(err) => error!("error while parsing event_id from search result: {err:?}"),
186 },
187 _ => error!("unexpected value type while searching documents"),
188 }
189 }
190
191 Ok(ret)
192 }
193
194 fn contains(&self, event_id: &EventId) -> bool {
195 let search_result = self.search(format!("event_id:\"{event_id}\"").as_str(), 1);
196 match search_result {
197 Ok(results) => !results.is_empty(),
198 Err(err) => {
199 warn!("Failed to check if event has been indexed, assuming it has: {err}");
200 true
201 }
202 }
203 }
204}
205
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, error::Error};

    use matrix_sdk_test::event_factory::EventFactory;
    use ruma::{event_id, room_id, user_id};

    use crate::index::RoomIndex;

    // NOTE: `Result::expect` takes a plain string, not a format string, and it
    // already prints the `Debug` form of the error, so the messages below
    // carry no `{…}` placeholders.

    /// An in-memory index can be created.
    #[test]
    fn test_make_index_in_memory() {
        let room_id = room_id!("!room_id:localhost");
        let index = RoomIndex::new_in_memory(room_id);

        index.expect("failed to make index in ram");
    }

    /// A text message event is accepted by the index.
    #[test]
    fn test_handle_event() {
        let room_id = room_id!("!room_id:localhost");
        let mut index = RoomIndex::new_in_memory(room_id).expect("failed to make index in ram");

        let event = EventFactory::new()
            .text_msg("event message")
            .event_id(event_id!("$event_id:localhost"))
            .room(room_id)
            .sender(user_id!("@user_id:localhost"))
            .into_any_sync_message_like_event();

        index.handle_event(event).expect("failed to add event");
    }

    /// Searching returns exactly the events whose text matches the query.
    #[test]
    fn test_search_populated_index() -> Result<(), Box<dyn Error>> {
        let room_id = room_id!("!room_id:localhost");
        let mut index = RoomIndex::new_in_memory(room_id).expect("failed to make index in ram");

        let event_id_1 = event_id!("$event_id_1:localhost");
        let event_id_2 = event_id!("$event_id_2:localhost");
        let event_id_3 = event_id!("$event_id_3:localhost");

        index.handle_event(
            EventFactory::new()
                .text_msg("This is a sentence")
                .event_id(event_id_1)
                .room(room_id)
                .sender(user_id!("@user_id:localhost"))
                .into_any_sync_message_like_event(),
        )?;

        index.handle_event(
            EventFactory::new()
                .text_msg("All new words")
                .event_id(event_id_2)
                .room(room_id)
                .sender(user_id!("@user_id:localhost"))
                .into_any_sync_message_like_event(),
        )?;

        index.handle_event(
            EventFactory::new()
                .text_msg("A similar sentence")
                .event_id(event_id_3)
                .room(room_id)
                .sender(user_id!("@user_id:localhost"))
                .into_any_sync_message_like_event(),
        )?;

        index.commit_and_reload()?;

        let result = index.search("sentence", 10).expect("search failed");
        let result: HashSet<_> = result.iter().collect();

        let true_value = [event_id_1.to_owned(), event_id_3.to_owned()];
        let true_value: HashSet<_> = true_value.iter().collect();

        assert_eq!(result, true_value, "search result not correct: {result:?}");

        Ok(())
    }

    /// Searching an empty index yields no results.
    #[test]
    fn test_search_empty_index() -> Result<(), Box<dyn Error>> {
        let room_id = room_id!("!room_id:localhost");
        let mut index = RoomIndex::new_in_memory(room_id).expect("failed to make index in ram");

        index.commit_and_reload()?;

        let result = index.search("sentence", 10).expect("search failed");

        assert!(result.is_empty(), "search result not empty: {result:?}");

        Ok(())
    }

    /// `contains` is false for an event that was never indexed.
    #[test]
    fn test_index_contains_false() -> Result<(), Box<dyn Error>> {
        let room_id = room_id!("!room_id:localhost");
        let mut index = RoomIndex::new_in_memory(room_id).expect("failed to make index in ram");

        let event_id = event_id!("$event_id:localhost");

        index.commit_and_reload()?;

        assert!(!index.contains(event_id), "Index should not contain event");

        Ok(())
    }

    /// `contains` is true once an event has been indexed and committed.
    #[test]
    fn test_index_contains_true() -> Result<(), Box<dyn Error>> {
        let room_id = room_id!("!room_id:localhost");
        let mut index = RoomIndex::new_in_memory(room_id).expect("failed to make index in ram");

        let event_id = event_id!("$event_id:localhost");
        let event = EventFactory::new()
            .text_msg("This is a sentence")
            .event_id(event_id)
            .room(room_id)
            .sender(user_id!("@user_id:localhost"))
            .into_any_sync_message_like_event();

        index.handle_event(event)?;

        index.commit_and_reload()?;

        assert!(index.contains(event_id), "Index should contain event");

        Ok(())
    }

    /// Handling the same event again after a commit does not add a duplicate.
    #[test]
    fn test_indexing_idempotency() -> Result<(), Box<dyn Error>> {
        let room_id = room_id!("!room_id:localhost");
        let mut index = RoomIndex::new_in_memory(room_id).expect("failed to make index in ram");

        let event_id = event_id!("$event_id:localhost");
        let event = EventFactory::new()
            .text_msg("This is a sentence")
            .event_id(event_id)
            .room(room_id)
            .sender(user_id!("@user_id:localhost"))
            .into_any_sync_message_like_event();

        index.handle_event(event.clone())?;

        index.commit_and_reload()?;

        assert!(index.contains(event_id), "Index should contain event");

        // Second attempt with the very same event must be ignored.
        index.handle_event(event)?;

        index.commit_and_reload()?;

        assert!(index.contains(event_id), "Index should still contain event");

        let result = index.search("sentence", 10).expect("search failed");

        assert_eq!(result.len(), 1, "Index should have ignored second indexing");

        Ok(())
    }
}