matrix_sdk/event_cache/caches/room/subscriber.rs
1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The [`RoomEventCacheSubscriber`] implementation.
16
17use std::{
18 ops::{Deref, DerefMut},
19 sync::{
20 Arc,
21 atomic::{AtomicUsize, Ordering},
22 },
23};
24
25use ruma::OwnedRoomId;
26use tokio::sync::{broadcast::Receiver, mpsc};
27use tracing::{trace, warn};
28
29use super::{AutoShrinkChannelPayload, RoomEventCacheUpdate};
30
/// Thin wrapper for a room event cache subscriber, so as to trigger
/// side-effects when all subscribers are gone.
///
/// The current side-effect is: auto-shrinking the [`RoomEventCache`] when no
/// more subscribers are active. This is an optimisation to reduce the amount
/// of data held in memory by a [`RoomEventCache`]: when no more subscribers
/// are active, all data are reduced to the minimum.
///
/// The side-effect takes effect on `Drop`: dropping a subscriber decrements
/// the shared subscriber count, and the subscriber that observes it was the
/// last one sends its room id over the auto-shrink channel.
///
/// The wrapper dereferences (immutably and mutably) to the underlying
/// `Receiver`, so updates are read through the regular receiver API.
///
/// [`RoomEventCache`]: super::RoomEventCache
// No `Debug` implementation is provided for this type, hence the `allow`.
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheSubscriber {
    /// Underlying receiver of the room event cache's updates.
    recv: Receiver<RoomEventCacheUpdate>,

    /// To which room are we listening?
    ///
    /// This is the value sent over the auto-shrink channel when the last
    /// subscriber is dropped.
    room_id: OwnedRoomId,

    /// Sender to the auto-shrink channel.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Shared counter of active subscribers.
    ///
    /// It is decremented when this subscriber is dropped; the subscriber
    /// that observes a previous value of 1 notifies the auto-shrink channel.
    subscriber_count: Arc<AtomicUsize>,
}
56
57impl RoomEventCacheSubscriber {
58 /// Create a new [`RoomEventCacheSubscriber`].
59 pub(super) fn new(
60 recv: Receiver<RoomEventCacheUpdate>,
61 room_id: OwnedRoomId,
62 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
63 subscriber_count: Arc<AtomicUsize>,
64 ) -> Self {
65 Self { recv, room_id, auto_shrink_sender, subscriber_count }
66 }
67}
68
69impl Drop for RoomEventCacheSubscriber {
70 fn drop(&mut self) {
71 let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
72
73 trace!(
74 "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
75 );
76
77 if previous_subscriber_count == 1 {
78 // We were the last instance of the subscriber; let the auto-shrinker know by
79 // notifying it of our room id.
80
81 let mut room_id = self.room_id.clone();
82
83 // Try to send without waiting for channel capacity, and restart in a spin-loop
84 // if it failed (until a maximum number of attempts is reached, or
85 // the send was successful). The channel shouldn't be super busy in
86 // general, so this should resolve quickly enough.
87
88 let mut num_attempts = 0;
89
90 while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
91 num_attempts += 1;
92
93 if num_attempts > 1024 {
94 // If we've tried too many times, just give up with a warning; after all, this
95 // is only an optimization.
96 warn!(
97 "couldn't send notification to the auto-shrink channel \
98 after 1024 attempts; giving up"
99 );
100 return;
101 }
102
103 match err {
104 mpsc::error::TrySendError::Full(stolen_room_id) => {
105 room_id = stolen_room_id;
106 }
107 mpsc::error::TrySendError::Closed(_) => return,
108 }
109 }
110
111 trace!("sent notification to the parent channel that we were the last subscriber");
112 }
113 }
114}
115
116impl Deref for RoomEventCacheSubscriber {
117 type Target = Receiver<RoomEventCacheUpdate>;
118
119 fn deref(&self) -> &Self::Target {
120 &self.recv
121 }
122}
123
124impl DerefMut for RoomEventCacheSubscriber {
125 fn deref_mut(&mut self) -> &mut Self::Target {
126 &mut self.recv
127 }
128}