use std::{collections::BTreeSet, fmt, sync::Arc};
use as_variant::as_variant;
use eyeball_im::VectorDiff;
use eyeball_im_util::vector::VectorObserverExt;
use futures_core::Stream;
use imbl::Vector;
#[cfg(test)]
use matrix_sdk::crypto::OlmMachine;
use matrix_sdk::{
deserialized_responses::{SyncTimelineEvent, TimelineEventKind as SdkTimelineEventKind},
event_cache::{paginator::Paginator, RoomEventCache},
send_queue::{
LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
},
Result, Room,
};
use ruma::{
api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
events::{
poll::unstable_start::UnstablePollStartEventContent,
reaction::ReactionEventContent,
receipt::{Receipt, ReceiptThread, ReceiptType},
relation::Annotation,
room::message::{MessageType, Relation},
AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
AnySyncTimelineEvent, MessageLikeEventType,
},
serde::Raw,
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, RoomVersionId,
TransactionId, UserId,
};
#[cfg(test)]
use ruma::{events::receipt::ReceiptEventContent, RoomId};
use tokio::sync::{RwLock, RwLockWriteGuard};
use tracing::{
debug, error, field, field::debug, info, info_span, instrument, trace, warn, Instrument as _,
};
#[cfg(test)]
pub(super) use self::observable_items::ObservableItems;
pub(super) use self::{
observable_items::{
AllRemoteEvents, ObservableItemsEntry, ObservableItemsTransaction,
ObservableItemsTransactionEntry,
},
state::{
FullEventMeta, PendingEdit, PendingEditKind, TimelineMetadata, TimelineNewItemPosition,
TimelineState, TimelineStateTransaction,
},
};
use super::{
event_handler::TimelineEventKind,
event_item::{ReactionStatus, RemoteEventOrigin},
item::TimelineUniqueId,
traits::{Decryptor, RoomDataProvider},
util::{rfind_event_by_id, rfind_event_item, RelativePosition},
DateDividerMode, Error, EventSendState, EventTimelineItem, InReplyToDetails, Message,
PaginationError, Profile, ReactionInfo, RepliedToEvent, TimelineDetails, TimelineEventItemId,
TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind,
};
use crate::{
timeline::{
date_dividers::DateDividerAdjuster,
event_item::EventTimelineItemKind,
pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
reactions::FullReactionKey,
util::rfind_event_by_item_id,
TimelineEventFilterFn,
},
unable_to_decrypt_hook::UtdHookManager,
};
mod observable_items;
mod state;
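/// Data associated with the timeline's focus mode, holding whatever is needed
/// to load its events (a paginator for event-focused timelines, a loader for
/// pinned events, nothing for live timelines).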
#[derive(Debug)]
enum TimelineFocusData<P: RoomDataProvider> {
Live,
Event {
event_id: OwnedEventId,
paginator: Paginator<P>,
num_context_events: u16,
},
PinnedEvents {
loader: PinnedEventsLoader,
},
}
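/// The core of a timeline: the observable items and metadata, the focus data,
/// the room data provider and the timeline settings.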
#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room> {
state: Arc<RwLock<TimelineState>>,
focus: Arc<RwLock<TimelineFocusData<P>>>,
pub(crate) room_data_provider: P,
settings: TimelineSettings,
}
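/// Settings influencing which events are added to the timeline and how items
/// are rendered.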
#[derive(Clone)]
pub(super) struct TimelineSettings {
pub(super) track_read_receipts: bool,
pub(super) event_filter: Arc<TimelineEventFilterFn>,
pub(super) add_failed_to_parse: bool,
pub(super) date_divider_mode: DateDividerMode,
}
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for TimelineSettings {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TimelineSettings")
.field("track_read_receipts", &self.track_read_receipts)
.field("add_failed_to_parse", &self.add_failed_to_parse)
.finish_non_exhaustive()
}
}
impl Default for TimelineSettings {
fn default() -> Self {
Self {
track_read_receipts: false,
event_filter: Arc::new(default_event_filter),
add_failed_to_parse: true,
date_divider_mode: DateDividerMode::Daily,
}
}
}
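/// The kind of focus of a timeline, without the associated data.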
#[derive(Debug, Clone, Copy)]
enum TimelineFocusKind {
Live,
Event,
PinnedEvents,
}
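/// The default event filter used by the timeline: it keeps the events the
/// timeline knows how to render (regular messages, stickers, polls, calls,
/// encrypted events, state events) and filters out the rest, such as edits
/// (message replacements).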
pub fn default_event_filter(event: &AnySyncTimelineEvent, room_version: &RoomVersionId) -> bool {
match event {
        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
            // Only keep redaction events that actually redact something, so the
            // redaction can be applied to the corresponding timeline item.
            ev.redacts(room_version).is_some()
        }
AnySyncTimelineEvent::MessageLike(msg) => {
match msg.original_content() {
                None => {
                    // A redacted message-like event: show it, unless it was a
                    // reaction, which has nothing left to render once redacted.
                    msg.event_type() != MessageLikeEventType::Reaction
                }
Some(original_content) => {
match original_content {
AnyMessageLikeEventContent::RoomMessage(content) => {
                            // Edits (message replacements) aren't rendered as
                            // their own items; skip them.
                            if content
                                .relates_to
                                .as_ref()
                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
                            {
                                return false;
                            }
matches!(
content.msgtype,
MessageType::Audio(_)
| MessageType::Emote(_)
| MessageType::File(_)
| MessageType::Image(_)
| MessageType::Location(_)
| MessageType::Notice(_)
| MessageType::ServerNotice(_)
| MessageType::Text(_)
| MessageType::Video(_)
| MessageType::VerificationRequest(_)
)
}
AnyMessageLikeEventContent::Sticker(_)
| AnyMessageLikeEventContent::UnstablePollStart(
UnstablePollStartEventContent::New(_),
)
| AnyMessageLikeEventContent::CallInvite(_)
| AnyMessageLikeEventContent::CallNotify(_)
| AnyMessageLikeEventContent::RoomEncrypted(_) => true,
_ => false,
}
}
}
}
AnySyncTimelineEvent::State(_) => {
true
}
}
}
impl<P: RoomDataProvider> TimelineController<P> {
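    /// Creates a new timeline controller for the given room data provider and
    /// focus, with default settings.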
pub(super) fn new(
room_data_provider: P,
focus: TimelineFocus,
internal_id_prefix: Option<String>,
unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
is_room_encrypted: Option<bool>,
) -> Self {
let (focus_data, focus_kind) = match focus {
TimelineFocus::Live => (TimelineFocusData::Live, TimelineFocusKind::Live),
TimelineFocus::Event { target, num_context_events } => {
let paginator = Paginator::new(room_data_provider.clone());
(
TimelineFocusData::Event { paginator, event_id: target, num_context_events },
TimelineFocusKind::Event,
)
}
TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => (
TimelineFocusData::PinnedEvents {
loader: PinnedEventsLoader::new(
Arc::new(room_data_provider.clone()),
max_events_to_load as usize,
max_concurrent_requests as usize,
),
},
TimelineFocusKind::PinnedEvents,
),
};
let state = TimelineState::new(
focus_kind,
room_data_provider.own_user_id().to_owned(),
room_data_provider.room_version(),
internal_id_prefix,
unable_to_decrypt_hook,
is_room_encrypted,
);
Self {
state: Arc::new(RwLock::new(state)),
focus: Arc::new(RwLock::new(focus_data)),
room_data_provider,
settings: Default::default(),
}
}
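    /// Initializes the timeline according to its focus: loads the initial
    /// events from the event cache (live), from the paginator starting at the
    /// focused event (event focus), or from the pinned events loader.
    ///
    /// Returns whether any initial events were found.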
pub(super) async fn init_focus(
&self,
room_event_cache: &RoomEventCache,
) -> Result<bool, Error> {
let focus_guard = self.focus.read().await;
match &*focus_guard {
TimelineFocusData::Live => {
let (events, _) =
room_event_cache.subscribe().await.map_err(Error::EventCacheError)?;
let has_events = !events.is_empty();
self.replace_with_initial_remote_events(
events.into_iter(),
RemoteEventOrigin::Cache,
)
.await;
Ok(has_events)
}
TimelineFocusData::Event { event_id, paginator, num_context_events } => {
let start_from_result = paginator
.start_from(event_id, (*num_context_events).into())
.await
.map_err(PaginationError::Paginator)?;
drop(focus_guard);
let has_events = !start_from_result.events.is_empty();
self.replace_with_initial_remote_events(
start_from_result.events.into_iter(),
RemoteEventOrigin::Pagination,
)
.await;
Ok(has_events)
}
TimelineFocusData::PinnedEvents { loader } => {
let loaded_events = loader.load_events().await.map_err(Error::PinnedEventsError)?;
drop(focus_guard);
let has_events = !loaded_events.is_empty();
self.replace_with_initial_remote_events(
loaded_events.into_iter(),
RemoteEventOrigin::Pagination,
)
.await;
Ok(has_events)
}
}
}
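    /// Listens to room info updates and refreshes all timeline items whenever
    /// the room's encryption state changes.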
pub async fn handle_encryption_state_changes(&self) {
let mut room_info = self.room_data_provider.room_info();
while let Some(info) = room_info.next().await {
let changed = {
let state = self.state.read().await;
let mut old_is_room_encrypted = state.meta.is_room_encrypted.write().unwrap();
let is_encrypted_now = info.is_encrypted();
if *old_is_room_encrypted != Some(is_encrypted_now) {
*old_is_room_encrypted = Some(is_encrypted_now);
true
} else {
false
}
};
if changed {
let mut state = self.state.write().await;
state.update_all_events_is_room_encrypted();
}
}
}
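    /// Reloads the pinned events; fails if the timeline is not focused on
    /// pinned events.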
pub(crate) async fn reload_pinned_events(
&self,
) -> Result<Vec<SyncTimelineEvent>, PinnedEventsLoaderError> {
let focus_guard = self.focus.read().await;
if let TimelineFocusData::PinnedEvents { loader } = &*focus_guard {
loader.load_events().await
} else {
Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
}
}
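    /// Paginates backwards in an event-focused timeline, prepending the new
    /// events to the timeline.
    ///
    /// Returns whether the timeline has been fully paginated in that
    /// direction.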
pub(super) async fn focused_paginate_backwards(
&self,
num_events: u16,
) -> Result<bool, PaginationError> {
let pagination = match &*self.focus.read().await {
TimelineFocusData::Live | TimelineFocusData::PinnedEvents { .. } => {
return Err(PaginationError::NotEventFocusMode)
}
TimelineFocusData::Event { paginator, .. } => paginator
.paginate_backward(num_events.into())
.await
.map_err(PaginationError::Paginator)?,
};
self.add_events_at(
pagination.events.into_iter(),
TimelineNewItemPosition::Start { origin: RemoteEventOrigin::Pagination },
)
.await;
Ok(pagination.hit_end_of_timeline)
}
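    /// Paginates forwards in an event-focused timeline, appending the new
    /// events to the timeline.
    ///
    /// Returns whether the timeline has been fully paginated in that
    /// direction.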
pub(super) async fn focused_paginate_forwards(
&self,
num_events: u16,
) -> Result<bool, PaginationError> {
let pagination = match &*self.focus.read().await {
TimelineFocusData::Live | TimelineFocusData::PinnedEvents { .. } => {
return Err(PaginationError::NotEventFocusMode)
}
TimelineFocusData::Event { paginator, .. } => paginator
.paginate_forward(num_events.into())
.await
.map_err(PaginationError::Paginator)?,
};
self.add_events_at(
pagination.events.into_iter(),
TimelineNewItemPosition::End { origin: RemoteEventOrigin::Pagination },
)
.await;
Ok(pagination.hit_end_of_timeline)
}
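    /// Returns whether the timeline is in live mode.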
pub(super) async fn is_live(&self) -> bool {
matches!(&*self.focus.read().await, TimelineFocusData::Live)
}
pub(super) fn with_settings(mut self, settings: TimelineSettings) -> Self {
self.settings = settings;
self
}
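    /// Returns a clone of the current timeline items.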
pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
self.state.read().await.items.clone_items()
}
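    /// Returns the current timeline items along with a stream of updates to
    /// them.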
pub(super) async fn subscribe(
&self,
) -> (Vector<Arc<TimelineItem>>, impl Stream<Item = VectorDiff<Arc<TimelineItem>>> + Send) {
trace!("Creating timeline items signal");
let state = self.state.read().await;
(state.items.clone_items(), state.items.subscribe().into_stream())
}
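    /// Same as [`Self::subscribe`], but the stream yields batches of updates.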
pub(super) async fn subscribe_batched(
&self,
) -> (Vector<Arc<TimelineItem>>, impl Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>>) {
trace!("Creating timeline items signal");
let state = self.state.read().await;
(state.items.clone_items(), state.items.subscribe().into_batched_stream())
}
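    /// Same as [`Self::subscribe`], but the items are filtered and mapped
    /// through `f`.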
pub(super) async fn subscribe_filter_map<U, F>(
&self,
f: F,
) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>)
where
U: Clone,
F: Fn(Arc<TimelineItem>) -> Option<U>,
{
trace!("Creating timeline items signal");
self.state.read().await.items.subscribe().filter_map(f)
}
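    /// Toggles a reaction on the given timeline item, sending the reaction or
    /// redacting/aborting the previous one as needed.
    ///
    /// Returns `true` if the reaction ends up added, `false` otherwise.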
#[instrument(skip_all)]
pub(super) async fn toggle_reaction_local(
&self,
item_id: &TimelineEventItemId,
key: &str,
) -> Result<bool, Error> {
let mut state = self.state.write().await;
let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
warn!("Timeline item not found, can't add reaction");
return Err(Error::FailedToToggleReaction);
};
let user_id = self.room_data_provider.own_user_id();
let prev_status = item
.reactions()
.get(key)
.and_then(|group| group.get(user_id))
.map(|reaction_info| reaction_info.status.clone());
let Some(prev_status) = prev_status else {
match &item.kind {
EventTimelineItemKind::Local(local) => {
if let Some(send_handle) = &local.send_handle {
if send_handle
.react(key.to_owned())
.await
.map_err(|err| Error::SendQueueError(err.into()))?
.is_some()
{
trace!("adding a reaction to a local echo");
return Ok(true);
}
warn!("couldn't toggle reaction for local echo");
return Ok(false);
}
warn!("missing send handle for local echo; is this a test?");
return Ok(false);
}
EventTimelineItemKind::Remote(remote) => {
trace!("adding a reaction to a remote echo");
let annotation = Annotation::new(remote.event_id.to_owned(), key.to_owned());
self.room_data_provider
.send(ReactionEventContent::from(annotation).into())
.await?;
return Ok(true);
}
}
};
trace!("removing a previous reaction");
match prev_status {
ReactionStatus::LocalToLocal(send_reaction_handle) => {
if let Some(handle) = send_reaction_handle {
if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
warn!("unexpectedly unable to abort sending of local reaction");
}
} else {
warn!("no send reaction handle (this should only happen in testing contexts)");
}
}
ReactionStatus::LocalToRemote(send_handle) => {
trace!("aborting send of the previous reaction that was a local echo");
if let Some(handle) = send_handle {
if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
warn!("unexpectedly unable to abort sending of local reaction");
}
} else {
warn!("no send handle (this should only happen in testing contexts)");
}
}
ReactionStatus::RemoteToRemote(event_id) => {
let Some(annotated_event_id) =
item.as_remote().map(|event_item| event_item.event_id.clone())
else {
warn!("remote reaction to remote event, but the associated item isn't remote");
return Ok(false);
};
let mut reactions = item.reactions().clone();
let reaction_info = reactions.remove_reaction(user_id, key);
if reaction_info.is_some() {
let new_item = item.with_reactions(reactions);
state.items.replace(item_pos, new_item);
} else {
warn!("reaction is missing on the item, not removing it locally, but sending redaction.");
}
drop(state);
trace!("sending redact for a previous reaction");
if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
if let Some(reaction_info) = reaction_info {
debug!("sending redact failed, adding the reaction back to the list");
let mut state = self.state.write().await;
if let Some((item_pos, item)) =
rfind_event_by_id(&state.items, &annotated_event_id)
{
let mut reactions = item.reactions().clone();
reactions
.entry(key.to_owned())
.or_default()
.insert(user_id.to_owned(), reaction_info);
let new_item = item.with_reactions(reactions);
state.items.replace(item_pos, new_item);
} else {
warn!("couldn't find item to re-add reaction anymore; maybe it's been redacted?");
}
}
return Err(err);
}
}
}
Ok(false)
}
pub(super) async fn add_events_at<Events>(
&self,
events: Events,
position: TimelineNewItemPosition,
) -> HandleManyEventsResult
where
Events: IntoIterator + ExactSizeIterator,
<Events as IntoIterator>::Item: Into<SyncTimelineEvent>,
{
if events.len() == 0 {
return Default::default();
}
let mut state = self.state.write().await;
state.add_remote_events_at(events, position, &self.room_data_provider, &self.settings).await
}
pub(super) async fn clear(&self) {
self.state.write().await.clear();
}
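    /// Replaces the content of the timeline with the given initial remote
    /// events, also loading the initial read receipts and the fully-read
    /// marker if read-receipt tracking is enabled.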
pub(super) async fn replace_with_initial_remote_events<Events>(
&self,
events: Events,
origin: RemoteEventOrigin,
) where
Events: IntoIterator + ExactSizeIterator,
<Events as IntoIterator>::Item: Into<SyncTimelineEvent>,
{
let mut state = self.state.write().await;
let track_read_markers = self.settings.track_read_receipts;
if track_read_markers {
state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
state
.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
.await;
}
if !state.items.is_empty() || events.len() > 0 {
state
.replace_with_remote_events(
events,
TimelineNewItemPosition::End { origin },
&self.room_data_provider,
&self.settings,
)
.await;
}
if track_read_markers {
if let Some(fully_read_event_id) =
self.room_data_provider.load_fully_read_marker().await
{
state.handle_fully_read_marker(fully_read_event_id);
}
}
}
pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
self.state.write().await.handle_fully_read_marker(fully_read_event_id);
}
pub(super) async fn handle_ephemeral_events(
&self,
events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
) {
let mut state = self.state.write().await;
state.handle_ephemeral_events(events, &self.room_data_provider).await;
}
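    /// Handles a local event, adding it to the timeline as a local echo.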
#[instrument(skip_all)]
pub(super) async fn handle_local_event(
&self,
txn_id: OwnedTransactionId,
content: TimelineEventKind,
send_handle: Option<SendHandle>,
) {
let sender = self.room_data_provider.own_user_id().to_owned();
let profile = self.room_data_provider.profile_from_user_id(&sender).await;
let should_add_new_items = self.is_live().await;
let date_divider_mode = self.settings.date_divider_mode.clone();
let mut state = self.state.write().await;
state
.handle_local_event(
sender,
profile,
should_add_new_items,
date_divider_mode,
txn_id,
send_handle,
content,
)
.await;
}
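    /// Updates the send state of a local echo identified by its transaction
    /// id, e.g. after the event has been sent or sending has failed.
    ///
    /// If the remote echo has already arrived, the duplicated local echo is
    /// removed instead.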
#[instrument(skip(self))]
pub(super) async fn update_event_send_state(
&self,
txn_id: &TransactionId,
send_state: EventSendState,
) {
let mut state = self.state.write().await;
let mut txn = state.transaction();
let new_event_id: Option<&EventId> =
as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
if rfind_event_item(&txn.items, |it| {
new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
})
.is_some()
{
trace!("Remote echo received before send-event response");
let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
if let Some((idx, _)) = local_echo {
warn!("Message echo got duplicated, removing the local one");
txn.items.remove(idx);
let mut adjuster =
DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
adjuster.run(&mut txn.items, &mut txn.meta);
}
txn.commit();
return;
}
let result = rfind_event_item(&txn.items, |it| {
it.transaction_id() == Some(txn_id)
|| new_event_id.is_some()
&& it.event_id() == new_event_id
&& it.as_local().is_some()
});
let Some((idx, item)) = result else {
if let Some((event_id, reaction_key)) = new_event_id.zip(
txn.meta.reactions.map.get(&TimelineEventItemId::TransactionId(txn_id.to_owned())),
) {
match &reaction_key.item {
TimelineEventItemId::TransactionId(_) => {
error!("unexpected remote reaction to local echo")
}
TimelineEventItemId::EventId(reacted_to_event_id) => {
if let Some((item_pos, event_item)) =
rfind_event_by_id(&txn.items, reacted_to_event_id)
{
let mut reactions = event_item.reactions().clone();
if let Some(entry) = reactions
.get_mut(&reaction_key.key)
.and_then(|by_user| by_user.get_mut(&reaction_key.sender))
{
trace!("updated reaction status to sent");
entry.status = ReactionStatus::RemoteToRemote(event_id.to_owned());
txn.items.replace(item_pos, event_item.with_reactions(reactions));
txn.commit();
return;
}
}
}
}
}
warn!("Timeline item not found, can't update send state");
return;
};
let Some(local_item) = item.as_local() else {
warn!("We looked for a local item, but it transitioned to remote??");
return;
};
if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
}
if let Some(new_event_id) = new_event_id {
let reactions = item.reactions();
for (_key, by_user) in reactions.iter() {
for (_user_id, info) in by_user.iter() {
if let ReactionStatus::LocalToLocal(Some(reaction_handle)) = &info.status {
let reaction_txn_id = reaction_handle.transaction_id().to_owned();
if let Some(found) = txn
.meta
.reactions
.map
.get_mut(&TimelineEventItemId::TransactionId(reaction_txn_id))
{
found.item = TimelineEventItemId::EventId(new_event_id.to_owned());
}
}
}
}
}
let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
txn.items.replace(idx, new_item);
txn.commit();
}
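    /// Discards a local echo (event or reaction) identified by its transaction
    /// id.
    ///
    /// Returns whether a local echo was found and discarded.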
pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
let mut state = self.state.write().await;
if let Some((idx, _)) =
rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
{
let mut txn = state.transaction();
txn.items.remove(idx);
let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
adjuster.run(&mut txn.items, &mut txn.meta);
txn.meta.update_read_marker(&mut txn.items);
txn.commit();
debug!("Discarded local echo");
return true;
}
if let Some(full_key) =
state.meta.reactions.map.remove(&TimelineEventItemId::TransactionId(txn_id.to_owned()))
{
let item = match &full_key.item {
TimelineEventItemId::TransactionId(txn_id) => {
rfind_event_item(&state.items, |item| item.transaction_id() == Some(txn_id))
}
TimelineEventItemId::EventId(event_id) => rfind_event_by_id(&state.items, event_id),
};
let Some((idx, item)) = item else {
warn!("missing reacted-to item for a reaction");
return false;
};
let mut reactions = item.reactions().clone();
if reactions.remove_reaction(&full_key.sender, &full_key.key).is_some() {
let updated_item = item.with_reactions(reactions);
state.items.replace(idx, updated_item);
} else {
warn!(
"missing reaction {} for sender {} on timeline item",
full_key.key, full_key.sender
);
}
return true;
}
debug!("Can't find local echo to discard");
false
}
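    /// Replaces the content of a local echo identified by its transaction id,
    /// after it has been edited in the send queue.
    ///
    /// Returns whether the local echo was found and replaced.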
pub(super) async fn replace_local_echo(
&self,
txn_id: &TransactionId,
content: AnyMessageLikeEventContent,
) -> bool {
let AnyMessageLikeEventContent::RoomMessage(content) = content else {
warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
return false;
};
let mut state = self.state.write().await;
let mut txn = state.transaction();
let Some((idx, prev_item)) =
rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
else {
debug!("Can't find local echo to replace");
return false;
};
let ti_kind = {
let Some(prev_local_item) = prev_item.as_local() else {
warn!("We looked for a local item, but it transitioned as remote??");
return false;
};
prev_local_item.with_send_state(EventSendState::NotSentYet)
};
let new_item = TimelineItem::new(
prev_item
.with_kind(ti_kind)
.with_content(TimelineItemContent::message(content, None, &txn.items), None),
prev_item.internal_id.to_owned(),
);
txn.items.replace(idx, new_item);
txn.commit();
debug!("Replaced local echo");
true
}
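    /// Retries decrypting the timeline's unable-to-decrypt items, restricted
    /// to the given session ids if provided.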
#[instrument(skip(self, room), fields(room_id = ?room.room_id()))]
pub(super) async fn retry_event_decryption(
&self,
room: &Room,
session_ids: Option<BTreeSet<String>>,
) {
self.retry_event_decryption_inner(room.to_owned(), session_ids).await
}
#[cfg(test)]
pub(super) async fn retry_event_decryption_test(
&self,
room_id: &RoomId,
olm_machine: OlmMachine,
session_ids: Option<BTreeSet<String>>,
) {
self.retry_event_decryption_inner((olm_machine, room_id.to_owned()), session_ids).await
}
async fn retry_event_decryption_inner(
&self,
decryptor: impl Decryptor,
session_ids: Option<BTreeSet<String>>,
) {
use super::EncryptedMessage;
        // Take an owned write guard so the state can be moved into the task
        // spawned below.
        let mut state = self.state.clone().write_owned().await;
let should_retry = move |session_id: &str| {
if let Some(session_ids) = &session_ids {
session_ids.contains(session_id)
} else {
true
}
};
let retry_indices: Vec<_> = state
.items
.iter()
.enumerate()
.filter_map(|(idx, item)| match item.as_event()?.content().as_unable_to_decrypt()? {
EncryptedMessage::MegolmV1AesSha2 { session_id, .. }
if should_retry(session_id) =>
{
Some(idx)
}
EncryptedMessage::MegolmV1AesSha2 { .. }
| EncryptedMessage::OlmV1Curve25519AesSha2 { .. }
| EncryptedMessage::Unknown => None,
})
.collect();
if retry_indices.is_empty() {
return;
}
debug!("Retrying decryption");
let settings = self.settings.clone();
let room_data_provider = self.room_data_provider.clone();
let push_rules_context = room_data_provider.push_rules_and_context().await;
let unable_to_decrypt_hook = state.meta.unable_to_decrypt_hook.clone();
matrix_sdk::executor::spawn(async move {
let retry_one = |item: Arc<TimelineItem>| {
let decryptor = decryptor.clone();
let should_retry = &should_retry;
let unable_to_decrypt_hook = unable_to_decrypt_hook.clone();
async move {
let event_item = item.as_event()?;
let session_id = match event_item.content().as_unable_to_decrypt()? {
EncryptedMessage::MegolmV1AesSha2 { session_id, .. }
if should_retry(session_id) =>
{
session_id
}
EncryptedMessage::MegolmV1AesSha2 { .. }
| EncryptedMessage::OlmV1Curve25519AesSha2 { .. }
| EncryptedMessage::Unknown => return None,
};
tracing::Span::current().record("session_id", session_id);
let Some(remote_event) = event_item.as_remote() else {
error!("Key for unable-to-decrypt timeline item is not an event ID");
return None;
};
tracing::Span::current().record("event_id", debug(&remote_event.event_id));
let Some(original_json) = &remote_event.original_json else {
error!("UTD item must contain original JSON");
return None;
};
match decryptor.decrypt_event_impl(original_json).await {
Ok(event) => {
if let SdkTimelineEventKind::UnableToDecrypt { utd_info, .. } =
event.kind
{
info!(
"Failed to decrypt event after receiving room key: {:?}",
utd_info.reason
);
None
} else {
if let Some(hook) = unable_to_decrypt_hook {
hook.on_late_decrypt(&remote_event.event_id).await;
}
Some(event)
}
}
Err(e) => {
info!("Failed to decrypt event after receiving room key: {e}");
None
}
}
}
.instrument(info_span!(
"retry_one",
session_id = field::Empty,
event_id = field::Empty
))
};
state
.retry_event_decryption(
retry_one,
retry_indices,
push_rules_context,
&room_data_provider,
&settings,
)
.await;
});
}
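    /// Marks the sender profiles of all timeline items that aren't ready yet
    /// as pending.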
pub(super) async fn set_sender_profiles_pending(&self) {
self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
}
pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
}
async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
self.state.write().await.items.for_each(|mut entry| {
let Some(event_item) = entry.as_event() else { return };
if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
let new_item = entry.with_kind(TimelineItemKind::Event(
event_item.with_sender_profile(profile_state.clone()),
));
ObservableItemsEntry::replace(&mut entry, new_item);
}
});
}
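    /// Fetches and fills in the sender profiles of all timeline items that
    /// don't have one ready yet.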
pub(super) async fn update_missing_sender_profiles(&self) {
trace!("Updating missing sender profiles");
let mut state = self.state.write().await;
let mut entries = state.items.entries();
while let Some(mut entry) = entries.next() {
let Some(event_item) = entry.as_event() else { continue };
let event_id = event_item.event_id().map(debug);
let transaction_id = event_item.transaction_id().map(debug);
if event_item.sender_profile().is_ready() {
trace!(event_id, transaction_id, "Profile already set");
continue;
}
match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
Some(profile) => {
trace!(event_id, transaction_id, "Adding profile");
let updated_item =
event_item.with_sender_profile(TimelineDetails::Ready(profile));
let new_item = entry.with_kind(updated_item);
ObservableItemsEntry::replace(&mut entry, new_item);
}
None => {
if !event_item.sender_profile().is_unavailable() {
trace!(event_id, transaction_id, "Marking profile unavailable");
let updated_item =
event_item.with_sender_profile(TimelineDetails::Unavailable);
let new_item = entry.with_kind(updated_item);
ObservableItemsEntry::replace(&mut entry, new_item);
} else {
debug!(event_id, transaction_id, "Profile already marked unavailable");
}
}
}
}
trace!("Done updating missing sender profiles");
}
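    /// Re-fetches the profiles of the given senders and updates the
    /// corresponding timeline items, even if a profile was already set.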
pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
trace!("Forcing update of sender profiles: {sender_ids:?}");
let mut state = self.state.write().await;
let mut entries = state.items.entries();
while let Some(mut entry) = entries.next() {
let Some(event_item) = entry.as_event() else { continue };
if !sender_ids.contains(event_item.sender()) {
continue;
}
let event_id = event_item.event_id().map(debug);
let transaction_id = event_item.transaction_id().map(debug);
match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
Some(profile) => {
if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
{
debug!(event_id, transaction_id, "Profile already up-to-date");
} else {
trace!(event_id, transaction_id, "Updating profile");
let updated_item =
event_item.with_sender_profile(TimelineDetails::Ready(profile));
let new_item = entry.with_kind(updated_item);
ObservableItemsEntry::replace(&mut entry, new_item);
}
}
None => {
if !event_item.sender_profile().is_unavailable() {
trace!(event_id, transaction_id, "Marking profile unavailable");
let updated_item =
event_item.with_sender_profile(TimelineDetails::Unavailable);
let new_item = entry.with_kind(updated_item);
ObservableItemsEntry::replace(&mut entry, new_item);
} else {
debug!(event_id, transaction_id, "Profile already marked unavailable");
}
}
}
}
trace!("Done forcing update of sender profiles");
}
#[cfg(test)]
pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
let own_user_id = self.room_data_provider.own_user_id();
self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
}
pub(super) async fn latest_user_read_receipt(
&self,
user_id: &UserId,
) -> Option<(OwnedEventId, Receipt)> {
self.state.read().await.latest_user_read_receipt(user_id, &self.room_data_provider).await
}
pub(super) async fn latest_user_read_receipt_timeline_event_id(
&self,
user_id: &UserId,
) -> Option<OwnedEventId> {
self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
}
pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
}
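    /// Handles a new local echo from the send queue, adding it to the timeline
    /// as a local event or a local reaction.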
pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
match echo.content {
LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
let content = match serialized_event.deserialize() {
Ok(d) => d,
Err(err) => {
warn!("error deserializing local echo: {err}");
return;
}
};
self.handle_local_event(
echo.transaction_id.clone(),
TimelineEventKind::Message { content, relations: Default::default() },
Some(send_handle),
)
.await;
if let Some(send_error) = send_error {
self.update_event_send_state(
&echo.transaction_id,
EventSendState::SendingFailed {
error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(send_error)),
is_recoverable: false,
},
)
.await;
}
}
LocalEchoContent::React { key, send_handle, applies_to } => {
self.handle_local_reaction(key, send_handle, applies_to).await;
}
}
}
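    /// Adds a local reaction targeting a local echo to the timeline.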
#[instrument(skip(self, send_handle))]
async fn handle_local_reaction(
&self,
reaction_key: String,
send_handle: SendReactionHandle,
applies_to: OwnedTransactionId,
) {
let mut state = self.state.write().await;
let Some((item_pos, item)) =
rfind_event_item(&state.items, |item| item.transaction_id() == Some(&applies_to))
else {
warn!("Local item not found anymore.");
return;
};
let user_id = self.room_data_provider.own_user_id();
let reaction_txn_id = send_handle.transaction_id().to_owned();
let reaction_info = ReactionInfo {
timestamp: MilliSecondsSinceUnixEpoch::now(),
status: ReactionStatus::LocalToLocal(Some(send_handle)),
};
let mut reactions = item.reactions().clone();
let by_user = reactions.entry(reaction_key.clone()).or_default();
by_user.insert(user_id.to_owned(), reaction_info);
trace!("Adding local reaction to local echo");
let new_item = item.with_reactions(reactions);
state.items.replace(item_pos, new_item);
state.meta.reactions.map.insert(
TimelineEventItemId::TransactionId(reaction_txn_id),
FullReactionKey {
item: TimelineEventItemId::TransactionId(applies_to),
key: reaction_key,
sender: user_id.to_owned(),
},
);
}
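    /// Handles a single update from the room send queue, dispatching to the
    /// appropriate local-echo handling method.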
pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
match update {
RoomSendQueueUpdate::NewLocalEvent(echo) => {
self.handle_local_echo(echo).await;
}
RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
if !self.discard_local_echo(&transaction_id).await {
warn!("couldn't find the local echo to discard");
}
}
RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
let content = match new_content.deserialize() {
Ok(d) => d,
Err(err) => {
warn!("error deserializing local echo (upon edit): {err}");
return;
}
};
if !self.replace_local_echo(&transaction_id, content).await {
warn!("couldn't find the local echo to replace");
}
}
RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
self.update_event_send_state(
&transaction_id,
EventSendState::SendingFailed { error, is_recoverable },
)
.await;
}
RoomSendQueueUpdate::RetryEvent { transaction_id } => {
self.update_event_send_state(&transaction_id, EventSendState::NotSentYet).await;
}
RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
.await;
}
RoomSendQueueUpdate::UploadedMedia { related_to, .. } => {
info!(txn_id = %related_to, "some media for a media event has been uploaded");
}
}
}
}
impl TimelineController {
pub(super) fn room(&self) -> &Room {
&self.room_data_provider
}
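    /// Fetches the details of the event the given event replies to, and fills
    /// them into the corresponding timeline item.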
#[instrument(skip(self))]
pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
let state = self.state.write().await;
let (index, item) = rfind_event_by_id(&state.items, event_id)
.ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
let remote_item = item
.as_remote()
.ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
.clone();
let TimelineItemContent::Message(message) = item.content().clone() else {
debug!("Event is not a message");
return Ok(());
};
let Some(in_reply_to) = message.in_reply_to() else {
debug!("Event is not a reply");
return Ok(());
};
if let TimelineDetails::Pending = &in_reply_to.event {
debug!("Replied-to event is already being fetched");
return Ok(());
}
if let TimelineDetails::Ready(_) = &in_reply_to.event {
debug!("Replied-to event has already been fetched");
return Ok(());
}
let internal_id = item.internal_id.to_owned();
let item = item.clone();
let event = fetch_replied_to_event(
state,
index,
&item,
internal_id,
&message,
&in_reply_to.event_id,
self.room(),
)
.await?;
let mut state = self.state.write().await;
let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
.ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
let TimelineItemContent::Message(message) = item.content().clone() else {
info!("Event is no longer a message (redacted?)");
return Ok(());
};
let Some(in_reply_to) = message.in_reply_to() else {
warn!("Event no longer has a reply (bug?)");
return Ok(());
};
trace!("Updating in-reply-to details");
let internal_id = item.internal_id.to_owned();
let mut item = item.clone();
item.set_content(TimelineItemContent::Message(
message.with_in_reply_to(InReplyToDetails {
event_id: in_reply_to.event_id.clone(),
event,
}),
));
state.items.replace(index, TimelineItem::new(item, internal_id));
Ok(())
}
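    /// Returns whether a receipt of the given type for the given event should
    /// be sent, i.e. whether it refers to an event newer than the one the
    /// previous receipt pointed at.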
pub(super) async fn should_send_receipt(
&self,
receipt_type: &SendReceiptType,
thread: &ReceiptThread,
event_id: &EventId,
) -> bool {
        // Only unthreaded receipts are checked against the previously sent
        // ones; receipts for any other thread are always sent.
        if *thread != ReceiptThread::Unthreaded {
            return true;
        }
let own_user_id = self.room().own_user_id();
let state = self.state.read().await;
let room = self.room();
match receipt_type {
SendReceiptType::Read => {
if let Some((old_pub_read, _)) = state
.meta
.user_receipt(
own_user_id,
ReceiptType::Read,
room,
state.items.all_remote_events(),
)
.await
{
trace!(%old_pub_read, "found a previous public receipt");
if let Some(relative_pos) = state.meta.compare_events_positions(
&old_pub_read,
event_id,
state.items.all_remote_events(),
) {
trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
return relative_pos == RelativePosition::After;
}
}
}
SendReceiptType::ReadPrivate => {
if let Some((old_priv_read, _)) =
state.latest_user_read_receipt(own_user_id, room).await
{
trace!(%old_priv_read, "found a previous private receipt");
if let Some(relative_pos) = state.meta.compare_events_positions(
&old_priv_read,
event_id,
state.items.all_remote_events(),
) {
trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
return relative_pos == RelativePosition::After;
}
}
}
SendReceiptType::FullyRead => {
if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
{
if let Some(relative_pos) = state.meta.compare_events_positions(
&prev_event_id,
event_id,
state.items.all_remote_events(),
) {
return relative_pos == RelativePosition::After;
}
}
}
_ => {}
}
true
}
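    /// Returns the event id of the latest remote event known to the timeline,
    /// if any.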
pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
let state = self.state.read().await;
state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
}
}
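/// The result of handling a batch of remote events: how many timeline items
/// were added and how many were updated.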
#[derive(Debug, Default)]
pub(super) struct HandleManyEventsResult {
pub items_added: u64,
pub items_updated: u64,
}
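/// Fetches the event a message replies to, either from the local timeline
/// items or from the server, after marking the in-reply-to details as pending
/// on the replying item.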
async fn fetch_replied_to_event(
mut state: RwLockWriteGuard<'_, TimelineState>,
index: usize,
item: &EventTimelineItem,
internal_id: TimelineUniqueId,
message: &Message,
in_reply_to: &EventId,
room: &Room,
) -> Result<TimelineDetails<Box<RepliedToEvent>>, Error> {
if let Some((_, item)) = rfind_event_by_id(&state.items, in_reply_to) {
let details = TimelineDetails::Ready(Box::new(RepliedToEvent {
content: item.content.clone(),
sender: item.sender().to_owned(),
sender_profile: item.sender_profile().clone(),
}));
trace!("Found replied-to event locally");
return Ok(details);
};
trace!("Setting in-reply-to details to pending");
let reply = message.with_in_reply_to(InReplyToDetails {
event_id: in_reply_to.to_owned(),
event: TimelineDetails::Pending,
});
let event_item = item.with_content(TimelineItemContent::Message(reply), None);
let new_timeline_item = TimelineItem::new(event_item, internal_id);
state.items.replace(index, new_timeline_item);
drop(state);
trace!("Fetching replied-to event");
let res = match room.event(in_reply_to, None).await {
Ok(timeline_event) => TimelineDetails::Ready(Box::new(
RepliedToEvent::try_from_timeline_event(timeline_event, room).await?,
)),
Err(e) => TimelineDetails::Error(Arc::new(e)),
};
Ok(res)
}