matrix_sdk/event_cache/room/threads.rs

// Copyright 2025 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Threads-related data structures.

use std::collections::BTreeSet;

use eyeball_im::VectorDiff;
use matrix_sdk_base::{
    event_cache::{Event, Gap},
    linked_chunk::{ChunkContent, OwnedLinkedChunkId, Position},
};
use ruma::{OwnedEventId, OwnedRoomId};
use tokio::sync::broadcast::{Receiver, Sender};
use tracing::trace;

use crate::event_cache::{
    deduplicator::DeduplicationOutcome,
    room::{events::EventLinkedChunk, LoadMoreEventsBackwardsOutcome},
    BackPaginationOutcome, EventsOrigin, RoomEventCacheLinkedChunkUpdate,
};
/// An update coming from a thread event cache.
#[derive(Clone, Debug)]
pub struct ThreadEventCacheUpdate {
    /// New vector diffs for the thread timeline.
    pub diffs: Vec<VectorDiff<Event>>,
    /// The origin that triggered this update.
    pub origin: EventsOrigin,
}
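
// Observers typically mirror the thread timeline in an `eyeball_im::Vector<Event>`
// and apply each received diff to it. A minimal sketch (assuming a local `events`
// mirror initialized from `ThreadEventCache::subscribe`):
//
//     use eyeball_im::Vector;
//
//     fn apply_update(events: &mut Vector<Event>, update: ThreadEventCacheUpdate) {
//         for diff in update.diffs {
//             diff.apply(events);
//         }
//     }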

/// All the information related to a single thread.
pub(crate) struct ThreadEventCache {
    /// The room owning this thread.
    room_id: OwnedRoomId,

    /// The ID of the thread root event, which is the first event in the thread
    /// (and, once the thread is fully paginated, the first event in the linked
    /// chunk).
    thread_root: OwnedEventId,

    /// The linked chunk for this thread.
    chunk: EventLinkedChunk,

    /// A sender for live event updates in this thread.
    sender: Sender<ThreadEventCacheUpdate>,

    /// A sender for the globally observable linked chunk updates that happened
    /// during a sync or a back-pagination.
    ///
    /// See also [`super::super::EventCacheInner::linked_chunk_update_sender`].
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
}

impl ThreadEventCache {
    /// Create a new empty thread event cache.
    pub fn new(
        room_id: OwnedRoomId,
        thread_root: OwnedEventId,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
    ) -> Self {
        Self {
            chunk: EventLinkedChunk::new(),
            sender: Sender::new(32),
            room_id,
            thread_root,
            linked_chunk_update_sender,
        }
    }

    /// Subscribe to live events from this thread.
    pub fn subscribe(&self) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
        let events = self.chunk.events().map(|(_position, item)| item.clone()).collect();

        let recv = self.sender.subscribe();

        (events, recv)
    }
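
    // A typical consumption pattern for `subscribe` (sketch; `thread` is a
    // `ThreadEventCache`, and error handling for a lagged receiver is elided):
    //
    //     let (initial_events, mut receiver) = thread.subscribe();
    //     // Render `initial_events`, then apply incoming diffs as they arrive.
    //     while let Ok(update) = receiver.recv().await {
    //         for diff in update.diffs {
    //             // Apply `diff` to the local copy of the timeline.
    //         }
    //     }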

    /// Clear a thread, after a gappy sync for instance.
    pub fn clear(&mut self) {
        self.chunk.reset();

        let diffs = self.chunk.updates_as_vector_diffs();
        if !diffs.is_empty() {
            let _ = self.sender.send(ThreadEventCacheUpdate { diffs, origin: EventsOrigin::Cache });
        }
    }

    // TODO(bnjbvr): share more code with `RoomEventCacheState` to avoid the
    // duplication here too.
    fn propagate_changes(&mut self) {
        // The name of this function is a bit of a lie, at the moment! We're not
        // persisting threads yet, so we're just forwarding all updates to the
        // linked chunk update sender.
        let updates = self.chunk.store_updates().take();

        let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
            updates,
            linked_chunk_id: OwnedLinkedChunkId::Thread(
                self.room_id.clone(),
                self.thread_root.clone(),
            ),
        });
    }

    /// Push some live events to this thread, and propagate the updates to
    /// the listeners.
    pub fn add_live_events(&mut self, events: Vec<Event>) {
        if events.is_empty() {
            return;
        }

        let deduplication = self.filter_duplicate_events(events);

        if deduplication.non_empty_all_duplicates {
            // If all events are duplicates, we don't need to do anything; ignore
            // the new events.
            return;
        }

        // Remove the duplicated events from the thread chunk.
        self.remove_in_memory_duplicated_events(deduplication.in_memory_duplicated_event_ids);
        assert!(
            deduplication.in_store_duplicated_event_ids.is_empty(),
            "persistent storage for threads is not implemented yet"
        );

        let events = deduplication.all_events;

        self.chunk.push_live_events(None, &events);

        self.propagate_changes();

        let diffs = self.chunk.updates_as_vector_diffs();
        if !diffs.is_empty() {
            let _ = self.sender.send(ThreadEventCacheUpdate { diffs, origin: EventsOrigin::Sync });
        }
    }
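
    // Note the effect of the dedup pass above: if the chunk already holds
    // events [A, B] and sync delivers [B, C], B is first removed from its old
    // position, then [B, C] are appended, so the chunk ends up as [A, B, C],
    // with the most recent version of B at the tail.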

    /// Simplified version of
    /// [`RoomEventCacheState::load_more_events_backwards`], which returns the
    /// outcome of the pagination without actually loading from disk.
    pub fn load_more_events_backwards(&self) -> LoadMoreEventsBackwardsOutcome {
        // If any in-memory chunk is a gap, don't load more events, and let the caller
        // resolve the gap.
        if let Some(prev_token) = self.chunk.rgap().map(|gap| gap.prev_token) {
            trace!(%prev_token, "thread chunk has at least one gap");
            return LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) };
        }

        // If we don't have a gap, then the first event should be the thread's root;
        // otherwise, we'll restart a pagination from the end.
        if let Some((_pos, event)) = self.chunk.events().next() {
            let first_event_id =
                event.event_id().expect("a linked chunk only stores events with IDs");

            if first_event_id == self.thread_root {
                trace!("thread chunk is fully loaded and non-empty: reached_start=true");
                return LoadMoreEventsBackwardsOutcome::StartOfTimeline;
            }
        }

        // Otherwise, we have neither a gap nor any events. Poor us! Well, that's
        // ok: start a pagination from the end.
        LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
    }
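
    // How a caller typically dispatches on the outcome (sketch; only the two
    // variants produced above can occur here):
    //
    //     match thread.load_more_events_backwards() {
    //         LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
    //             // Run a network back-pagination with `prev_token`, then call
    //             // `finish_network_pagination` with the response.
    //         }
    //         LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
    //             // The thread root has been reached: nothing more to load.
    //         }
    //         _ => { /* not produced by the thread event cache */ }
    //     }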

    /// Find duplicates in a thread, until there's persistent storage for
    /// those.
    ///
    /// TODO: when persistent storage is implemented for threads, only use
    /// the regular `filter_duplicate_events` method.
    fn filter_duplicate_events(&self, mut new_events: Vec<Event>) -> DeduplicationOutcome {
        let mut new_event_ids = BTreeSet::new();

        new_events.retain(|event| {
            // Only keep events with IDs, and those for which `insert` returns `true`
            // (meaning they were not in the set).
            event.event_id().is_some_and(|event_id| new_event_ids.insert(event_id))
        });

        let in_memory_duplicated_event_ids: Vec<_> = self
            .chunk
            .events()
            .filter_map(|(position, event)| {
                let event_id = event.event_id()?;
                new_event_ids.contains(&event_id).then_some((event_id, position))
            })
            .collect();

        // Right now, there's no persistent storage for threads.
        let in_store_duplicated_event_ids = Vec::new();

        let at_least_one_event = !new_events.is_empty();
        let all_duplicates = (in_memory_duplicated_event_ids.len()
            + in_store_duplicated_event_ids.len())
            == new_events.len();
        let non_empty_all_duplicates = at_least_one_event && all_duplicates;

        DeduplicationOutcome {
            all_events: new_events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
            non_empty_all_duplicates,
        }
    }
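
    // Worked example for the dedup logic above: with in-memory events [A, B]
    // and `new_events` = [B, C, C], the `retain` pass drops the second C
    // (intra-batch duplicate), B shows up in `in_memory_duplicated_event_ids`
    // along with its position, `all_events` is [B, C], and
    // `non_empty_all_duplicates` is false since C is genuinely new.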

    /// Remove in-memory duplicated events from the thread chunk, as found by
    /// [`Self::filter_duplicate_events`].
    ///
    /// Precondition: `in_memory_duplicated_event_ids` must be the result of
    /// the above function, otherwise this can panic.
    fn remove_in_memory_duplicated_events(
        &mut self,
        in_memory_duplicated_event_ids: Vec<(OwnedEventId, Position)>,
    ) {
        // Remove the duplicated events from the thread chunk.
        self.chunk
            .remove_events_by_position(
                in_memory_duplicated_event_ids
                    .iter()
                    .map(|(_event_id, position)| *position)
                    .collect(),
            )
            .expect("we collected the position of the events to remove just before");
    }

    /// Finish a network pagination started with the gap retrieved from
    /// [`Self::load_more_events_backwards`].
    ///
    /// Returns `None` if the gap couldn't be found anymore (meaning the
    /// thread has been reset while the pagination was ongoing).
    pub fn finish_network_pagination(
        &mut self,
        prev_token: Option<String>,
        new_token: Option<String>,
        events: Vec<Event>,
    ) -> Option<BackPaginationOutcome> {
        // TODO(bnjbvr): consider deduplicating this code (~same for room) at some
        // point.
        let prev_gap_id = if let Some(token) = prev_token {
            // If the gap id is missing, it means that the gap disappeared during
            // pagination; in this case, early return to the caller.
            let gap_id = self.chunk.chunk_identifier(|chunk| {
                matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
            })?;

            Some(gap_id)
        } else {
            None
        };

        // This is a backwards pagination, so the events were returned in reverse
        // topological order.
        let topo_ordered_events = events.into_iter().rev().collect::<Vec<_>>();
        let new_gap = new_token.map(|token| Gap { prev_token: token });

        let deduplication = self.filter_duplicate_events(topo_ordered_events);

        let (events, new_gap) = if deduplication.non_empty_all_duplicates {
            // If all events are duplicates, we don't need to do anything; ignore
            // the new events and the new gap.
            (Vec::new(), None)
        } else {
            assert!(
                deduplication.in_store_duplicated_event_ids.is_empty(),
                "persistent storage for threads is not implemented yet"
            );
            self.remove_in_memory_duplicated_events(deduplication.in_memory_duplicated_event_ids);

            // Keep events and the gap.
            (deduplication.all_events, new_gap)
        };

        // Add the paginated events to the thread chunk.
        let reached_start = self.chunk.finish_back_pagination(prev_gap_id, new_gap, &events);

        self.propagate_changes();

        // Notify observers about the updates.
        let diffs = self.chunk.updates_as_vector_diffs();
        if !diffs.is_empty() {
            let _ = self
                .sender
                .send(ThreadEventCacheUpdate { diffs, origin: EventsOrigin::Pagination });
        }

        Some(BackPaginationOutcome { reached_start, events })
    }
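
    // End-to-end back-pagination flow, as seen from a caller (sketch;
    // `fetch_thread_events` stands in for the actual network request and is
    // hypothetical):
    //
    //     let prev_token = match thread.load_more_events_backwards() {
    //         LoadMoreEventsBackwardsOutcome::Gap { prev_token } => prev_token,
    //         LoadMoreEventsBackwardsOutcome::StartOfTimeline => return Ok(()),
    //         _ => unreachable!("not produced for threads"),
    //     };
    //
    //     let (events, new_token) = fetch_thread_events(prev_token.clone()).await?;
    //
    //     if let Some(outcome) = thread.finish_network_pagination(prev_token, new_token, events) {
    //         // `outcome.reached_start` is true when the thread root was reached.
    //     } else {
    //         // The gap vanished (e.g. the thread was reset): retry from scratch.
    //     }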

    /// Returns the latest event ID in this thread, if any.
    pub fn latest_event_id(&self) -> Option<OwnedEventId> {
        self.chunk.revents().next().and_then(|(_position, event)| event.event_id())
    }
}