matrix_sdk/
live_locations_observer.rs1use std::sync::Arc;
21
22use eyeball_im::{ObservableVector, VectorSubscriberBatchedStream};
23use imbl::Vector;
24use matrix_sdk_base::{deserialized_responses::SyncOrStrippedState, event_cache::Event};
25use matrix_sdk_common::locks::Mutex;
26use ruma::{
27 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId,
28 events::{
29 AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncStateEvent,
30 beacon::OriginalSyncBeaconEvent,
31 beacon_info::{BeaconInfoEventContent, OriginalSyncBeaconInfoEvent},
32 location::LocationContent,
33 relation::RelationType,
34 },
35};
36
37use super::Room;
38use crate::event_handler::EventHandlerDropGuard;
39
40#[derive(Clone, Debug)]
42pub struct LastLocation {
43 pub location: LocationContent,
45 pub ts: MilliSecondsSinceUnixEpoch,
47}
48
49#[derive(Clone, Debug)]
51pub struct LiveLocationShare {
52 pub user_id: OwnedUserId,
54 pub last_location: Option<LastLocation>,
56 pub beacon_id: OwnedEventId,
58 pub beacon_info: BeaconInfoEventContent,
60}
61
62#[derive(Clone, Debug)]
64pub struct BeaconInfoUpdate {
65 pub room_id: OwnedRoomId,
67 pub event_id: OwnedEventId,
69 pub content: BeaconInfoEventContent,
71}
72
73#[derive(Debug)]
81pub struct LiveLocationsObserver {
82 shares: Arc<Mutex<ObservableVector<LiveLocationShare>>>,
83 _beacon_guard: EventHandlerDropGuard,
84 _beacon_info_guard: EventHandlerDropGuard,
85}
86
87impl LiveLocationsObserver {
88 pub(super) async fn new(room: Room) -> Self {
93 let mut shares = ObservableVector::new();
94 let initial_shares =
95 Self::get_initial_live_location_shares(&room).await.unwrap_or_default();
96 shares.append(initial_shares);
97 let shares = Arc::new(Mutex::new(shares));
98
99 let beacon_handle = room.add_event_handler({
100 let shares = shares.clone();
101 async move |event: OriginalSyncBeaconEvent| {
102 Self::handle_beacon_event(&shares, event);
103 }
104 });
105 let beacon_guard = room.client.event_handler_drop_guard(beacon_handle);
106 let beacon_info_handle = room.add_event_handler({
107 let shares = shares.clone();
108 async move |event: OriginalSyncBeaconInfoEvent, room: Room| {
109 Self::handle_beacon_info_event(&shares, &room, event).await;
110 }
111 });
112 let beacon_info_guard = room.client.event_handler_drop_guard(beacon_info_handle);
113 Self { shares, _beacon_guard: beacon_guard, _beacon_info_guard: beacon_info_guard }
114 }
115
116 pub fn subscribe(
121 &self,
122 ) -> (Vector<LiveLocationShare>, VectorSubscriberBatchedStream<LiveLocationShare>) {
123 self.shares.lock().subscribe().into_values_and_batched_stream()
124 }
125
126 async fn get_initial_live_location_shares(
128 room: &Room,
129 ) -> crate::Result<Vector<LiveLocationShare>> {
130 let beacon_infos = room.get_state_events_static::<BeaconInfoEventContent>().await?;
132 let event_cache = room.event_cache().await.ok();
134 let mut shares = Vector::new();
135 for raw_beacon_info in beacon_infos {
136 let Ok(event) = raw_beacon_info.deserialize() else { continue };
137 let Some((user_id, beacon_info, event_id)) = Self::extract_live_beacon_info(event)
138 else {
139 continue;
140 };
141 let last_location = match &event_cache {
142 Some((cache, _drop_handles)) => Self::find_last_location(cache, &event_id).await,
143 None => None,
144 };
145 shares.push_back(LiveLocationShare {
146 user_id,
147 beacon_info,
148 beacon_id: event_id,
149 last_location,
150 });
151 }
152 Ok(shares)
153 }
154
155 fn extract_live_beacon_info(
160 event: SyncOrStrippedState<BeaconInfoEventContent>,
161 ) -> Option<(OwnedUserId, BeaconInfoEventContent, OwnedEventId)> {
162 let SyncOrStrippedState::Sync(SyncStateEvent::Original(ev)) = event else {
163 return None;
164 };
165 if !ev.content.is_live() {
166 return None;
167 }
168 Some((ev.state_key, ev.content, ev.event_id))
169 }
170
171 async fn find_last_location(
179 cache: &crate::event_cache::RoomEventCache,
180 beacon_info_event_id: &OwnedEventId,
181 ) -> Option<LastLocation> {
182 cache
183 .find_event_relations(beacon_info_event_id, Some(vec![RelationType::Reference]))
184 .await
185 .ok()?
186 .into_iter()
187 .rev()
188 .find_map(|e| Self::event_to_last_location(&e))
189 }
190
191 fn event_to_last_location(event: &Event) -> Option<LastLocation> {
193 if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::Beacon(
194 beacon_event,
195 ))) = event.kind.raw().deserialize()
196 {
197 beacon_event.as_original().map(|beacon| LastLocation {
198 location: beacon.content.location.clone(),
199 ts: beacon.origin_server_ts,
200 })
201 } else {
202 None
203 }
204 }
205
206 fn handle_beacon_event(
211 shares: &Mutex<ObservableVector<LiveLocationShare>>,
212 event: OriginalSyncBeaconEvent,
213 ) {
214 let beacon_info_event_id = &event.content.relates_to.event_id;
215 let mut shares = shares.lock();
216 if let Some(idx) = shares.iter().position(|s| s.beacon_id == *beacon_info_event_id) {
217 let mut share = shares[idx].clone();
220 if !share.beacon_info.is_live() {
221 shares.remove(idx);
222 return;
223 }
224 let last_location =
225 LastLocation { location: event.content.location, ts: event.origin_server_ts };
226 share.last_location = Some(last_location);
227 shares.set(idx, share);
228 }
229 }
230
231 async fn handle_beacon_info_event(
238 shares: &Mutex<ObservableVector<LiveLocationShare>>,
239 room: &Room,
240 event: OriginalSyncBeaconInfoEvent,
241 ) {
242 {
243 let mut shares = shares.lock();
244 if let Some(idx) = shares.iter().position(|s| s.user_id == *event.state_key) {
245 shares.remove(idx);
246 }
247 }
248 if event.content.is_live() {
249 let last_location = if let Ok((cache, _drop_handles)) = room.event_cache().await {
250 Self::find_last_location(&cache, &event.event_id).await
251 } else {
252 None
253 };
254 let share = LiveLocationShare {
255 user_id: event.state_key,
256 beacon_id: event.event_id,
257 beacon_info: event.content,
258 last_location,
259 };
260 shares.lock().push_back(share);
261 }
262 }
263}