1use std::{collections::hash_map::HashMap, path::PathBuf, sync::Arc};
20
21use futures_util::future::join_all;
22use matrix_sdk_base::deserialized_responses::TimelineEvent;
23use matrix_sdk_search::{
24 error::IndexError,
25 index::{RoomIndex, RoomIndexOperation, builder::RoomIndexBuilder},
26};
27use ruma::{
28 EventId, OwnedEventId, OwnedRoomId, RoomId,
29 events::{
30 AnySyncMessageLikeEvent, AnySyncTimelineEvent,
31 room::{
32 message::{OriginalSyncRoomMessageEvent, Relation, SyncRoomMessageEvent},
33 redaction::SyncRoomRedactionEvent,
34 },
35 },
36 room_version_rules::RedactionRules,
37};
38use tokio::sync::{Mutex, MutexGuard};
39use tracing::{debug, warn};
40
41use crate::event_cache::RoomEventCache;
42
/// Password used to encrypt an on-disk search index.
type Password = String;

/// Where and how per-room search indexes are stored when they are created.
#[derive(Clone, Debug)]
pub enum SearchIndexStoreKind {
    /// Store indexes on disk under the given directory, without encryption.
    UnencryptedDirectory(PathBuf),
    /// Store indexes on disk under the given directory, encrypted with the
    /// given password.
    EncryptedDirectory(PathBuf, Password),
    /// Keep indexes in memory only.
    InMemory,
}
55
56#[derive(Clone, Debug)]
58pub struct SearchIndex {
59 room_indexes: Arc<Mutex<HashMap<OwnedRoomId, RoomIndex>>>,
61
62 search_index_store_kind: SearchIndexStoreKind,
64}
65
66impl SearchIndex {
67 pub fn new(
69 room_indexes: Arc<Mutex<HashMap<OwnedRoomId, RoomIndex>>>,
70 search_index_store_kind: SearchIndexStoreKind,
71 ) -> Self {
72 Self { room_indexes, search_index_store_kind }
73 }
74
75 pub async fn lock(&self) -> SearchIndexGuard<'_> {
77 SearchIndexGuard {
78 index_map: self.room_indexes.lock().await,
79 search_index_store_kind: &self.search_index_store_kind,
80 }
81 }
82}
83
/// Locked view of the per-room search indexes, obtained from
/// [`SearchIndex::lock`]. The lock is held for as long as this guard lives.
#[derive(Debug)]
pub struct SearchIndexGuard<'a> {
    /// The locked map from room ID to that room's index.
    index_map: MutexGuard<'a, HashMap<OwnedRoomId, RoomIndex>>,

    /// Storage kind used when a room's index has to be created.
    search_index_store_kind: &'a SearchIndexStoreKind,
}
93
94impl SearchIndexGuard<'_> {
95 fn create_index(&self, room_id: &RoomId) -> Result<RoomIndex, IndexError> {
96 let index = match self.search_index_store_kind {
97 SearchIndexStoreKind::UnencryptedDirectory(path) => {
98 RoomIndexBuilder::new_on_disk(path.to_path_buf(), room_id).unencrypted().build()?
99 }
100 SearchIndexStoreKind::EncryptedDirectory(path, password) => {
101 RoomIndexBuilder::new_on_disk(path.to_path_buf(), room_id)
102 .encrypted(password)
103 .build()?
104 }
105 SearchIndexStoreKind::InMemory => RoomIndexBuilder::new_in_memory(room_id).build(),
106 };
107 Ok(index)
108 }
109
110 pub(crate) fn execute(
118 &mut self,
119 operation: RoomIndexOperation,
120 room_id: &RoomId,
121 ) -> Result<(), IndexError> {
122 if !self.index_map.contains_key(room_id) {
123 let index = self.create_index(room_id)?;
124 self.index_map.insert(room_id.to_owned(), index);
125 }
126
127 let index = self.index_map.get_mut(room_id).expect("index should exist");
128
129 index.execute(operation)
130 }
131
132 pub(crate) fn bulk_execute(
138 &mut self,
139 operations: Vec<RoomIndexOperation>,
140 room_id: &RoomId,
141 ) -> Result<(), IndexError> {
142 if !self.index_map.contains_key(room_id) {
143 let index = self.create_index(room_id)?;
144 self.index_map.insert(room_id.to_owned(), index);
145 }
146
147 let index = self.index_map.get_mut(room_id).expect("index should exist");
148
149 index.bulk_execute(operations)
150 }
151
152 pub(crate) fn search(
155 &mut self,
156 query: &str,
157 max_number_of_results: usize,
158 pagination_offset: Option<usize>,
159 room_id: &RoomId,
160 ) -> Result<Vec<OwnedEventId>, IndexError> {
161 if !self.index_map.contains_key(room_id) {
162 let index = self.create_index(room_id)?;
163 self.index_map.insert(room_id.to_owned(), index);
164 }
165
166 let index = self.index_map.get_mut(room_id).expect("index should exist");
167
168 index.search(query, max_number_of_results, pagination_offset)
169 }
170
171 pub async fn handle_timeline_event(
178 &mut self,
179 event: TimelineEvent,
180 room_cache: &RoomEventCache,
181 room_id: &RoomId,
182 redaction_rules: &RedactionRules,
183 ) -> Result<(), IndexError> {
184 if let Some(index_operation) =
185 parse_timeline_event(room_cache, event, redaction_rules).await
186 {
187 self.execute(index_operation, room_id)
188 } else {
189 Ok(())
190 }
191 }
192
193 pub async fn bulk_handle_timeline_event<T>(
196 &mut self,
197 events: T,
198 room_cache: &RoomEventCache,
199 room_id: &RoomId,
200 redaction_rules: &RedactionRules,
201 ) -> Result<(), IndexError>
202 where
203 T: Iterator<Item = TimelineEvent>,
204 {
205 let futures = events.map(|ev| parse_timeline_event(room_cache, ev, redaction_rules));
206
207 let operations: Vec<_> = join_all(futures).await.into_iter().flatten().collect();
208
209 self.bulk_execute(operations, room_id)
210 }
211}
212
213async fn get_most_recent_edit(
216 cache: &RoomEventCache,
217 original: &EventId,
218) -> Option<OriginalSyncRoomMessageEvent> {
219 use ruma::events::{AnySyncTimelineEvent, relation::RelationType};
220
221 let Some((original_ev, related)) =
222 cache.find_event_with_relations(original, Some(vec![RelationType::Replacement])).await
223 else {
224 debug!("Couldn't find relations for {}", original);
225 return None;
226 };
227
228 match related.last().unwrap_or(&original_ev).raw().deserialize() {
229 Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(latest))) => {
230 latest.as_original().cloned()
231 }
232 _ => None,
233 }
234}
235
236async fn handle_possible_edit(
240 event: &OriginalSyncRoomMessageEvent,
241 cache: &RoomEventCache,
242) -> Option<RoomIndexOperation> {
243 if let Some(Relation::Replacement(replacement_data)) = &event.content.relates_to {
244 if let Some(recent) = get_most_recent_edit(cache, &replacement_data.event_id).await {
245 return Some(RoomIndexOperation::Edit(replacement_data.event_id.clone(), recent));
246 } else {
247 return Some(RoomIndexOperation::Noop);
248 }
249 }
250 None
251}
252
253async fn handle_room_message(
256 event: SyncRoomMessageEvent,
257 cache: &RoomEventCache,
258) -> Option<RoomIndexOperation> {
259 if let Some(event) = event.as_original() {
260 return handle_possible_edit(event, cache).await.or(get_most_recent_edit(
261 cache,
262 &event.event_id,
263 )
264 .await
265 .map(RoomIndexOperation::Add));
266 }
267 None
268}
269
270async fn handle_room_redaction(
273 event: SyncRoomRedactionEvent,
274 cache: &RoomEventCache,
275 rules: &RedactionRules,
276) -> Option<RoomIndexOperation> {
277 if let Some(redacted_event_id) = event.redacts(rules)
278 && let Some(redacted_event) = cache.find_event(redacted_event_id).await
279 && let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
280 redacted_event,
281 ))) = redacted_event.raw().deserialize()
282 && let Some(redacted_event) = redacted_event.as_original()
283 {
284 return handle_possible_edit(redacted_event, cache)
285 .await
286 .or(Some(RoomIndexOperation::Remove(redacted_event.event_id.clone())));
287 }
288 None
289}
290
291async fn parse_timeline_event(
294 cache: &RoomEventCache,
295 event: TimelineEvent,
296 redaction_rules: &RedactionRules,
297) -> Option<RoomIndexOperation> {
298 use ruma::events::AnySyncTimelineEvent;
299
300 if event.kind.is_utd() {
301 return None;
302 }
303
304 match event.raw().deserialize() {
305 Ok(event) => match event {
306 AnySyncTimelineEvent::MessageLike(event) => match event {
307 AnySyncMessageLikeEvent::RoomMessage(event) => {
308 handle_room_message(event, cache).await
309 }
310 AnySyncMessageLikeEvent::RoomRedaction(event) => {
311 handle_room_redaction(event, cache, redaction_rules).await
312 }
313 _ => None,
314 },
315 AnySyncTimelineEvent::State(_) => None,
316 },
317
318 Err(e) => {
319 warn!("failed to parse event: {e:?}");
320 None
321 }
322 }
323}
324
#[cfg(test)]
mod tests {
    use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
    use ruma::{
        event_id, events::room::message::RoomMessageEventContentWithoutRelation, room_id, user_id,
    };

