use std::{
collections::{btree_map::Entry, BTreeMap, BTreeSet},
mem,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock as StdRwLock,
},
};
use ruma::{
api::client::keys::claim_keys::v3::Request as KeysClaimRequest,
events::secret::request::{
RequestAction, SecretName, ToDeviceSecretRequestEvent as SecretRequestEvent,
},
DeviceId, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedTransactionId, OwnedUserId, RoomId,
TransactionId, UserId,
};
use tracing::{debug, field::debug, info, instrument, trace, warn, Span};
use vodozemac::{megolm::SessionOrdering, Curve25519PublicKey};
use super::{GossipRequest, GossippedSecret, RequestEvent, RequestInfo, SecretInfo, WaitQueue};
use crate::{
error::{EventError, OlmError, OlmResult},
identities::IdentityManager,
olm::{InboundGroupSession, Session},
session_manager::GroupSessionCache,
store::{Changes, CryptoStoreError, SecretImportError, Store, StoreCache},
types::{
events::{
forwarded_room_key::ForwardedRoomKeyContent,
olm_v1::{DecryptedForwardedRoomKeyEvent, DecryptedSecretSendEvent},
room::encrypted::EncryptedEvent,
room_key_request::RoomKeyRequestEvent,
secret_send::SecretSendContent,
EventType,
},
requests::{OutgoingRequest, ToDeviceRequest},
},
Device, MegolmError,
};
/// State machine that handles secret and room key gossiping: it queues
/// incoming requests, decides whether to serve them and tracks the requests
/// we sent out ourselves.
///
/// The type is a thin handle around a shared [`GossipMachineInner`].
#[derive(Clone, Debug)]
pub(crate) struct GossipMachine {
/// The shared state; cloning the machine only clones this `Arc`.
inner: Arc<GossipMachineInner>,
}
/// The state shared between all clones of a [`GossipMachine`].
#[derive(Debug)]
pub(crate) struct GossipMachineInner {
/// The store used to look up devices, secrets, sessions and persisted
/// outgoing secret requests.
store: Store,
/// Cache of outbound group sessions, consulted when deciding whether a room
/// key may be forwarded to a requesting device.
#[cfg(feature = "automatic-room-key-forwarding")]
outbound_group_sessions: GroupSessionCache,
/// In-memory queue of requests waiting to be sent out (secret sends, room
/// key forwards and request cancellations), keyed by their request ID.
outgoing_requests: StdRwLock<BTreeMap<OwnedTransactionId, OutgoingRequest>>,
/// Incoming requests that were received but not yet processed; drained by
/// `collect_incoming_key_requests`.
incoming_key_requests: StdRwLock<BTreeMap<RequestInfo, RequestEvent>>,
/// Requests that could not be served because no Olm session with the
/// requesting device was available; re-queued by `retry_keyshare`.
wait_queue: WaitQueue,
/// Users and devices for which a keys-claim request should be sent out.
users_for_key_claim: Arc<StdRwLock<BTreeMap<OwnedUserId, BTreeSet<OwnedDeviceId>>>>,
/// Whether incoming room key requests are served.
room_key_forwarding_enabled: AtomicBool,
/// Whether room key requests are created automatically.
room_key_requests_enabled: AtomicBool,
/// The identity manager, used to mark users as changed when a request
/// arrives from an unknown device.
identity_manager: IdentityManager,
}
impl GossipMachine {
/// Create a new gossip machine on top of the given store.
///
/// Both the room key forwarding and the room key request switches start
/// out enabled only if the crate was built with the
/// `automatic-room-key-forwarding` feature. Without that feature the
/// `outbound_group_sessions` argument is not stored.
pub fn new(
    store: Store,
    identity_manager: IdentityManager,
    #[allow(unused)] outbound_group_sessions: GroupSessionCache,
    users_for_key_claim: Arc<StdRwLock<BTreeMap<OwnedUserId, BTreeSet<OwnedDeviceId>>>>,
) -> Self {
    let forwarding_compiled_in = cfg!(feature = "automatic-room-key-forwarding");

    let inner = GossipMachineInner {
        store,
        #[cfg(feature = "automatic-room-key-forwarding")]
        outbound_group_sessions,
        outgoing_requests: Default::default(),
        incoming_key_requests: Default::default(),
        wait_queue: WaitQueue::new(),
        users_for_key_claim,
        room_key_forwarding_enabled: AtomicBool::new(forwarding_compiled_in),
        room_key_requests_enabled: AtomicBool::new(forwarding_compiled_in),
        identity_manager,
    };

    Self { inner: Arc::new(inner) }
}
/// Borrow the [`IdentityManager`] this machine was created with.
pub(crate) fn identity_manager(&self) -> &IdentityManager {
    let inner = &*self.inner;
    &inner.identity_manager
}
/// Turn serving of incoming room key requests on or off.
#[cfg(feature = "automatic-room-key-forwarding")]
pub fn set_room_key_forwarding_enabled(&self, enabled: bool) {
    let flag = &self.inner.room_key_forwarding_enabled;
    flag.store(enabled, Ordering::SeqCst);
}
/// Report whether incoming room key requests are currently being served.
pub fn is_room_key_forwarding_enabled(&self) -> bool {
    let flag = &self.inner.room_key_forwarding_enabled;
    flag.load(Ordering::SeqCst)
}
/// Turn the automatic creation of outgoing room key requests on or off.
#[cfg(feature = "automatic-room-key-forwarding")]
pub fn set_room_key_requests_enabled(&self, enabled: bool) {
    let flag = &self.inner.room_key_requests_enabled;
    flag.store(enabled, Ordering::SeqCst);
}
/// Report whether outgoing room key requests are currently created
/// automatically.
pub fn are_room_key_requests_enabled(&self) -> bool {
    let flag = &self.inner.room_key_requests_enabled;
    flag.load(Ordering::SeqCst)
}
/// Load the secret requests that the store reports as unsent and turn
/// them into outgoing requests.
///
/// Entries that are nevertheless flagged as `sent_out` are skipped.
async fn load_outgoing_requests(&self) -> Result<Vec<OutgoingRequest>, CryptoStoreError> {
    let stored = self.inner.store.get_unsent_secret_requests().await?;
    let own_device_id = self.device_id();

    let mut requests = Vec::new();
    for info in stored {
        if info.sent_out {
            continue;
        }
        requests.push(info.to_request(own_device_id));
    }

    Ok(requests)
}
/// The user ID of the account this machine belongs to.
pub fn user_id(&self) -> &UserId {
    let account = self.inner.store.static_account();
    &account.user_id
}
/// The device ID of the account this machine belongs to.
pub fn device_id(&self) -> &DeviceId {
    let account = self.inner.store.static_account();
    &account.device_id
}
/// Collect all requests this machine currently wants to have sent out.
///
/// The result contains, in this order: the unsent secret requests loaded
/// from the store, the requests queued in memory, and, if any device is
/// waiting for a key claim, a single keys-claim request asking for a
/// signed Curve25519 one-time key for each of those devices.
pub async fn outgoing_to_device_requests(
    &self,
) -> Result<Vec<OutgoingRequest>, CryptoStoreError> {
    let mut requests = self.load_outgoing_requests().await?;

    let queued: Vec<OutgoingRequest> =
        self.inner.outgoing_requests.read().unwrap().values().cloned().collect();
    requests.extend(queued);

    let mut users_for_key_claim = BTreeMap::new();
    for (user_id, device_ids) in self.inner.users_for_key_claim.read().unwrap().iter() {
        let mut devices = BTreeMap::new();
        for device_id in device_ids {
            devices.insert(device_id.to_owned(), OneTimeKeyAlgorithm::SignedCurve25519);
        }
        users_for_key_claim.insert(user_id.to_owned(), devices);
    }

    if !users_for_key_claim.is_empty() {
        let claim = KeysClaimRequest::new(users_for_key_claim);
        let outgoing =
            OutgoingRequest { request_id: TransactionId::new(), request: Arc::new(claim.into()) };
        requests.push(outgoing);
    }

    Ok(requests)
}
pub fn receive_incoming_key_request(&self, event: &RoomKeyRequestEvent) {
self.receive_event(event.clone().into())
}
/// Put a request event into the queue of incoming requests.
///
/// Requests that were sent by our own user from our own device are
/// ignored.
fn receive_event(&self, event: RequestEvent) {
    let is_from_ourselves =
        event.sender() == self.user_id() && event.requesting_device_id() == self.device_id();

    if is_from_ourselves {
        trace!("Received a secret request event from ourselves, ignoring");
        return;
    }

    let request_info = event.to_request_info();
    let mut queue = self.inner.incoming_key_requests.write().unwrap();
    queue.insert(request_info, event);
}
pub fn receive_incoming_secret_request(&self, event: &SecretRequestEvent) {
self.receive_event(event.clone().into())
}
/// Drain the queue of incoming requests and handle each of them.
///
/// Returns the Olm sessions that were used while answering requests. The
/// first error aborts the processing; since the queue has already been
/// emptied at that point, the remaining requests are dropped.
pub async fn collect_incoming_key_requests(
    &self,
    cache: &StoreCache,
) -> OlmResult<Vec<Session>> {
    // Take the whole queue so the lock isn't held while handling requests.
    let pending = mem::take(&mut *self.inner.incoming_key_requests.write().unwrap());
    let mut changed_sessions: Vec<Session> = Vec::new();

    for request in pending.values() {
        let used_session = match request {
            #[cfg(feature = "automatic-room-key-forwarding")]
            RequestEvent::KeyShare(e) => Box::pin(self.handle_key_request(cache, e)).await?,
            RequestEvent::Secret(e) => Box::pin(self.handle_secret_request(cache, e)).await?,
            #[cfg(not(feature = "automatic-room-key-forwarding"))]
            _ => None,
        };

        if let Some(session) = used_session {
            changed_sessions.push(session);
        }
    }

    Ok(changed_sessions)
}
/// Park a request that can't be answered because we have no Olm session
/// with the requesting device.
///
/// The device is remembered as needing a key claim and the request is put
/// into the wait queue.
fn handle_key_share_without_session(&self, device: Device, event: RequestEvent) {
    {
        let mut users = self.inner.users_for_key_claim.write().unwrap();
        let devices = users.entry(device.user_id().to_owned()).or_default();
        devices.insert(device.device_id().into());
    }

    self.inner.wait_queue.insert(&device, event);
}
pub fn retry_keyshare(&self, user_id: &UserId, device_id: &DeviceId) {
if let Entry::Occupied(mut e) =
self.inner.users_for_key_claim.write().unwrap().entry(user_id.to_owned())
{
e.get_mut().remove(device_id);
if e.get().is_empty() {
e.remove();
}
}
let mut incoming_key_requests = self.inner.incoming_key_requests.write().unwrap();
for (key, event) in self.inner.wait_queue.remove(user_id, device_id) {
incoming_key_requests.entry(key).or_insert(event);
}
}
/// Handle a single incoming secret request.
///
/// A secret is only shared if the request is an actual request (not a
/// cancellation or an unknown action), the store can export the secret,
/// and the requesting device is known, belongs to our own user and is
/// verified. If the device is unknown its user is marked as changed.
///
/// Returns the Olm session used to encrypt the secret, or `None` if
/// nothing was sent. A request that fails only because an Olm session is
/// missing is put into the wait queue.
async fn handle_secret_request(
    &self,
    cache: &StoreCache,
    event: &SecretRequestEvent,
) -> OlmResult<Option<Session>> {
    let secret_name = match &event.content.action {
        RequestAction::Request(name) => name,
        RequestAction::RequestCancellation => return Ok(None),
        action => {
            warn!(?action, "Unknown secret request action");
            return Ok(None);
        }
    };

    let Some(secret) = self.inner.store.export_secret(secret_name).await? else {
        info!(?secret_name, "Can't serve a secret request, secret isn't found");
        return Ok(None);
    };
    let content = SecretSendContent::new(event.content.request_id.to_owned(), secret);

    let requester =
        self.inner.store.get_device(&event.sender, &event.content.requesting_device_id).await?;

    let Some(device) = requester else {
        warn!(
            user_id = ?event.sender,
            device_id = ?event.content.requesting_device_id,
            ?secret_name,
            "Received a secret request from an unknown device",
        );
        self.inner
            .identity_manager
            .key_query_manager
            .synced(cache)
            .await?
            .mark_user_as_changed(&event.sender)
            .await?;
        return Ok(None);
    };

    if device.user_id() != self.user_id() {
        info!(
            user_id = ?device.user_id(),
            device_id = ?device.device_id(),
            ?secret_name,
            "Received a secret request that we won't serve, the device doesn't belong to us",
        );
        return Ok(None);
    }

    if !device.is_verified() {
        info!(
            user_id = ?device.user_id(),
            device_id = ?device.device_id(),
            ?secret_name,
            "Received a secret request that we won't serve, the device isn't trusted",
        );
        return Ok(None);
    }

    info!(
        user_id = ?device.user_id(),
        device_id = ?device.device_id(),
        ?secret_name,
        "Sharing a secret with a device",
    );

    match self.share_secret(&device, content).await {
        Ok(session) => Ok(Some(session)),
        Err(OlmError::MissingSession) => {
            info!(
                user_id = ?device.user_id(),
                device_id = ?device.device_id(),
                ?secret_name,
                "Secret request is missing an Olm session, \
putting the request in the wait queue",
            );
            self.handle_key_share_without_session(device, event.clone().into());
            Ok(None)
        }
        Err(e) => Err(e),
    }
}
/// Try to forward the given inbound group session to the requesting
/// device.
///
/// Returns the Olm session used for the forward. A missing Olm session
/// puts the request into the wait queue and a session that can't be
/// exported is only logged; both cases yield `Ok(None)`. Any other error
/// is propagated.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn try_to_forward_room_key(
    &self,
    event: &RoomKeyRequestEvent,
    device: Device,
    session: &InboundGroupSession,
    message_index: Option<u32>,
) -> OlmResult<Option<Session>> {
    info!(?message_index, "Serving a room key request",);

    let outcome = self.forward_room_key(session, &device, message_index).await;

    match outcome {
        Ok(used_session) => Ok(Some(used_session)),
        Err(OlmError::MissingSession) => {
            info!(
                "Key request is missing an Olm session, putting the request in the wait queue",
            );
            let parked: RequestEvent = event.to_owned().into();
            self.handle_key_share_without_session(device, parked);
            Ok(None)
        }
        Err(OlmError::SessionExport(e)) => {
            warn!(
                "Can't serve a room key request, the session \
can't be exported into a forwarded room key: {e:?}",
            );
            Ok(None)
        }
        Err(other) => Err(other),
    }
}
/// Answer a room key request for an inbound group session we possess.
///
/// If the requesting device is unknown, its user is marked as changed and
/// nothing is sent. Otherwise `should_share_key` decides whether, and
/// from which message index, the session is forwarded; a refusal is only
/// logged.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn answer_room_key_request(
    &self,
    cache: &StoreCache,
    event: &RoomKeyRequestEvent,
    session: &InboundGroupSession,
) -> OlmResult<Option<Session>> {
    use super::KeyForwardDecision;

    let requester =
        self.inner.store.get_device(&event.sender, &event.content.requesting_device_id).await?;

    let Some(device) = requester else {
        warn!("Received a key request from an unknown device");
        self.identity_manager()
            .key_query_manager
            .synced(cache)
            .await?
            .mark_user_as_changed(&event.sender)
            .await?;

        return Ok(None);
    };

    match self.should_share_key(&device, session).await {
        Ok(message_index) => {
            self.try_to_forward_room_key(event, device, session, message_index).await
        }
        Err(KeyForwardDecision::ChangedSenderKey) => {
            warn!(
                "Received a key request from a device that changed \
their Curve25519 sender key"
            );
            Ok(None)
        }
        Err(reason) => {
            debug!(
                reason = ?reason,
                "Received a key request that we won't serve",
            );
            Ok(None)
        }
    }
}
/// Handle a room key request for an algorithm we support.
///
/// Looks up the requested inbound group session in the store and, if it
/// exists, tries to answer the request. Returns `Ok(None)` when the
/// session is unknown.
#[cfg(feature = "automatic-room-key-forwarding")]
#[tracing::instrument(
    skip_all,
    fields(
        user_id = ?event.sender,
        device_id = ?event.content.requesting_device_id,
        ?room_id,
        session_id
    )
)]
async fn handle_supported_key_request(
    &self,
    cache: &StoreCache,
    event: &RoomKeyRequestEvent,
    room_id: &RoomId,
    session_id: &str,
) -> OlmResult<Option<Session>> {
    let stored = self.inner.store.get_inbound_group_session(room_id, session_id).await?;

    match stored {
        Some(session) => self.answer_room_key_request(cache, event, &session).await,
        None => {
            debug!("Received a room key request for an unknown inbound group session",);
            Ok(None)
        }
    }
}
/// Handle a single incoming room key request.
///
/// Nothing is served while room key forwarding is turned off. Requests
/// for a supported algorithm are passed on to
/// `handle_supported_key_request`; cancellations and requests for an
/// unknown algorithm yield `Ok(None)`.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn handle_key_request(
    &self,
    cache: &StoreCache,
    event: &RoomKeyRequestEvent,
) -> OlmResult<Option<Session>> {
    use crate::types::events::room_key_request::{Action, RequestedKeyInfo};

    if !self.is_room_key_forwarding_enabled() {
        debug!(
            sender = ?event.sender,
            "Received a room key request, but room key forwarding has been turned off"
        );
        return Ok(None);
    }

    match &event.content.action {
        Action::Request(RequestedKeyInfo::MegolmV1AesSha2(i)) => {
            self.handle_supported_key_request(cache, event, &i.room_id, &i.session_id).await
        }
        #[cfg(feature = "experimental-algorithms")]
        Action::Request(RequestedKeyInfo::MegolmV2AesSha2(i)) => {
            self.handle_supported_key_request(cache, event, &i.room_id, &i.session_id).await
        }
        Action::Request(RequestedKeyInfo::Unknown(i)) => {
            debug!(
                sender = ?event.sender,
                algorithm = ?i.algorithm,
                "Received a room key request for a unsupported algorithm"
            );
            Ok(None)
        }
        Action::Cancellation => Ok(None),
    }
}
/// Encrypt the given secret for a device and queue the resulting
/// to-device request for sending.
///
/// Returns the Olm session that was used for the encryption.
async fn share_secret(
    &self,
    device: &Device,
    content: SecretSendContent,
) -> OlmResult<Session> {
    let plaintext_type = content.event_type();
    let (used_session, encrypted) = device.encrypt(plaintext_type, content).await?;

    let to_device = ToDeviceRequest::new(
        device.user_id(),
        device.device_id().to_owned(),
        encrypted.event_type(),
        encrypted.cast(),
    );

    let request_id = to_device.txn_id.clone();
    let outgoing =
        OutgoingRequest { request_id: request_id.clone(), request: Arc::new(to_device.into()) };

    self.inner.outgoing_requests.write().unwrap().insert(request_id, outgoing);

    Ok(used_session)
}
/// Encrypt the given inbound group session as a forwarded room key for a
/// device and queue the resulting to-device request for sending.
///
/// Returns the Olm session that was used for the encryption.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn forward_room_key(
    &self,
    session: &InboundGroupSession,
    device: &Device,
    message_index: Option<u32>,
) -> OlmResult<Session> {
    let (used_session, encrypted) =
        device.encrypt_room_key_for_forwarding(session.clone(), message_index).await?;

    let to_device = ToDeviceRequest::new(
        device.user_id(),
        device.device_id().to_owned(),
        encrypted.event_type(),
        encrypted.cast(),
    );

    let request_id = to_device.txn_id.clone();
    let outgoing =
        OutgoingRequest { request_id: request_id.clone(), request: Arc::new(to_device.into()) };

    self.inner.outgoing_requests.write().unwrap().insert(request_id, outgoing);

    Ok(used_session)
}
/// Decide whether the given session may be forwarded to a device.
///
/// * A verified device of our own user gets the session without a
///   message index restriction (`Ok(None)`).
/// * Otherwise, if we hold the matching outbound session, the outcome
///   follows from whether it was shared with the device: `Ok(Some(index))`
///   if it was, an error if it wasn't or if the sender key changed.
/// * Without a matching outbound session the request is refused:
///   `UntrustedDevice` for our own devices, `MissingOutboundSession`
///   for everybody else.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn should_share_key(
    &self,
    device: &Device,
    session: &InboundGroupSession,
) -> Result<Option<u32>, super::KeyForwardDecision> {
    use super::KeyForwardDecision;
    use crate::olm::ShareState;

    let matching_outbound = self
        .inner
        .outbound_group_sessions
        .get_or_load(session.room_id())
        .await
        .filter(|outbound| outbound.session_id() == session.session_id());

    let is_own_device = device.user_id() == self.user_id();

    if is_own_device && device.is_verified() {
        return Ok(None);
    }

    let Some(outbound) = matching_outbound else {
        return Err(if is_own_device {
            KeyForwardDecision::UntrustedDevice
        } else {
            KeyForwardDecision::MissingOutboundSession
        });
    };

    match outbound.is_shared_with(&device.inner) {
        ShareState::Shared { message_index, .. } => Ok(Some(message_index)),
        ShareState::SharedButChangedSenderKey => Err(KeyForwardDecision::ChangedSenderKey),
        ShareState::NotShared => Err(KeyForwardDecision::OutboundSessionNotShared),
    }
}
/// Decide whether a room key request should be created automatically.
///
/// This is the case when room key requests are enabled, no request for
/// the same key is stored yet, and at least one of our own devices is
/// verified.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn should_request_key(&self, key_info: &SecretInfo) -> Result<bool, CryptoStoreError> {
    if !self.inner.room_key_requests_enabled.load(Ordering::SeqCst) {
        return Ok(false);
    }

    let existing = self.inner.store.get_secret_request_by_info(key_info).await?;
    if existing.is_some() {
        return Ok(false);
    }

    let own_devices = self.inner.store.get_user_devices(self.user_id()).await?;
    Ok(own_devices.is_any_verified())
}
pub async fn request_key(
&self,
room_id: &RoomId,
event: &EncryptedEvent,
) -> Result<(Option<OutgoingRequest>, OutgoingRequest), MegolmError> {
let secret_info =
event.room_key_info(room_id).ok_or(EventError::UnsupportedAlgorithm)?.into();
let request = self.inner.store.get_secret_request_by_info(&secret_info).await?;
if let Some(request) = request {
let cancel = request.to_cancellation(self.device_id());
let request = request.to_request(self.device_id());
Ok((Some(cancel), request))
} else {
let request = self.request_key_helper(secret_info).await?;
Ok((None, request))
}
}
/// Create one gossip request, addressed to our own user, for each of the
/// given secret names.
///
/// Returns an empty list if `secret_names` is empty.
pub fn request_missing_secrets(
    own_user_id: &UserId,
    secret_names: Vec<SecretName>,
) -> Vec<GossipRequest> {
    if secret_names.is_empty() {
        trace!("No secrets are missing from our store, not requesting them");
        return vec![];
    }

    info!(?secret_names, "Creating new outgoing secret requests");

    let mut requests = Vec::new();
    for name in secret_names {
        requests.push(GossipRequest::from_secret_name(own_user_id.to_owned(), name));
    }
    requests
}
/// Create a new, not yet sent gossip request for `key_info` addressed to
/// our own user, persist it, and return the matching outgoing request.
async fn request_key_helper(
    &self,
    key_info: SecretInfo,
) -> Result<OutgoingRequest, CryptoStoreError> {
    let recipient = self.user_id().to_owned();
    let request_id = TransactionId::new();

    let gossip_request =
        GossipRequest { request_recipient: recipient, request_id, info: key_info, sent_out: false };

    // Build the outgoing request first, saving consumes the gossip request.
    let outgoing = gossip_request.to_request(self.device_id());
    self.save_outgoing_key_info(gossip_request).await?;

    Ok(outgoing)
}
/// Create and store a room key request for the session that encrypted
/// `event`, if `should_request_key` allows it.
///
/// Returns `true` if a new request was created, `false` if the event
/// yields no room key info or the request was deemed unnecessary.
#[cfg(feature = "automatic-room-key-forwarding")]
pub async fn create_outgoing_key_request(
    &self,
    room_id: &RoomId,
    event: &EncryptedEvent,
) -> Result<bool, CryptoStoreError> {
    let key_info: Option<SecretInfo> = event.room_key_info(room_id).map(|i| i.into());

    let Some(info) = key_info else {
        return Ok(false);
    };

    if !self.should_request_key(&info).await? {
        return Ok(false);
    }

    Box::pin(self.request_key_helper(info)).await?;
    Ok(true)
}
/// Persist a single gossip request in the store.
async fn save_outgoing_key_info(&self, info: GossipRequest) -> Result<(), CryptoStoreError> {
    let mut pending = Changes::default();
    pending.key_requests.push(info);

    let store = &self.inner.store;
    store.save_changes(pending).await?;

    Ok(())
}
/// Delete the stored outgoing secret request with the ID of `info`.
async fn delete_key_info(&self, info: &GossipRequest) -> Result<(), CryptoStoreError> {
    let request_id = &info.request_id;
    self.inner.store.delete_outgoing_secret_requests(request_id).await
}
/// Record that the request with the given ID has been sent out.
///
/// A stored secret request with this ID is flagged as sent and saved
/// again. In any case the request is removed from the in-memory queue of
/// outgoing requests.
pub async fn mark_outgoing_request_as_sent(
    &self,
    id: &TransactionId,
) -> Result<(), CryptoStoreError> {
    let stored = self.inner.store.get_outgoing_secret_requests(id).await?;

    match stored {
        Some(mut info) => {
            trace!(
                recipient = ?info.request_recipient,
                request_type = info.request_type(),
                request_id = ?info.request_id,
                "Marking outgoing secret request as sent"
            );
            info.sent_out = true;
            self.save_outgoing_key_info(info).await?;
        }
        None => {}
    }

    let mut queue = self.inner.outgoing_requests.write().unwrap();
    queue.remove(id);

    Ok(())
}
/// Finish a gossip request after the requested secret has arrived.
///
/// The request is removed from the in-memory queue and from the store,
/// and a cancellation for it is queued for sending.
async fn mark_as_done(&self, key_info: &GossipRequest) -> Result<(), CryptoStoreError> {
    trace!(
        recipient = ?key_info.request_recipient,
        request_type = key_info.request_type(),
        request_id = ?key_info.request_id,
        "Successfully received a secret, removing the request"
    );

    let finished_id = &key_info.request_id;
    self.inner.outgoing_requests.write().unwrap().remove(finished_id);
    self.delete_key_info(key_info).await?;

    let cancellation = key_info.to_cancellation(self.device_id());
    let cancellation_id = cancellation.request_id.clone();
    self.inner.outgoing_requests.write().unwrap().insert(cancellation_id, cancellation);

    Ok(())
}
/// Accept a secret that was sent to us by a trusted device.
///
/// A secret named `SecretName::RecoveryKey` is not imported; it is pushed
/// to `changes.secrets` instead. Every other secret is imported into the
/// store and its request is marked as done. Import failures other than
/// store errors are logged and otherwise ignored.
async fn accept_secret(
    &self,
    secret: GossippedSecret,
    changes: &mut Changes,
) -> Result<(), CryptoStoreError> {
    if secret.secret_name == SecretName::RecoveryKey {
        info!("Received a backup decryption key, storing it into the secret inbox.");
        changes.secrets.push(secret);
        return Ok(());
    }

    let imported = self.inner.store.import_secret(&secret).await;

    match imported {
        Ok(_) => self.mark_as_done(&secret.gossip_request).await?,
        Err(SecretImportError::Store(e)) => return Err(e),
        Err(e) => {
            warn!(
                secret_name = ?secret.secret_name,
                error = ?e,
                "Error while importing a secret"
            );
        }
    }

    Ok(())
}
/// Process a received secret for which a matching request exists.
///
/// The secret is only accepted if the device identified by `sender_key`
/// is known, belongs to our own user and is verified. If the device is
/// unknown, the sender is marked as changed.
async fn receive_secret(
    &self,
    cache: &StoreCache,
    sender_key: Curve25519PublicKey,
    secret: GossippedSecret,
    changes: &mut Changes,
) -> Result<(), CryptoStoreError> {
    debug!("Received a m.secret.send event with a matching request");

    let sending_device =
        self.inner.store.get_device_from_curve_key(&secret.event.sender, sender_key).await?;

    let Some(device) = sending_device else {
        warn!("Received a m.secret.send event from an unknown device");
        self.identity_manager()
            .key_query_manager
            .synced(cache)
            .await?
            .mark_user_as_changed(&secret.event.sender)
            .await?;

        return Ok(());
    };

    let is_trusted = device.user_id() == self.user_id() && device.is_verified();

    if is_trusted {
        self.accept_secret(secret, changes).await?;
    } else {
        warn!("Received a m.secret.send event from another user or from unverified device");
    }

    Ok(())
}
/// Receive a decrypted secret send event.
///
/// The event is only processed further if we have a stored outgoing
/// request with the same request ID and that request was for a secret
/// rather than for a room key.
///
/// Returns the name of the secret the matching request asked for, or
/// `None` if the event was ignored. Note that a name is returned even if
/// the secret ends up being rejected by `receive_secret`.
#[instrument(skip_all, fields(sender_key, sender = ?event.sender, request_id = ?event.content.request_id, secret_name))]
pub async fn receive_secret_event(
    &self,
    cache: &StoreCache,
    sender_key: Curve25519PublicKey,
    event: &DecryptedSecretSendEvent,
    changes: &mut Changes,
) -> Result<Option<SecretName>, CryptoStoreError> {
    debug!("Received a m.secret.send event");

    let request_id = &event.content.request_id;
    let matching = self.inner.store.get_outgoing_secret_requests(request_id).await?;

    let Some(request) = matching else {
        warn!("Received a m.secret.send event, but no matching request was found");
        return Ok(None);
    };

    let secret_name = match &request.info {
        SecretInfo::KeyRequest(_) => {
            warn!("Received a m.secret.send event but the request was for a room key");
            return Ok(None);
        }
        SecretInfo::SecretRequest(name) => {
            Span::current().record("secret_name", debug(name));
            name.to_owned()
        }
    };

    let secret = GossippedSecret {
        secret_name: secret_name.to_owned(),
        event: event.to_owned(),
        gossip_request: request,
    };
    self.receive_secret(cache, sender_key, secret, changes).await?;

    Ok(Some(secret_name))
}
/// Turn an accepted forwarded room key into an inbound group session.
///
/// The session is only returned, and the request only marked as done, if
/// the store considers it better than what we already have. Fails if no
/// session can be created from the event.
async fn accept_forwarded_room_key(
    &self,
    info: &GossipRequest,
    sender_key: Curve25519PublicKey,
    event: &DecryptedForwardedRoomKeyEvent,
) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
    let session = match InboundGroupSession::try_from(event) {
        Ok(session) => session,
        Err(e) => {
            warn!(?sender_key, "Couldn't create a group session from a received room key");
            return Err(e.into());
        }
    };

    let ordering = self.inner.store.compare_group_session(&session).await?;

    if ordering != SessionOrdering::Better {
        info!(
            ?sender_key,
            claimed_sender_key = ?session.sender_key(),
            room_id = ?session.room_id(),
            session_id = session.session_id(),
            algorithm = ?session.algorithm(),
            "Received a forwarded room key but we already have a better version of it",
        );
        return Ok(None);
    }

    self.mark_as_done(info).await?;

    info!(
        ?sender_key,
        claimed_sender_key = ?session.sender_key(),
        room_id = ?session.room_id(),
        session_id = session.session_id(),
        algorithm = ?session.algorithm(),
        "Received a forwarded room key",
    );

    Ok(Some(session))
}
/// Check whether a forwarded room key from `sender_key` may be accepted.
///
/// The device is looked up among the devices of the request recipient; it
/// must exist, belong to our own user and be verified.
async fn should_accept_forward(
    &self,
    info: &GossipRequest,
    sender_key: Curve25519PublicKey,
) -> Result<bool, CryptoStoreError> {
    let sending_device =
        self.inner.store.get_device_from_curve_key(&info.request_recipient, sender_key).await?;

    let accept = match sending_device {
        Some(device) => device.user_id() == self.user_id() && device.is_verified(),
        None => false,
    };

    Ok(accept)
}
/// Handle a forwarded room key event for an algorithm we support.
///
/// The key is dropped with a warning, returning `Ok(None)`, if
///
/// * no room key info can be derived from the event,
/// * we have no stored request for this key, or
/// * `should_accept_forward` rejects the sending device.
///
/// Otherwise the result of `accept_forwarded_room_key` is returned.
async fn receive_supported_keys(
    &self,
    sender_key: Curve25519PublicKey,
    event: &DecryptedForwardedRoomKeyEvent,
) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
    let Some(info) = event.room_key_info() else {
        warn!(
            sender_key = sender_key.to_base64(),
            algorithm = ?event.content.algorithm(),
            "Received a forwarded room key with an unsupported algorithm",
        );
        return Ok(None);
    };

    let Some(request) =
        self.inner.store.get_secret_request_by_info(&info.clone().into()).await?
    else {
        // The `sender_key` field used to be listed twice in this event;
        // it is recorded exactly once now.
        warn!(
            ?sender_key,
            room_id = ?info.room_id(),
            session_id = info.session_id(),
            algorithm = ?info.algorithm(),
            "Received a forwarded room key that we didn't request",
        );
        return Ok(None);
    };

    if self.should_accept_forward(&request, sender_key).await? {
        self.accept_forwarded_room_key(&request, sender_key, event).await
    } else {
        warn!(
            ?sender_key,
            room_id = ?info.room_id(),
            session_id = info.session_id(),
            "Received a forwarded room key from an unknown device, or \
from a device that the key request recipient doesn't own",
        );

        Ok(None)
    }
}
/// Receive a decrypted forwarded room key event.
///
/// Keys with a supported algorithm are passed on to
/// `receive_supported_keys`; keys with an unknown algorithm are logged
/// and dropped.
pub async fn receive_forwarded_room_key(
    &self,
    sender_key: Curve25519PublicKey,
    event: &DecryptedForwardedRoomKeyEvent,
) -> Result<Option<InboundGroupSession>, CryptoStoreError> {
    match &event.content {
        ForwardedRoomKeyContent::Unknown(_) => {
            warn!(
                sender = event.sender.as_str(),
                sender_key = sender_key.to_base64(),
                algorithm = ?event.content.algorithm(),
                "Received a forwarded room key with an unsupported algorithm",
            );

            Ok(None)
        }
        ForwardedRoomKeyContent::MegolmV1AesSha2(_) => {
            self.receive_supported_keys(sender_key, event).await
        }
        #[cfg(feature = "experimental-algorithms")]
        ForwardedRoomKeyContent::MegolmV2AesSha2(_) => {
            self.receive_supported_keys(sender_key, event).await
        }
    }
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
#[cfg(feature = "automatic-room-key-forwarding")]
use assert_matches::assert_matches;
use matrix_sdk_test::{async_test, message_like_event_content};
use ruma::{
device_id, event_id,
events::{
secret::request::{RequestAction, SecretName, ToDeviceSecretRequestEventContent},
ToDeviceEvent as RumaToDeviceEvent,
},
room_id,
serde::Raw,
user_id, DeviceId, RoomId, UserId,
};
use tokio::sync::Mutex;
use super::GossipMachine;
#[cfg(feature = "automatic-room-key-forwarding")]
use crate::{
gossiping::KeyForwardDecision,
olm::OutboundGroupSession,
types::requests::AnyOutgoingRequest,
types::{
events::{
forwarded_room_key::ForwardedRoomKeyContent, olm_v1::AnyDecryptedOlmEvent,
olm_v1::DecryptedOlmV1Event,
},
EventEncryptionAlgorithm,
},
EncryptionSettings,
};
use crate::{
identities::{DeviceData, IdentityManager, LocalTrust},
olm::{Account, PrivateCrossSigningIdentity},
session_manager::GroupSessionCache,
store::{Changes, CryptoStoreWrapper, MemoryStore, PendingChanges, Store},
types::events::room::encrypted::{
EncryptedEvent, EncryptedToDeviceEvent, RoomEncryptedEventContent,
},
verification::VerificationMachine,
};
/// Alice's user ID; she is the local user of the machine built by
/// `get_machine_test_helper`.
fn alice_id() -> &'static UserId {
    let id: &'static UserId = user_id!("@alice:example.org");
    id
}
/// The device ID of Alice's first device.
fn alice_device_id() -> &'static DeviceId {
    let id: &'static DeviceId = device_id!("JLAFKJWSCS");
    id
}
/// Bob's user ID.
fn bob_id() -> &'static UserId {
    let id: &'static UserId = user_id!("@bob:example.org");
    id
}
/// The device ID of Bob's device.
fn bob_device_id() -> &'static DeviceId {
    let id: &'static DeviceId = device_id!("ILMLKASTES");
    id
}
/// The device ID of Alice's second device. It is the same string as Bob's
/// device ID, but belongs to a different user.
fn alice2_device_id() -> &'static DeviceId {
    let id: &'static DeviceId = device_id!("ILMLKASTES");
    id
}
/// The ID of the room used throughout these tests.
fn room_id() -> &'static RoomId {
    let id: &'static RoomId = room_id!("!test:example.org");
    id
}
/// Create a fresh account for Alice's first device.
fn account() -> Account {
    let (user, device) = (alice_id(), alice_device_id());
    Account::with_device_id(user, device)
}
/// Create a fresh account for Bob's device.
fn bob_account() -> Account {
    let (user, device) = (bob_id(), bob_device_id());
    Account::with_device_id(user, device)
}
/// Create a fresh account for Alice's second device.
fn alice_2_account() -> Account {
    let (user, device) = (alice_id(), alice2_device_id());
    Account::with_device_id(user, device)
}
/// Build a gossip machine for `user_id` with a freshly generated device
/// ID, backed by an in-memory store.
///
/// Note that the private cross-signing identity is always created for
/// Alice, regardless of `user_id`.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn gossip_machine_test_helper(user_id: &UserId) -> GossipMachine {
    let owner = user_id.to_owned();
    let device_id = DeviceId::new();
    let account = Account::with_device_id(&owner, &device_id);

    let crypto_store = Arc::new(CryptoStoreWrapper::new(&owner, &device_id, MemoryStore::new()));
    let identity = Arc::new(Mutex::new(PrivateCrossSigningIdentity::empty(alice_id())));
    let verification = VerificationMachine::new(
        account.static_data.clone(),
        identity.clone(),
        crypto_store.clone(),
    );

    let store = Store::new(account.static_data().clone(), identity, crypto_store, verification);
    store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();

    let session_cache = GroupSessionCache::new(store.clone());
    let identity_manager = IdentityManager::new(store.clone());

    GossipMachine::new(store, identity_manager, session_cache, Default::default())
}
/// Build a gossip machine for Alice's first device, backed by an
/// in-memory store that already knows both of Alice's devices.
async fn get_machine_test_helper() -> GossipMachine {
    let owner = alice_id().to_owned();
    let account = Account::with_device_id(&owner, alice_device_id());

    let own_device = DeviceData::from_account(&account);
    let second_device =
        DeviceData::from_account(&Account::with_device_id(&owner, alice2_device_id()));

    let crypto_store =
        Arc::new(CryptoStoreWrapper::new(&owner, account.device_id(), MemoryStore::new()));
    let identity = Arc::new(Mutex::new(PrivateCrossSigningIdentity::empty(alice_id())));
    let verification = VerificationMachine::new(
        account.static_data.clone(),
        identity.clone(),
        crypto_store.clone(),
    );

    let store = Store::new(account.static_data().clone(), identity, crypto_store, verification);
    store.save_device_data(&[own_device, second_device]).await.unwrap();
    store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();

    let session_cache = GroupSessionCache::new(store.clone());
    let identity_manager = IdentityManager::new(store.clone());

    GossipMachine::new(store, identity_manager, session_cache, Default::default())
}
/// Set up two gossip machines for key sharing tests.
///
/// The first machine belongs to Alice, the second one to
/// `other_machine_owner` (called "bob" below). The helper
///
/// * makes the machines know each other's devices, marking Bob's device
///   and Alice's second device as verified in Alice's store,
/// * optionally creates a pair of Olm sessions between the two machines,
/// * creates a group session pair with the given `algorithm` on Bob's
///   side and stores the inbound half there,
/// * lets Alice create a key request for an event encrypted with that
///   session, and
/// * marks the outbound session as shared with Alice's device.
///
/// Returns Alice's machine, the outbound group session and Bob's machine.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn machines_for_key_share_test_helper(
other_machine_owner: &UserId,
create_sessions: bool,
algorithm: EventEncryptionAlgorithm,
) -> (GossipMachine, OutboundGroupSession, GossipMachine) {
use crate::olm::SenderData;
let alice_machine = get_machine_test_helper().await;
let alice_device = DeviceData::from_account(
&alice_machine.inner.store.cache().await.unwrap().account().await.unwrap(),
);
let bob_machine = gossip_machine_test_helper(other_machine_owner).await;
let bob_device = DeviceData::from_account(
#[allow(clippy::explicit_auto_deref)] &*bob_machine.inner.store.cache().await.unwrap().account().await.unwrap(),
);
let second_device = DeviceData::from_account(&alice_2_account());
// Both devices are marked as verified before they are saved into Alice's
// store.
second_device.set_trust_state(LocalTrust::Verified);
bob_device.set_trust_state(LocalTrust::Verified);
alice_machine.inner.store.save_device_data(&[bob_device, second_device]).await.unwrap();
bob_machine.inner.store.save_device_data(&[alice_device.clone()]).await.unwrap();
if create_sessions {
// Both accounts are needed at the same time to create the session pair,
// so Bob's store transaction is nested inside Alice's.
let (alice_session, bob_session) = alice_machine
.inner
.store
.with_transaction(|mut atr| async {
let sessions = bob_machine
.inner
.store
.with_transaction(|mut btr| async {
let alice_account = atr.account().await?;
let bob_account = btr.account().await?;
let sessions =
alice_account.create_session_for_test_helper(bob_account).await;
Ok((btr, sessions))
})
.await?;
Ok((atr, sessions))
})
.await
.unwrap();
alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
bob_machine.inner.store.save_sessions(&[bob_session]).await.unwrap();
}
let settings = EncryptionSettings { algorithm, ..Default::default() };
let (group_session, inbound_group_session) = bob_machine
.inner
.store
.static_account()
.create_group_session_pair(room_id(), settings, SenderData::unknown())
.await
.unwrap();
bob_machine
.inner
.store
.save_inbound_group_sessions(&[inbound_group_session])
.await
.unwrap();
// Encrypt a dummy event so Alice has something to request the key for.
let content = group_session.encrypt("m.dummy", &message_like_event_content!({})).await;
let event = wrap_encrypted_content(bob_machine.user_id(), content);
assert!(
alice_machine.create_outgoing_key_request(room_id(), &event,).await.unwrap(),
"We should request a room key"
);
// Record on Bob's side that the session was shared with Alice's device,
// then put the session into Bob's outbound session cache.
group_session
.mark_shared_with(
alice_device.user_id(),
alice_device.device_id(),
alice_device.curve25519_key().unwrap(),
)
.await;
bob_machine.inner.outbound_group_sessions.insert(group_session.clone());
(alice_machine, group_session, bob_machine)
}
/// Get the raw content of the first to-device message addressed to
/// `recipient` in the given request.
///
/// Panics if the request isn't a to-device request or contains no message
/// for the recipient.
fn extract_content<'a>(
    recipient: &UserId,
    request: &'a crate::types::requests::OutgoingRequest,
) -> &'a Raw<ruma::events::AnyToDeviceEventContent> {
    let to_device = request
        .request()
        .to_device()
        .expect("The request should be always a to-device request");

    let messages_for_recipient = to_device.messages.get(recipient).unwrap();

    messages_for_recipient.values().next().unwrap()
}
/// Wrap raw encrypted content into an [`EncryptedEvent`] sent by `sender`,
/// using a fixed event ID and the current time as timestamp.
///
/// Panics if the content can't be deserialized.
fn wrap_encrypted_content(
    sender: &UserId,
    content: Raw<RoomEncryptedEventContent>,
) -> EncryptedEvent {
    let content = content.deserialize().unwrap();
    let sender = sender.to_owned();
    let event_id = event_id!("$143273582443PhrSn:example.org").to_owned();
    let origin_server_ts = ruma::MilliSecondsSinceUnixEpoch::now();

    EncryptedEvent {
        sender,
        event_id,
        content,
        origin_server_ts,
        unsigned: Default::default(),
        other: Default::default(),
    }
}
/// Turn the first to-device message for `recipient` in `request` into a
/// to-device event with content type `C`, as if `sender` had sent it.
///
/// Panics if the content can't be deserialized as `C`.
fn request_to_event<C>(
    recipient: &UserId,
    sender: &UserId,
    request: &crate::types::requests::OutgoingRequest,
) -> crate::types::events::ToDeviceEvent<C>
where
    C: crate::types::events::EventType
        + serde::de::DeserializeOwned
        + serde::ser::Serialize
        + std::fmt::Debug,
{
    let content = extract_content(recipient, request);

    let parsed: C = match content.deserialize_as() {
        Ok(parsed) => parsed,
        Err(_) => {
            panic!("We can always deserialize the to-device event content {content:?}")
        }
    };

    crate::types::events::ToDeviceEvent::new(sender.to_owned(), parsed)
}
/// A freshly created machine has nothing to send out.
#[async_test]
async fn test_create_machine() {
    let machine = get_machine_test_helper().await;

    let pending = machine.outgoing_to_device_requests().await.unwrap();
    assert!(pending.is_empty());
}
/// Requesting the same key a second time yields a cancellation for the
/// first request, the first time it doesn't.
#[async_test]
async fn test_re_request_keys() {
    let machine = get_machine_test_helper().await;
    let account = account();

    let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
    let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
    let event = wrap_encrypted_content(machine.user_id(), content);

    let pending = machine.outgoing_to_device_requests().await.unwrap();
    assert!(pending.is_empty());

    let (first_cancel, first_request) =
        machine.request_key(session.room_id(), &event).await.unwrap();
    assert!(first_cancel.is_none());

    machine.mark_outgoing_request_as_sent(&first_request.request_id).await.unwrap();

    let (second_cancel, _) = machine.request_key(session.room_id(), &event).await.unwrap();
    assert!(second_cancel.is_some());
}
/// An automatic key request is created exactly once per key and
/// disappears from the outgoing requests once it is marked as sent.
#[async_test]
#[cfg(feature = "automatic-room-key-forwarding")]
async fn test_create_key_request() {
    let machine = get_machine_test_helper().await;
    let account = account();
    let second_account = alice_2_account();

    // Key requests are only created if one of our own devices is verified.
    let verified_device = DeviceData::from_account(&second_account);
    verified_device.set_trust_state(LocalTrust::Verified);
    machine.inner.store.save_device_data(&[verified_device]).await.unwrap();

    let (outbound, session) = account.create_group_session_pair_with_defaults(room_id()).await;
    let content = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
    let event = wrap_encrypted_content(machine.user_id(), content);

    let before = machine.outgoing_to_device_requests().await.unwrap();
    assert!(before.is_empty());

    machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();
    let after_first = machine.outgoing_to_device_requests().await.unwrap();
    assert!(!after_first.is_empty());
    let after_first_again = machine.outgoing_to_device_requests().await.unwrap();
    assert_eq!(after_first_again.len(), 1);

    // A second attempt for the same key must not create another request.
    machine.create_outgoing_key_request(session.room_id(), &event).await.unwrap();
    let requests = machine.outgoing_to_device_requests().await.unwrap();
    assert_eq!(requests.len(), 1);

    let request = &requests[0];
    machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();

    let remaining = machine.outgoing_to_device_requests().await.unwrap();
    assert!(remaining.is_empty());
}
// With room key requests switched off, asking for a key must not queue any
// to-device request.
#[async_test]
#[cfg(feature = "automatic-room-key-forwarding")]
async fn test_create_key_request_requests_disabled() {
    let gossip = get_machine_test_helper().await;
    let own_account = account();
    let other_account = alice_2_account();

    let verified_device = DeviceData::from_account(&other_account);
    verified_device.set_trust_state(LocalTrust::Verified);
    gossip.inner.store.save_device_data(&[verified_device]).await.unwrap();

    // Requests start out enabled; flip the switch and confirm it took effect.
    let enabled_initially = gossip.are_room_key_requests_enabled();
    assert!(enabled_initially);
    gossip.set_room_key_requests_enabled(false);
    let enabled_afterwards = gossip.are_room_key_requests_enabled();
    assert!(!enabled_afterwards);

    let (outbound, inbound) =
        own_account.create_group_session_pair_with_defaults(room_id()).await;
    let encrypted = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
    let room_event = wrap_encrypted_content(gossip.user_id(), encrypted);

    let before = gossip.outgoing_to_device_requests().await.unwrap();
    assert!(before.is_empty());

    // The call itself succeeds, but nothing may end up in the queue.
    gossip.create_outgoing_key_request(inbound.room_id(), &room_event).await.unwrap();
    let after = gossip.outgoing_to_device_requests().await.unwrap();
    assert!(after.is_empty());
}
// Receives the same megolm session three times, exported at different ratchet
// indices: index 10 is accepted first, index 15 is then rejected, and index 0
// is accepted again.
#[async_test]
#[cfg(feature = "automatic-room-key-forwarding")]
async fn test_receive_forwarded_key() {
    let gossip = get_machine_test_helper().await;
    let own_account = account();
    let other_account = alice_2_account();

    let forwarder = DeviceData::from_account(&other_account);
    forwarder.set_trust_state(LocalTrust::Verified);
    gossip.inner.store.save_device_data(&[forwarder.clone()]).await.unwrap();

    // Wraps forwarded key content into the decrypted Olm event the machine
    // consumes; sender and recipient are both Alice, signed by `forwarder`.
    let as_forwarded_event = |content: ForwardedRoomKeyContent| {
        DecryptedOlmV1Event::new(
            alice_id(),
            alice_id(),
            forwarder.ed25519_key().unwrap(),
            None,
            content,
        )
    };

    let (outbound, inbound) =
        own_account.create_group_session_pair_with_defaults(room_id()).await;
    let encrypted = outbound.encrypt("m.dummy", &message_like_event_content!({})).await;
    let room_event = wrap_encrypted_content(gossip.user_id(), encrypted);

    // Queue the key request and pretend it went out.
    gossip.create_outgoing_key_request(inbound.room_id(), &room_event).await.unwrap();
    let queued = gossip.outgoing_to_device_requests().await.unwrap();
    let first_request_id = &queued[0].request_id;
    gossip.mark_outgoing_request_as_sent(first_request_id).await.unwrap();

    // Round 1: the session exported at index 10.
    let exported = inbound.export_at_index(10).await;
    let content: ForwardedRoomKeyContent = exported.try_into().unwrap();
    let forwarded = as_forwarded_event(content);

    let already_stored = gossip
        .inner
        .store
        .get_inbound_group_session(inbound.room_id(), inbound.session_id())
        .await
        .unwrap();
    assert!(already_stored.is_none());

    let imported = gossip
        .receive_forwarded_room_key(forwarder.curve25519_key().unwrap(), &forwarded)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(imported.first_known_index(), 10);
    gossip.inner.store.save_inbound_group_sessions(&[imported.clone()]).await.unwrap();

    // Mark whatever request is now first in the in-memory queue as sent. The
    // lock guard lives only inside this block so it is released before the
    // next await.
    let queued_id = {
        let outgoing = gossip.inner.outgoing_requests.read().unwrap();
        let (_, queued_request) = outgoing.first_key_value().unwrap();
        queued_request.request_id.clone()
    };
    gossip.mark_outgoing_request_as_sent(&queued_id).await.unwrap();

    // Ask for the key again so that further forwards are expected.
    gossip.create_outgoing_key_request(inbound.room_id(), &room_event).await.unwrap();
    let queued = gossip.outgoing_to_device_requests().await.unwrap();
    gossip.mark_outgoing_request_as_sent(&queued[0].request_id).await.unwrap();

    // Round 2: index 15 while index 10 is already stored; nothing is returned.
    let exported = inbound.export_at_index(15).await;
    let content: ForwardedRoomKeyContent = exported.try_into().unwrap();
    let forwarded = as_forwarded_event(content);
    let later_index = gossip
        .receive_forwarded_room_key(forwarder.curve25519_key().unwrap(), &forwarded)
        .await
        .unwrap();
    assert!(later_index.is_none());

    // Round 3: index 0 is returned as a new session.
    let exported = inbound.export_at_index(0).await;
    let content: ForwardedRoomKeyContent = exported.try_into().unwrap();
    let forwarded = as_forwarded_event(content);
    let earlier_index = gossip
        .receive_forwarded_room_key(forwarder.curve25519_key().unwrap(), &forwarded)
        .await
        .unwrap();
    assert_eq!(earlier_index.unwrap().first_known_index(), 0);
}
// Walks `should_share_key` through the decisions it can return, first for one
// of our own devices and then for a device belonging to Bob.
#[async_test]
#[cfg(feature = "automatic-room-key-forwarding")]
async fn test_should_share_key() {
let machine = get_machine_test_helper().await;
let account = account();
// Our own second device, as already present in the helper-provided store.
let own_device =
machine.inner.store.get_device(alice_id(), alice2_device_id()).await.unwrap().unwrap();
let (outbound, inbound) = account.create_group_session_pair_with_defaults(room_id()).await;
// This test has not set any trust state on the own device yet: refused as
// untrusted.
assert_matches!(
machine.should_share_key(&own_device, &inbound).await,
Err(KeyForwardDecision::UntrustedDevice)
);
// Once locally verified, the own device is allowed to receive the key.
own_device.set_trust_state(LocalTrust::Verified);
machine.should_share_key(&own_device, &inbound).await.unwrap();
// Store Bob's device data and read it back through the store's `get_device`.
let bob_device = DeviceData::from_account(&bob_account());
machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
let bob_device =
machine.inner.store.get_device(bob_id(), bob_device_id()).await.unwrap().unwrap();
// The outbound session has not been saved or put into the cache yet.
assert_matches!(
machine.should_share_key(&bob_device, &inbound).await,
Err(KeyForwardDecision::MissingOutboundSession)
);
// Persist both halves of the session and register the outbound half in the
// machine's outbound session cache.
let mut changes = Changes::default();
changes.outbound_group_sessions.push(outbound.clone());
changes.inbound_group_sessions.push(inbound.clone());
machine.inner.store.save_changes(changes).await.unwrap();
machine.inner.outbound_group_sessions.insert(outbound.clone());
// The session is known now, but it was never marked as shared with Bob.
assert_matches!(
machine.should_share_key(&bob_device, &inbound).await,
Err(KeyForwardDecision::OutboundSessionNotShared)
);
// Verifying Bob's device does not change that outcome.
bob_device.set_trust_state(LocalTrust::Verified);
assert_matches!(
machine.should_share_key(&bob_device, &inbound).await,
Err(KeyForwardDecision::OutboundSessionNotShared)
);
// After recording that the session was shared with Bob's device, the
// request is accepted.
outbound
.mark_shared_with(
bob_device.user_id(),
bob_device.device_id(),
bob_device.curve25519_key().unwrap(),
)
.await;
machine.should_share_key(&bob_device, &inbound).await.unwrap();
// A second session pair that has not been inserted into the cache.
let (other_outbound, other_inbound) =
account.create_group_session_pair_with_defaults(room_id()).await;
assert_matches!(
machine.should_share_key(&bob_device, &other_inbound).await,
Err(KeyForwardDecision::MissingOutboundSession)
);
// Replace the stored Bob device with one built from a fresh `bob_account()`
// call. NOTE(review): presumably the fresh account carries a different
// curve25519 key than the one `outbound` was shared with, which is what
// triggers `ChangedSenderKey` - confirm against `bob_account()`.
let bob_device = DeviceData::from_account(&bob_account());
machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
let bob_device =
machine.inner.store.get_device(bob_id(), bob_device_id()).await.unwrap().unwrap();
assert_matches!(
machine.should_share_key(&bob_device, &inbound).await,
Err(KeyForwardDecision::ChangedSenderKey)
);
// Back to our own device, with its trust reset. Encrypt three messages on
// the second session before marking it as shared with the own device.
own_device.set_trust_state(LocalTrust::Unset);
for _ in 1..=3 {
other_outbound.encrypt_helper("foo".to_owned()).await;
}
other_outbound
.mark_shared_with(
own_device.user_id(),
own_device.device_id(),
own_device.curve25519_key().unwrap(),
)
.await;
machine.inner.outbound_group_sessions.insert(other_outbound.clone());
// NOTE(review): the `Some(3)` presumably is the message index the session
// had when it was shared (three messages were encrypted above) - confirm.
assert_matches!(machine.should_share_key(&own_device, &other_inbound).await, Ok(Some(3)));
// A verified own device yields `Ok(None)` instead.
own_device.set_trust_state(LocalTrust::Verified);
assert_matches!(machine.should_share_key(&own_device, &other_inbound).await, Ok(None));
}
// Shared body of the key share round-trip tests, parameterized by the
// encryption `algorithm`: Alice's queued key request is delivered to Bob,
// Bob's encrypted answer is decrypted by Alice and the forwarded session ends
// up in Alice's store.
//
// NOTE(review): the meaning of the `alice_id()` and `true` arguments is
// defined by `machines_for_key_share_test_helper`, which is not visible here.
#[cfg(feature = "automatic-room-key-forwarding")]
async fn test_key_share_cycle(algorithm: EventEncryptionAlgorithm) {
let (alice_machine, group_session, bob_machine) =
machines_for_key_share_test_helper(alice_id(), true, algorithm).await;
// Turn Alice's first queued request into a to-device event and mark the
// request as sent.
let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
let request = &requests[0];
let event = request_to_event(alice_id(), alice_id(), request);
alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
// Bob has nothing to send before he sees the request.
assert!(bob_machine.inner.outgoing_requests.read().unwrap().is_empty());
bob_machine.receive_incoming_key_request(&event);
// The store cache handle is scoped to this block so it is dropped right
// after the incoming requests were processed.
{
let bob_cache = bob_machine.inner.store.cache().await.unwrap();
bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
}
// Processing the request made Bob queue an answer.
assert!(!bob_machine.inner.outgoing_requests.read().unwrap().is_empty());
// Turn Bob's answer into the encrypted to-device event Alice receives.
let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
let request = &requests[0];
let event: EncryptedToDeviceEvent = request_to_event(alice_id(), alice_id(), request);
bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
// Alice does not have the session before handling the answer.
assert!(alice_machine
.inner
.store
.get_inbound_group_session(room_id(), group_session.session_id())
.await
.unwrap()
.is_none());
// Decrypt the to-device event with Alice's account inside a store
// transaction; the transaction is handed back together with the result.
let decrypted = alice_machine
.inner
.store
.with_transaction(|mut tr| async {
let res = tr
.account()
.await?
.decrypt_to_device_event(&alice_machine.inner.store, &event)
.await?;
Ok((tr, res))
})
.await
.unwrap();
// The decrypted payload has to be a forwarded room key.
let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
panic!("Invalid decrypted event type");
};
// Import the forwarded key and persist the resulting session.
let session = alice_machine
.receive_forwarded_room_key(decrypted.result.sender_key, ev)
.await
.unwrap();
alice_machine.inner.store.save_inbound_group_sessions(&[session.unwrap()]).await.unwrap();
// The store now returns the session, with the id of the original one.
let session = alice_machine
.inner
.store
.get_inbound_group_session(room_id(), group_session.session_id())
.await
.unwrap()
.unwrap();
assert_eq!(session.session_id(), group_session.session_id())
}
// Same exchange as the key share round trip, but the helper is set up with
// `bob_id()` and Bob's answer is delivered with `bob_id()` as its sender.
// Alice must not get a session out of the forwarded key.
//
// NOTE(review): what the `bob_id()` and `true` arguments configure is defined
// by `machines_for_key_share_test_helper`, which is not visible here.
#[async_test]
#[cfg(feature = "automatic-room-key-forwarding")]
async fn test_reject_forward_from_another_user() {
let (alice_machine, group_session, bob_machine) = machines_for_key_share_test_helper(
bob_id(),
true,
EventEncryptionAlgorithm::MegolmV1AesSha2,
)
.await;
// Turn Alice's first queued request into a to-device event and mark the
// request as sent.
let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
let request = &requests[0];
let event = request_to_event(alice_id(), alice_id(), request);
alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
// Bob has nothing to send before he sees the request.
assert!(bob_machine.inner.outgoing_requests.read().unwrap().is_empty());
bob_machine.receive_incoming_key_request(&event);
// The store cache handle only lives for the duration of this block.
{
let bob_cache = bob_machine.inner.store.cache().await.unwrap();
bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
}
// Bob still answers the request.
assert!(!bob_machine.inner.outgoing_requests.read().unwrap().is_empty());
// The answer is addressed to Alice, but this time its sender is Bob's user.
let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
let request = &requests[0];
let event: EncryptedToDeviceEvent = request_to_event(alice_id(), bob_id(), request);
bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
// Alice does not have the session before handling the answer.
assert!(alice_machine
.inner
.store
.get_inbound_group_session(room_id(), group_session.session_id())
.await
.unwrap()
.is_none());
// Decryption itself succeeds; the transaction is handed back together with
// the result.
let decrypted = alice_machine
.inner
.store
.with_transaction(|mut tr| async {
let res = tr
.account()
.await?
.decrypt_to_device_event(&alice_machine.inner.store, &event)
.await?;
Ok((tr, res))
})
.await
.unwrap();
// The decrypted payload has to be a forwarded room key.
let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
panic!("Invalid decrypted event type");
};
// The call returns `Ok`, but without a session.
let session = alice_machine
.receive_forwarded_room_key(decrypted.result.sender_key, ev)
.await
.unwrap();
assert!(session.is_none(), "We should not receive a room key from another user");
}
// Runs the shared key share round-trip with the megolm v1 algorithm.
#[async_test]
#[cfg(feature = "automatic-room-key-forwarding")]
async fn test_key_share_cycle_megolm_v1() {
    let algorithm = EventEncryptionAlgorithm::MegolmV1AesSha2;
    test_key_share_cycle(algorithm).await;
}
// Runs the shared key share round-trip with the megolm v2 algorithm; only
// built when the experimental algorithms are enabled as well.
#[async_test]
#[cfg(all(feature = "experimental-algorithms", feature = "automatic-room-key-forwarding"))]
async fn test_key_share_cycle_megolm_v2() {
    let algorithm = EventEncryptionAlgorithm::MegolmV2AesSha2;
    test_key_share_cycle(algorithm).await;
}
// Feeds secret requests for the cross-signing master key into the machine in
// five rounds. The first four must not queue any outgoing request; only the
// last one, coming from our own locally verified device, does.
#[async_test]
async fn test_secret_share_cycle() {
let alice_machine = get_machine_test_helper().await;
// Mutable because `create_session_for_test_helper` takes it by `&mut`.
let mut second_account = alice_2_account();
let alice_device = DeviceData::from_account(&second_account);
let bob_account = bob_account();
let bob_device = DeviceData::from_account(&bob_account);
// Only Alice's second device is saved here; Bob's device is saved further
// down, before the third round.
alice_machine.inner.store.save_device_data(&[alice_device.clone()]).await.unwrap();
// Create an Olm session between the machine's account and the second
// account, then persist Alice's side of it.
let alice_session = alice_machine
.inner
.store
.with_transaction(|mut tr| async {
let alice_account = tr.account().await?;
let (alice_session, _) =
alice_account.create_session_for_test_helper(&mut second_account).await;
Ok((tr, alice_session))
})
.await
.unwrap();
alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
// A secret request sent by Bob's user from Bob's device.
let event = RumaToDeviceEvent {
sender: bob_account.user_id().to_owned(),
content: ToDeviceSecretRequestEventContent::new(
RequestAction::Request(SecretName::CrossSigningMasterKey),
bob_account.device_id().to_owned(),
"request_id".into(),
),
};
// Round 1: Bob's request against the untouched store; nothing is queued.
assert!(alice_machine.inner.outgoing_requests.read().unwrap().is_empty());
alice_machine.receive_incoming_secret_request(&event);
{
let alice_cache = alice_machine.inner.store.cache().await.unwrap();
alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
}
assert!(alice_machine.inner.outgoing_requests.read().unwrap().is_empty());
// Round 2: same request after resetting the cross-signing identity.
// NOTE(review): presumably the reset is what makes the requested secret
// available to the machine - confirm against the store implementation.
alice_machine.inner.store.reset_cross_signing_identity().await;
alice_machine.receive_incoming_secret_request(&event);
{
let alice_cache = alice_machine.inner.store.cache().await.unwrap();
alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
}
assert!(alice_machine.inner.outgoing_requests.read().unwrap().is_empty());
// Round 3: Bob's device is now in the store, yet the request of the other
// user still goes unanswered.
alice_machine.inner.store.save_device_data(&[bob_device]).await.unwrap();
alice_machine.inner.store.reset_cross_signing_identity().await;
alice_machine.receive_incoming_secret_request(&event);
{
let alice_cache = alice_machine.inner.store.cache().await.unwrap();
alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
}
assert!(alice_machine.inner.outgoing_requests.read().unwrap().is_empty());
// Round 4: the request now comes from Alice's own user and her second
// device, whose trust state has not been set by this test so far.
let event = RumaToDeviceEvent {
sender: alice_id().to_owned(),
content: ToDeviceSecretRequestEventContent::new(
RequestAction::Request(SecretName::CrossSigningMasterKey),
second_account.device_id().into(),
"request_id".into(),
),
};
alice_machine.receive_incoming_secret_request(&event);
{
let alice_cache = alice_machine.inner.store.cache().await.unwrap();
alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
}
assert!(alice_machine.inner.outgoing_requests.read().unwrap().is_empty());
// Round 5: after marking the second device as locally verified and saving
// it, the same request finally produces an outgoing request.
alice_device.set_trust_state(LocalTrust::Verified);
alice_machine.inner.store.save_device_data(&[alice_device.clone()]).await.unwrap();
alice_machine.receive_incoming_secret_request(&event);
{
let alice_cache = alice_machine.inner.store.cache().await.unwrap();
alice_machine.collect_incoming_key_requests(&alice_cache).await.unwrap();
}
assert!(!alice_machine.inner.outgoing_requests.read().unwrap().is_empty());
}
// Checks that a secret arriving as the answer to one of our own secret
// requests shows up on the store's secrets stream.
#[async_test]
async fn test_secret_broadcasting() {
use futures_util::{pin_mut, FutureExt};
use ruma::api::client::to_device::send_event_to_device::v3::Response as ToDeviceResponse;
use serde_json::value::to_raw_value;
use tokio_stream::StreamExt;
use crate::{
machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper,
EncryptionSyncChanges,
};
// Both machines are created for the same user id.
// NOTE(review): the meaning of the trailing `false` is defined by the
// helper, which is not visible here.
let alice_id = user_id!("@alice:localhost");
let (alice_machine, bob_machine) =
get_machine_pair_with_setup_sessions_test_helper(alice_id, alice_id, false).await;
// Build a request for the recovery key on behalf of the second machine and
// persist it through the store.
let key_requests = GossipMachine::request_missing_secrets(
bob_machine.user_id(),
vec![SecretName::RecoveryKey],
);
let mut changes = Changes::default();
// Keep the request id so the incoming event below can reuse it.
let request_id = key_requests[0].request_id.to_owned();
changes.key_requests = key_requests;
bob_machine.store().save_changes(changes).await.unwrap();
// Mark every outgoing request of the second machine as sent.
for request in bob_machine.outgoing_requests().await.unwrap() {
bob_machine
.mark_request_as_sent(request.request_id(), &ToDeviceResponse::new())
.await
.unwrap();
}
// The request as the first machine receives it: same secret, same request
// id, requesting device is the second machine's device.
let event = RumaToDeviceEvent {
sender: alice_machine.user_id().to_owned(),
content: ToDeviceSecretRequestEventContent::new(
RequestAction::Request(SecretName::RecoveryKey),
bob_machine.device_id().to_owned(),
request_id,
),
};
// Each machine looks up the other one's device, marks it as locally
// verified and saves it.
let bob_device = alice_machine
.get_device(alice_id, bob_machine.device_id(), None)
.await
.unwrap()
.unwrap();
let alice_device = bob_machine
.get_device(alice_id, alice_machine.device_id(), None)
.await
.unwrap()
.unwrap();
bob_device.set_trust_state(LocalTrust::Verified);
alice_device.set_trust_state(LocalTrust::Verified);
alice_machine.store().save_device_data(&[bob_device.inner]).await.unwrap();
bob_machine.store().save_device_data(&[alice_device.inner]).await.unwrap();
// Give the first machine a backup decryption key to hand out.
let decryption_key = crate::store::BackupDecryptionKey::new().unwrap();
alice_machine
.backup_machine()
.save_decryption_key(Some(decryption_key), None)
.await
.unwrap();
// Let the first machine process the incoming secret request; the store
// cache handle only lives inside the block.
alice_machine.inner.key_request_machine.receive_incoming_secret_request(&event);
{
let alice_cache = alice_machine.store().cache().await.unwrap();
alice_machine
.inner
.key_request_machine
.collect_incoming_key_requests(&alice_cache)
.await
.unwrap();
}
// Exactly one to-device request must have been queued in response.
let requests =
alice_machine.inner.key_request_machine.outgoing_to_device_requests().await.unwrap();
assert_eq!(requests.len(), 1);
let request = requests.first().expect("We should have an outgoing to-device request");
// Convert it into the raw encrypted to-device event the second machine
// gets through sync.
let event: EncryptedToDeviceEvent =
request_to_event(bob_machine.user_id(), alice_machine.user_id(), request);
let event = Raw::from_json(to_raw_value(&event).unwrap());
// The stream is obtained before the sync changes are processed.
let stream = bob_machine.store().secrets_stream();
pin_mut!(stream);
bob_machine
.receive_sync_changes(EncryptionSyncChanges {
to_device_events: vec![event],
changed_devices: &Default::default(),
one_time_keys_counts: &Default::default(),
unused_fallback_keys: None,
next_batch_token: None,
})
.await
.unwrap();
// `now_or_never` polls once: the item has to be ready immediately.
stream.next().now_or_never().expect("The broadcaster should have sent out the secret");
}
// Key share round trip in which Bob cannot answer right away: handling the
// request first yields a keys claim request and a waiting entry. After an Olm
// session pair is created and `retry_keyshare` is called, the answer is
// produced and Alice imports the forwarded session.
//
// NOTE(review): the `false` passed to the helper presumably means that no Olm
// session is set up between the machines - confirm against
// `machines_for_key_share_test_helper`.
#[async_test]
#[cfg(feature = "automatic-room-key-forwarding")]
async fn test_key_share_cycle_without_session() {
let (alice_machine, group_session, bob_machine) = machines_for_key_share_test_helper(
alice_id(),
false,
EventEncryptionAlgorithm::MegolmV1AesSha2,
)
.await;
// Turn Alice's first queued request into a to-device event and mark the
// request as sent.
let requests = alice_machine.outgoing_to_device_requests().await.unwrap();
let request = &requests[0];
let event = request_to_event(alice_id(), alice_id(), request);
alice_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
// Bob starts with nothing to send, nobody to claim keys for and an empty
// wait queue.
assert!(bob_machine.outgoing_to_device_requests().await.unwrap().is_empty());
assert!(bob_machine.inner.users_for_key_claim.read().unwrap().is_empty());
assert!(bob_machine.inner.wait_queue.is_empty());
bob_machine.receive_incoming_key_request(&event);
// The store cache handle only lives for the duration of this block.
{
let bob_cache = bob_machine.inner.store.cache().await.unwrap();
bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
}
// Instead of an answer, Bob has a single keys claim request queued, and
// the key request is parked in the wait queue.
assert_eq!(bob_machine.outgoing_to_device_requests().await.unwrap().len(), 1);
assert_matches!(
bob_machine.outgoing_to_device_requests().await.unwrap()[0].request(),
AnyOutgoingRequest::KeysClaim(_)
);
assert!(!bob_machine.inner.users_for_key_claim.read().unwrap().is_empty());
assert!(!bob_machine.inner.wait_queue.is_empty());
// Create the Olm session pair directly from the two accounts. Both stores
// are in a transaction at the same time: Bob's transaction runs nested
// inside Alice's, and each closure hands its transaction back.
let (alice_session, bob_session) = alice_machine
.inner
.store
.with_transaction(|mut atr| async {
let res = bob_machine
.inner
.store
.with_transaction(|mut btr| async {
let alice_account = atr.account().await?;
let bob_account = btr.account().await?;
let sessions =
alice_account.create_session_for_test_helper(bob_account).await;
Ok((btr, sessions))
})
.await?;
Ok((atr, res))
})
.await
.unwrap();
alice_machine.inner.store.save_sessions(&[alice_session]).await.unwrap();
bob_machine.inner.store.save_sessions(&[bob_session]).await.unwrap();
// Tell Bob to retry the key share for Alice's device; this clears the
// pending key claim entry.
bob_machine.retry_keyshare(alice_id(), alice_device_id());
assert!(bob_machine.inner.users_for_key_claim.read().unwrap().is_empty());
{
let bob_cache = bob_machine.inner.store.cache().await.unwrap();
bob_machine.collect_incoming_key_requests(&bob_cache).await.unwrap();
}
// Now Bob has something to send and the wait queue is drained.
assert!(!bob_machine.outgoing_to_device_requests().await.unwrap().is_empty());
assert!(bob_machine.inner.wait_queue.is_empty());
// Turn Bob's answer into the encrypted to-device event Alice receives.
let requests = bob_machine.outgoing_to_device_requests().await.unwrap();
let request = &requests[0];
let event: EncryptedToDeviceEvent = request_to_event(alice_id(), alice_id(), request);
bob_machine.mark_outgoing_request_as_sent(&request.request_id).await.unwrap();
// Alice does not have the session before handling the answer.
assert!(alice_machine
.inner
.store
.get_inbound_group_session(room_id(), group_session.session_id())
.await
.unwrap()
.is_none());
// Decrypt the to-device event with Alice's account inside a store
// transaction; the transaction is handed back together with the result.
let decrypted = alice_machine
.inner
.store
.with_transaction(|mut tr| async {
let res = tr
.account()
.await?
.decrypt_to_device_event(&alice_machine.inner.store, &event)
.await?;
Ok((tr, res))
})
.await
.unwrap();
// The decrypted payload has to be a forwarded room key.
let AnyDecryptedOlmEvent::ForwardedRoomKey(ev) = &*decrypted.result.event else {
panic!("Invalid decrypted event type");
};
// Import the forwarded key and persist the resulting session.
let session = alice_machine
.receive_forwarded_room_key(decrypted.result.sender_key, ev)
.await
.unwrap();
alice_machine.inner.store.save_inbound_group_sessions(&[session.unwrap()]).await.unwrap();
// The store now returns the session, with the id of the original one.
let session = alice_machine
.inner
.store
.get_inbound_group_session(room_id(), group_session.session_id())
.await
.unwrap()
.unwrap();
assert_eq!(session.session_id(), group_session.session_id())
}
}