Skip to main content

matrix_sdk/event_cache/caches/room/
subscriber.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The [`RoomEventCacheSubscriber`] implementation.
16
17use std::{
18    ops::{Deref, DerefMut},
19    sync::{
20        Arc,
21        atomic::{AtomicUsize, Ordering},
22    },
23};
24
25use ruma::OwnedRoomId;
26use tokio::sync::{broadcast::Receiver, mpsc};
27use tracing::{trace, warn};
28
29use super::{AutoShrinkChannelPayload, RoomEventCacheUpdate};
30
/// Thin wrapper around a room event cache subscriber, used to trigger
/// side-effects once all subscribers are gone.
///
/// The current side-effect is auto-shrinking the [`RoomEventCache`] when no
/// more subscribers are active. This is an optimisation to reduce the amount
/// of data held in memory by a [`RoomEventCache`]: when no more subscribers
/// are active, all data are reduced to the minimum.
///
/// The side-effect takes effect on `Drop`: the subscriber that observes it
/// was the last one sends its room id over the auto-shrink channel.
///
/// The wrapper dereferences to the underlying [`Receiver`], so updates are
/// received by calling the receiver's methods directly on the subscriber.
///
/// [`RoomEventCache`]: super::RoomEventCache
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheSubscriber {
    /// Underlying receiver of the room event cache's updates.
    recv: Receiver<RoomEventCacheUpdate>,

    /// The room this subscriber is listening to; this is the value sent over
    /// the auto-shrink channel when the last subscriber is dropped.
    room_id: OwnedRoomId,

    /// Sender to the auto-shrink channel.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Shared count of active subscribers.
    ///
    /// It is decremented by one when this subscriber is dropped; it is never
    /// incremented by this type, so presumably whoever creates the subscriber
    /// is responsible for incrementing it beforehand (to be confirmed against
    /// the caller).
    subscriber_count: Arc<AtomicUsize>,
}
56
57impl RoomEventCacheSubscriber {
58    /// Create a new [`RoomEventCacheSubscriber`].
59    pub(super) fn new(
60        recv: Receiver<RoomEventCacheUpdate>,
61        room_id: OwnedRoomId,
62        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
63        subscriber_count: Arc<AtomicUsize>,
64    ) -> Self {
65        Self { recv, room_id, auto_shrink_sender, subscriber_count }
66    }
67}
68
69impl Drop for RoomEventCacheSubscriber {
70    fn drop(&mut self) {
71        let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
72
73        trace!(
74            "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
75        );
76
77        if previous_subscriber_count == 1 {
78            // We were the last instance of the subscriber; let the auto-shrinker know by
79            // notifying it of our room id.
80
81            let mut room_id = self.room_id.clone();
82
83            // Try to send without waiting for channel capacity, and restart in a spin-loop
84            // if it failed (until a maximum number of attempts is reached, or
85            // the send was successful). The channel shouldn't be super busy in
86            // general, so this should resolve quickly enough.
87
88            let mut num_attempts = 0;
89
90            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
91                num_attempts += 1;
92
93                if num_attempts > 1024 {
94                    // If we've tried too many times, just give up with a warning; after all, this
95                    // is only an optimization.
96                    warn!(
97                        "couldn't send notification to the auto-shrink channel \
98                         after 1024 attempts; giving up"
99                    );
100                    return;
101                }
102
103                match err {
104                    mpsc::error::TrySendError::Full(stolen_room_id) => {
105                        room_id = stolen_room_id;
106                    }
107                    mpsc::error::TrySendError::Closed(_) => return,
108                }
109            }
110
111            trace!("sent notification to the parent channel that we were the last subscriber");
112        }
113    }
114}
115
116impl Deref for RoomEventCacheSubscriber {
117    type Target = Receiver<RoomEventCacheUpdate>;
118
119    fn deref(&self) -> &Self::Target {
120        &self.recv
121    }
122}
123
124impl DerefMut for RoomEventCacheSubscriber {
125    fn deref_mut(&mut self) -> &mut Self::Target {
126        &mut self.recv
127    }
128}