    use crate::test_utils::mocks::MatrixMockServer;

    /// A plain text message received via sync becomes searchable.
    #[cfg(feature = "experimental-search")]
    #[async_test]
    async fn test_sync_message_is_indexed() {
        let mock_server = MatrixMockServer::new().await;
        let client = mock_server.client_builder().build().await;

        client.event_cache().subscribe().unwrap();

        let room_id = room_id!("!room_id:localhost");
        let event_id = event_id!("$event_id:localhost");
        let user_id = user_id!("@user_id:localhost");

        let event_factory = EventFactory::new();
        let room = mock_server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(room_id).add_timeline_bulk(vec![
                    event_factory
                        .text_msg("this is a sentence")
                        .event_id(event_id)
                        .sender(user_id)
                        .into_raw_sync(),
                ]),
            )
            .await;

        let response = room.search("this", 5, None).await.expect("search should succeed");

        assert_eq!(response.len(), 1, "unexpected number of responses: {response:?}");
        assert_eq!(response[0], event_id, "event id doesn't match: {response:?}");
    }

    /// Edits that arrive before their original are applied once the original
    /// shows up, and later edits replace earlier ones in the index.
    #[cfg(feature = "experimental-search")]
    #[async_test]
    async fn test_search_index_edit_ordering() {
        let room_id = room_id!("!room_id:localhost");
        let dummy_id = event_id!("$dummy");
        let edit1_id = event_id!("$edit1");
        let edit2_id = event_id!("$edit2");
        let edit3_id = event_id!("$edit3");
        let original_id = event_id!("$original");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let room = server.sync_joined_room(&client, room_id).await;

        let f = EventFactory::new().room(room_id).sender(user_id!("@user_id:localhost"));

        // An unrelated message that does not match the search term.
        let dummy = f.text_msg("dummy").event_id(dummy_id).into_raw();

        let original = f.text_msg("This is a message").event_id(original_id).into_raw();

        let edit1 = f
            .text_msg("* A new message")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("A new message"))
            .event_id(edit1_id)
            .into_raw();

        let edit2 = f
            .text_msg("* An even newer message")
            .edit(
                original_id,
                RoomMessageEventContentWithoutRelation::text_plain("An even newer message"),
            )
            .event_id(edit2_id)
            .into_raw();

        let edit3 = f
            .text_msg("* The newest message")
            .edit(
                original_id,
                RoomMessageEventContentWithoutRelation::text_plain("The newest message"),
            )
            .event_id(edit3_id)
            .into_raw();

        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(room_id)
                    .add_timeline_event(dummy.clone())
                    .add_timeline_event(edit1.clone())
                    .add_timeline_event(edit2.clone()),
            )
            .await;

        // The edits target an original that hasn't been seen yet, so nothing
        // should be indexed.
        let results = room.search("message", 3, None).await.unwrap();

        assert_eq!(results.len(), 0, "Search should return 0 results, got {results:?}");

        // Once the original arrives, it is indexed using its latest edit.
        server
            .sync_room(&client, JoinedRoomBuilder::new(room_id).add_timeline_event(original))
            .await;

        let results = room.search("message", 3, None).await.unwrap();

        assert_eq!(results.len(), 1, "Search should return 1 result, got {results:?}");
        assert_eq!(results[0], edit2_id, "Search should return latest edit, got {:?}", results[0]);

        // A further edit replaces the indexed version.
        server.sync_room(&client, JoinedRoomBuilder::new(room_id).add_timeline_event(edit3)).await;

        let results = room.search("message", 3, None).await.unwrap();

        assert_eq!(results.len(), 1, "Search should return 1 result, got {results:?}");
        assert_eq!(results[0], edit3_id, "Search should return latest edit, got {:?}", results[0]);
    }
}
449}