Skip to main content

matrix_sdk/
live_locations_observer.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Types for live location sharing.
16//!
17//! Live location sharing allows users to share their real-time location with
18//! others in a room via [MSC3489](https://github.com/matrix-org/matrix-spec-proposals/pull/3489).
19
20use std::sync::Arc;
21
22use eyeball_im::{ObservableVector, VectorSubscriberBatchedStream};
23use imbl::Vector;
24use matrix_sdk_base::{deserialized_responses::SyncOrStrippedState, event_cache::Event};
25use matrix_sdk_common::locks::Mutex;
26use ruma::{
27    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId,
28    events::{
29        AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncStateEvent,
30        beacon::OriginalSyncBeaconEvent,
31        beacon_info::{BeaconInfoEventContent, OriginalSyncBeaconInfoEvent},
32        location::LocationContent,
33        relation::RelationType,
34    },
35};
36
37use super::Room;
38use crate::event_handler::EventHandlerDropGuard;
39
40/// Details of the last known location beacon.
41#[derive(Clone, Debug)]
42pub struct LastLocation {
43    /// The most recent location content of the asset.
44    pub location: LocationContent,
45    /// The timestamp of when the location was updated.
46    pub ts: MilliSecondsSinceUnixEpoch,
47}
48
49/// Details of a user's live location share.
50#[derive(Clone, Debug)]
51pub struct LiveLocationShare {
52    /// The user ID of the person sharing their live location.
53    pub user_id: OwnedUserId,
54    /// The asset's last known location, if any beacon has been received.
55    pub last_location: Option<LastLocation>,
56    /// The event ID of the beacon_info state event for this share.
57    pub beacon_id: OwnedEventId,
58    /// Information about the associated beacon event.
59    pub beacon_info: BeaconInfoEventContent,
60}
61
62/// A `beacon_info` update for the current user in one room.
63#[derive(Clone, Debug)]
64pub struct BeaconInfoUpdate {
65    /// The room where the `beacon_info` update was observed.
66    pub room_id: OwnedRoomId,
67    /// The event ID of the `beacon_info` event.
68    pub event_id: OwnedEventId,
69    /// The `beacon_info` event content.
70    pub content: BeaconInfoEventContent,
71}
72
73/// Tracks active live location shares in a room using an [`ObservableVector`].
74///
75/// Registers event handlers for beacon (location update) and beacon info
76/// (share started/stopped) events and reflects changes into a vector that
77/// callers can subscribe to via [`LiveLocationsObserver::subscribe`].
78///
79/// Event handlers are automatically unregistered when this struct is dropped.
80#[derive(Debug)]
81pub struct LiveLocationsObserver {
82    shares: Arc<Mutex<ObservableVector<LiveLocationShare>>>,
83    _beacon_guard: EventHandlerDropGuard,
84    _beacon_info_guard: EventHandlerDropGuard,
85}
86
87impl LiveLocationsObserver {
88    /// Create a new [`LiveLocationsObserver`] for the given room.
89    ///
90    /// Loads the current active shares from the event cache as initial state,
91    /// then begins listening for beacon events to keep the vector up-to-date.
92    pub(super) async fn new(room: Room) -> Self {
93        let mut shares = ObservableVector::new();
94        let initial_shares =
95            Self::get_initial_live_location_shares(&room).await.unwrap_or_default();
96        shares.append(initial_shares);
97        let shares = Arc::new(Mutex::new(shares));
98
99        let beacon_handle = room.add_event_handler({
100            let shares = shares.clone();
101            async move |event: OriginalSyncBeaconEvent| {
102                Self::handle_beacon_event(&shares, event);
103            }
104        });
105        let beacon_guard = room.client.event_handler_drop_guard(beacon_handle);
106        let beacon_info_handle = room.add_event_handler({
107            let shares = shares.clone();
108            async move |event: OriginalSyncBeaconInfoEvent, room: Room| {
109                Self::handle_beacon_info_event(&shares, &room, event).await;
110            }
111        });
112        let beacon_info_guard = room.client.event_handler_drop_guard(beacon_info_handle);
113        Self { shares, _beacon_guard: beacon_guard, _beacon_info_guard: beacon_info_guard }
114    }
115
116    /// Subscribe to changes and updates in the live location shares.
117    ///
118    /// Returns a snapshot of the current items alongside a batched stream of
119    /// [`eyeball_im::VectorDiff`]s that describe subsequent changes.
120    pub fn subscribe(
121        &self,
122    ) -> (Vector<LiveLocationShare>, VectorSubscriberBatchedStream<LiveLocationShare>) {
123        self.shares.lock().subscribe().into_values_and_batched_stream()
124    }
125
126    /// Get all currently active live location shares in a room.
127    async fn get_initial_live_location_shares(
128        room: &Room,
129    ) -> crate::Result<Vector<LiveLocationShare>> {
130        // Beacon infos are stored in the state store, not the event cache.
131        let beacon_infos = room.get_state_events_static::<BeaconInfoEventContent>().await?;
132        // Event cache is only needed for finding last location (optional).
133        let event_cache = room.event_cache().await.ok();
134        let mut shares = Vector::new();
135        for raw_beacon_info in beacon_infos {
136            let Ok(event) = raw_beacon_info.deserialize() else { continue };
137            let Some((user_id, beacon_info, event_id)) = Self::extract_live_beacon_info(event)
138            else {
139                continue;
140            };
141            let last_location = match &event_cache {
142                Some((cache, _drop_handles)) => Self::find_last_location(cache, &event_id).await,
143                None => None,
144            };
145            shares.push_back(LiveLocationShare {
146                user_id,
147                beacon_info,
148                beacon_id: event_id,
149                last_location,
150            });
151        }
152        Ok(shares)
153    }
154
155    /// Extracts a live beacon info from a state event.
156    ///
157    /// Returns `(user_id, content, event_id)`, or `None` if the event is
158    /// redacted/stripped or not currently live.
159    fn extract_live_beacon_info(
160        event: SyncOrStrippedState<BeaconInfoEventContent>,
161    ) -> Option<(OwnedUserId, BeaconInfoEventContent, OwnedEventId)> {
162        let SyncOrStrippedState::Sync(SyncStateEvent::Original(ev)) = event else {
163            return None;
164        };
165        if !ev.content.is_live() {
166            return None;
167        }
168        Some((ev.state_key, ev.content, ev.event_id))
169    }
170
171    /// Finds the most recent beacon event referencing the given beacon_info
172    /// event.
173    ///
174    /// Beacon events use an `m.reference` relation to point to their
175    /// originating `beacon_info` state event. The event cache's relation
176    /// index lets us look them up directly by ID without scanning all
177    /// cached events.
178    async fn find_last_location(
179        cache: &crate::event_cache::RoomEventCache,
180        beacon_info_event_id: &OwnedEventId,
181    ) -> Option<LastLocation> {
182        cache
183            .find_event_relations(beacon_info_event_id, Some(vec![RelationType::Reference]))
184            .await
185            .ok()?
186            .into_iter()
187            .rev()
188            .find_map(|e| Self::event_to_last_location(&e))
189    }
190
191    /// Converts an [`Event`] to a [`LastLocation`] if it is a beacon event.
192    fn event_to_last_location(event: &Event) -> Option<LastLocation> {
193        if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::Beacon(
194            beacon_event,
195        ))) = event.kind.raw().deserialize()
196        {
197            beacon_event.as_original().map(|beacon| LastLocation {
198                location: beacon.content.location.clone(),
199                ts: beacon.origin_server_ts,
200            })
201        } else {
202            None
203        }
204    }
205
206    /// Handles a single beacon event (location update).
207    ///
208    /// Matches the beacon to its share via `relates_to.event_id`, which
209    /// references the originating `beacon_info` state event.
210    fn handle_beacon_event(
211        shares: &Mutex<ObservableVector<LiveLocationShare>>,
212        event: OriginalSyncBeaconEvent,
213    ) {
214        let beacon_info_event_id = &event.content.relates_to.event_id;
215        let mut shares = shares.lock();
216        if let Some(idx) = shares.iter().position(|s| s.beacon_id == *beacon_info_event_id) {
217            // Check if beacon info is still live, if not, remove the share and ignore the
218            // beacon event.
219            let mut share = shares[idx].clone();
220            if !share.beacon_info.is_live() {
221                shares.remove(idx);
222                return;
223            }
224            let last_location =
225                LastLocation { location: event.content.location, ts: event.origin_server_ts };
226            share.last_location = Some(last_location);
227            shares.set(idx, share);
228        }
229    }
230
231    /// Handles a single beacon_info state event (share started or stopped).
232    ///
233    /// When a new beacon info is received for an already tracked user, the
234    /// share is removed from the vector. If the new beacon info is live, we add
235    /// it at the end of the vector, looking up the event cache to find any
236    /// beacon event that may have arrived before the beacon_info.
237    async fn handle_beacon_info_event(
238        shares: &Mutex<ObservableVector<LiveLocationShare>>,
239        room: &Room,
240        event: OriginalSyncBeaconInfoEvent,
241    ) {
242        {
243            let mut shares = shares.lock();
244            if let Some(idx) = shares.iter().position(|s| s.user_id == *event.state_key) {
245                shares.remove(idx);
246            }
247        }
248        if event.content.is_live() {
249            let last_location = if let Ok((cache, _drop_handles)) = room.event_cache().await {
250                Self::find_last_location(&cache, &event.event_id).await
251            } else {
252                None
253            };
254            let share = LiveLocationShare {
255                user_id: event.state_key,
256                beacon_id: event.event_id,
257                beacon_info: event.content,
258                last_location,
259            };
260            shares.lock().push_back(share);
261        }
262    }
263}