use eyeball_im::{ObservableVector, VectorDiff};
use futures_core::Stream;
use imbl::Vector;
use ruma::{
api::client::directory::get_public_rooms_filtered::v3::Request as PublicRoomsFilterRequest,
directory::{Filter, PublicRoomJoinRule},
OwnedMxcUri, OwnedRoomAliasId, OwnedRoomId,
};
use crate::{Client, OwnedServerName, Result};
/// Description of a single room as listed in a room directory.
///
/// Every field is copied from the matching field of a
/// `ruma::directory::PublicRoomsChunk` by the `From` implementation in this
/// file.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RoomDescription {
/// The ID of the room.
pub room_id: OwnedRoomId,
/// The name of the room, if any.
pub name: Option<String>,
/// The topic of the room, if any.
pub topic: Option<String>,
/// The canonical alias of the room, if any.
pub alias: Option<OwnedRoomAliasId>,
/// The `mxc://` URI of the room's avatar, if any.
pub avatar_url: Option<OwnedMxcUri>,
/// The join rule of the room.
pub join_rule: PublicRoomJoinRule,
/// The `world_readable` flag reported by the directory for this room.
pub is_world_readable: bool,
/// The number of joined members reported by the directory for this room.
pub joined_members: u64,
}
/// Conversion from one entry of a public rooms response into the
/// [`RoomDescription`] exposed by this module.
impl From<ruma::directory::PublicRoomsChunk> for RoomDescription {
    /// Moves the relevant fields out of `value`, renaming
    /// `canonical_alias`, `world_readable` and `num_joined_members` to the
    /// names used by [`RoomDescription`]. Any other field of the chunk is
    /// dropped.
    fn from(value: ruma::directory::PublicRoomsChunk) -> Self {
        let ruma::directory::PublicRoomsChunk {
            room_id,
            name,
            topic,
            canonical_alias,
            avatar_url,
            join_rule,
            world_readable,
            num_joined_members,
            ..
        } = value;

        Self {
            room_id,
            name,
            topic,
            alias: canonical_alias,
            avatar_url,
            join_rule,
            is_world_readable: world_readable,
            joined_members: u64::from(num_joined_members),
        }
    }
}
/// Pagination cursor of a room directory search.
#[derive(Debug, Default)]
enum SearchState {
    /// A page has been loaded and the response carried this continuation
    /// token, to be sent with the following request.
    Next(String),
    /// The last response carried no continuation token, so there is nothing
    /// left to request.
    End,
    /// No page has been requested since the state was created or reset.
    #[default]
    Start,
}

impl SearchState {
    /// Returns the continuation token to send with the next request, or
    /// `None` when the state holds no token (`Start` and `End`).
    fn next_token(&self) -> Option<&str> {
        match self {
            Self::Next(token) => Some(token.as_str()),
            Self::End | Self::Start => None,
        }
    }

    /// Returns `true` only for the `End` state.
    fn is_at_end(&self) -> bool {
        match self {
            Self::End => true,
            Self::Next(_) | Self::Start => false,
        }
    }
}
/// Paginated search over a room directory.
///
/// Holds the parameters of the current search, the pagination cursor and the
/// observable list of results loaded so far.
#[derive(Debug)]
pub struct RoomDirectorySearch {
/// Value sent as the `limit` of every request; 0 until `search` is called.
batch_size: u32,
/// Search term copied into the request filter's `generic_search_term`.
filter: Option<String>,
/// Server name copied into the request's `server` field.
server: Option<OwnedServerName>,
/// Pagination cursor, updated after every successful request.
search_state: SearchState,
/// Client used to send the directory requests.
client: Client,
/// Rooms loaded so far, across all pages of the current search.
results: ObservableVector<RoomDescription>,
}
impl RoomDirectorySearch {
    /// Creates a search bound to `client`.
    ///
    /// The new instance has no filter, no server, a batch size of 0, an
    /// empty result list and a cursor in its initial state. Call
    /// [`RoomDirectorySearch::search`] to configure it and load the first
    /// page.
    pub fn new(client: Client) -> Self {
        Self {
            batch_size: 0,
            filter: None,
            server: None,
            search_state: Default::default(),
            client,
            results: ObservableVector::new(),
        }
    }

    /// Starts a new search and loads its first page.
    ///
    /// The previous search is discarded: the pagination cursor is reset and
    /// the current results are cleared before the request is sent.
    ///
    /// # Arguments
    ///
    /// * `filter` - Search term placed in the request filter; `None` sends
    ///   no search term.
    /// * `batch_size` - Value sent as the `limit` of every request of this
    ///   search.
    /// * `via_server` - Server name forwarded in the request's `server`
    ///   field, if any.
    ///
    /// # Errors
    ///
    /// Returns the error of the underlying request. The results stay
    /// cleared in that case.
    pub async fn search(
        &mut self,
        filter: Option<String>,
        batch_size: u32,
        via_server: Option<OwnedServerName>,
    ) -> Result<()> {
        self.filter = filter;
        self.batch_size = batch_size;
        self.search_state = Default::default();
        self.results.clear();
        self.server = via_server;
        self.next_page().await
    }

    /// Requests the next page of the current search and appends its rooms
    /// to the results.
    ///
    /// Returns `Ok(())` without sending anything when the last page has
    /// already been reached.
    ///
    /// # Errors
    ///
    /// Returns the error of the underlying request. The pagination cursor
    /// and the results are left untouched in that case, so the call can be
    /// retried.
    pub async fn next_page(&mut self) -> Result<()> {
        if self.search_state.is_at_end() {
            return Ok(());
        }

        let mut filter = Filter::new();
        filter.generic_search_term = self.filter.clone();

        let mut request = PublicRoomsFilterRequest::new();
        request.filter = filter;
        request.server = self.server.clone();
        request.limit = Some(self.batch_size.into());
        request.since = self.search_state.next_token().map(ToOwned::to_owned);

        let response = self.client.public_rooms_filtered(request).await?;

        // A response without continuation token marks the last page.
        self.search_state = match response.next_batch {
            Some(next_token) => SearchState::Next(next_token),
            None => SearchState::End,
        };
        self.results.append(response.chunk.into_iter().map(Into::into).collect());
        Ok(())
    }

    /// Returns the rooms loaded so far together with a stream of batched
    /// [`VectorDiff`]s describing later changes to that list.
    pub fn results(
        &self,
    ) -> (Vector<RoomDescription>, impl Stream<Item = Vec<VectorDiff<RoomDescription>>>) {
        self.results.subscribe().into_values_and_batched_stream()
    }

    /// Returns the number of pages loaded so far.
    ///
    /// This is the number of loaded rooms divided by the batch size,
    /// rounded up. It is 0 while the batch size is 0.
    pub fn loaded_pages(&self) -> usize {
        if self.batch_size == 0 {
            return 0;
        }

        // Widening cast: `usize` is at least 32 bits wide on the targets
        // this crate builds for.
        let batch_size = self.batch_size as usize;
        let loaded_rooms = self.results.len();

        // Ceiling division in integer arithmetic; exact for every length
        // and free of the `len + batch_size - 1` overflow.
        loaded_rooms / batch_size + usize::from(loaded_rooms % batch_size != 0)
    }

    /// Returns `true` once a response without continuation token has been
    /// received for the current search.
    pub fn is_at_last_page(&self) -> bool {
        self.search_state.is_at_end()
    }
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use assert_matches::assert_matches;
    use eyeball_im::VectorDiff;
    use futures_util::StreamExt;
    use matrix_sdk_test::{async_test, test_json};
    use ruma::{directory::Filter, owned_server_name, serde::Raw, RoomAliasId, RoomId};
    use serde_json::Value as JsonValue;
    use stream_assert::assert_pending;
    use wiremock::{
        matchers::{method, path_regex},
        Match, Mock, MockServer, Request, ResponseTemplate,
    };

    use crate::{
        room_directory_search::{RoomDescription, RoomDirectorySearch},
        test_utils::logged_in_client,
        Client,
    };

    /// Request matcher for the public rooms search endpoint.
    ///
    /// A request matches when it is a `POST` to the `publicRooms` path and
    /// its JSON body carries exactly the expected `since` token, `limit`
    /// and filter search term.
    struct RoomDirectorySearchMatcher {
        /// Expected `since` field; `None` expects the field to be absent.
        next_token: Option<String>,
        /// Expected `filter.generic_search_term`; `None` expects no search
        /// term (missing filter or filter without term).
        filter_term: Option<String>,
        /// Expected `limit` field.
        limit: u32,
    }

    impl Match for RoomDirectorySearchMatcher {
        fn matches(&self, request: &Request) -> bool {
            let Ok(body) = request.body_json::<Raw<JsonValue>>() else {
                return false;
            };

            // The pagination token must be the expected one.
            if !body.get_field::<String>("since").is_ok_and(|s| s == self.next_token) {
                return false;
            }

            // The requested batch size must be the expected one.
            if !body.get_field::<u32>("limit").is_ok_and(|s| s == Some(self.limit)) {
                return false;
            }

            // The search term must be the expected one.
            if !body.get_field::<Filter>("filter").is_ok_and(|s| {
                if self.filter_term.is_none() {
                    s.is_none() || s.is_some_and(|s| s.generic_search_term.is_none())
                } else {
                    s.is_some_and(|s| s.generic_search_term == self.filter_term)
                }
            }) {
                return false;
            }

            method("POST").matches(request)
                && path_regex("/_matrix/client/../publicRooms").matches(request)
        }
    }

    /// The room expected from the `test_json::PUBLIC_ROOMS` fixture.
    fn get_first_page_description() -> RoomDescription {
        RoomDescription {
            room_id: RoomId::parse("!ol19s:bleecker.street").unwrap(),
            name: Some("CHEESE".into()),
            topic: Some("Tasty tasty cheese".into()),
            alias: None,
            avatar_url: Some("mxc://bleeker.street/CHEDDARandBRIE".into()),
            join_rule: ruma::directory::PublicRoomJoinRule::Public,
            is_world_readable: true,
            joined_members: 37,
        }
    }

    /// The room expected from the `test_json::PUBLIC_ROOMS_FINAL_PAGE`
    /// fixture.
    fn get_second_page_description() -> RoomDescription {
        RoomDescription {
            room_id: RoomId::parse("!ca18r:bleecker.street").unwrap(),
            name: Some("PEAR".into()),
            topic: Some("Tasty tasty pear".into()),
            alias: RoomAliasId::parse("#murrays:pear.bar").ok(),
            avatar_url: Some("mxc://bleeker.street/pear".into()),
            join_rule: ruma::directory::PublicRoomJoinRule::Knock,
            is_world_readable: false,
            joined_members: 20,
        }
    }

    /// Starts a mock server and returns it with a client logged in to it.
    async fn new_server_and_client() -> (MockServer, Client) {
        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;
        (server, client)
    }

    /// A single successful search loads exactly the first page.
    #[async_test]
    async fn test_search_success() {
        let (server, client) = new_server_and_client().await;

        let mut room_directory_search = RoomDirectorySearch::new(client);
        Mock::given(RoomDirectorySearchMatcher { next_token: None, filter_term: None, limit: 1 })
            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::PUBLIC_ROOMS))
            .mount(&server)
            .await;

        let via_server = owned_server_name!("some.server.org");
        room_directory_search.search(None, 1, Some(via_server)).await.unwrap();
        let (results, mut stream) = room_directory_search.results();
        assert_pending!(stream);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0], get_first_page_description());
        assert!(!room_directory_search.is_at_last_page());
        assert_eq!(room_directory_search.loaded_pages(), 1);
    }

    /// A search followed by `next_page` appends the second page and reaches
    /// the end of the directory.
    #[async_test]
    async fn test_search_success_paginated() {
        let (server, client) = new_server_and_client().await;

        let mut room_directory_search = RoomDirectorySearch::new(client);
        Mock::given(RoomDirectorySearchMatcher { next_token: None, filter_term: None, limit: 1 })
            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::PUBLIC_ROOMS))
            .mount(&server)
            .await;

        room_directory_search.search(None, 1, None).await.unwrap();
        let (initial_results, mut stream) = room_directory_search.results();
        assert_eq!(initial_results, vec![get_first_page_description()].into());
        assert!(!room_directory_search.is_at_last_page());
        assert_eq!(room_directory_search.loaded_pages(), 1);

        Mock::given(RoomDirectorySearchMatcher {
            next_token: Some("p190q".into()),
            filter_term: None,
            limit: 1,
        })
        .respond_with(
            ResponseTemplate::new(200).set_body_json(&*test_json::PUBLIC_ROOMS_FINAL_PAGE),
        )
        .mount(&server)
        .await;

        room_directory_search.next_page().await.unwrap();

        let results_batch: Vec<VectorDiff<RoomDescription>> = stream.next().await.unwrap();
        assert_matches!(&results_batch[0], VectorDiff::Append { values } => {
            assert_eq!(values, &vec![get_second_page_description()].into());
        });
        assert!(room_directory_search.is_at_last_page());
        assert_eq!(room_directory_search.loaded_pages(), 2);
        assert_pending!(stream);
    }

    /// A failing first request leaves the search empty and not at its end.
    #[async_test]
    async fn test_search_fails() {
        let (server, client) = new_server_and_client().await;

        let mut room_directory_search = RoomDirectorySearch::new(client);
        Mock::given(RoomDirectorySearchMatcher { next_token: None, filter_term: None, limit: 1 })
            .respond_with(ResponseTemplate::new(404))
            .mount(&server)
            .await;

        // Go through `search` so the request carries `limit: 1` and is
        // answered by the mock above. A bare `next_page` on a fresh instance
        // would send the initial batch size of 0, which the matcher rejects.
        assert!(room_directory_search.search(None, 1, None).await.is_err());

        let (results, mut stream) = room_directory_search.results();
        assert_eq!(results.len(), 0);
        assert!(!room_directory_search.is_at_last_page());
        assert_eq!(room_directory_search.loaded_pages(), 0);
        assert_pending!(stream);
    }

    /// A failing `next_page` keeps the pages loaded so far and the cursor.
    #[async_test]
    async fn test_search_fails_when_paginating() {
        let (server, client) = new_server_and_client().await;

        let mut room_directory_search = RoomDirectorySearch::new(client);
        Mock::given(RoomDirectorySearchMatcher { next_token: None, filter_term: None, limit: 1 })
            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::PUBLIC_ROOMS))
            .mount(&server)
            .await;

        room_directory_search.search(None, 1, None).await.unwrap();

        let (results, mut stream) = room_directory_search.results();
        assert_eq!(results, vec![get_first_page_description()].into());
        assert!(!room_directory_search.is_at_last_page());
        assert_eq!(room_directory_search.loaded_pages(), 1);
        assert_pending!(stream);

        Mock::given(RoomDirectorySearchMatcher {
            next_token: Some("p190q".into()),
            filter_term: None,
            limit: 1,
        })
        .respond_with(ResponseTemplate::new(404))
        .mount(&server)
        .await;

        assert!(room_directory_search.next_page().await.is_err());

        // Take a fresh snapshot: the one captured before the failing request
        // is an owned copy and could never reflect a change.
        let (results_after_failure, _) = room_directory_search.results();
        assert_eq!(results_after_failure, vec![get_first_page_description()].into());
        assert!(!room_directory_search.is_at_last_page());
        assert_eq!(room_directory_search.loaded_pages(), 1);
        assert_pending!(stream);
    }

    /// Pagination keeps sending the search term given to `search`.
    #[async_test]
    async fn test_search_success_paginated_with_filter() {
        let (server, client) = new_server_and_client().await;

        let mut room_directory_search = RoomDirectorySearch::new(client);
        Mock::given(RoomDirectorySearchMatcher {
            next_token: None,
            filter_term: Some("bleecker.street".into()),
            limit: 1,
        })
        .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::PUBLIC_ROOMS))
        .mount(&server)
        .await;

        room_directory_search.search(Some("bleecker.street".into()), 1, None).await.unwrap();
        let (initial_results, mut stream) = room_directory_search.results();
        assert_eq!(initial_results, vec![get_first_page_description()].into());
        assert!(!room_directory_search.is_at_last_page());
        assert_eq!(room_directory_search.loaded_pages(), 1);

        Mock::given(RoomDirectorySearchMatcher {
            next_token: Some("p190q".into()),
            filter_term: Some("bleecker.street".into()),
            limit: 1,
        })
        .respond_with(
            ResponseTemplate::new(200).set_body_json(&*test_json::PUBLIC_ROOMS_FINAL_PAGE),
        )
        .mount(&server)
        .await;

        room_directory_search.next_page().await.unwrap();

        let results_batch: Vec<VectorDiff<RoomDescription>> = stream.next().await.unwrap();
        assert_matches!(&results_batch[0], VectorDiff::Append { values } => {
            assert_eq!(values, &vec![get_second_page_description()].into());
        });
        assert!(room_directory_search.is_at_last_page());
        assert_eq!(room_directory_search.loaded_pages(), 2);
        assert_pending!(stream);
    }

    /// A second `search` clears the previous results before appending the
    /// new ones, and subscribers observe both diffs.
    #[async_test]
    async fn test_search_followed_by_another_search_with_filter() {
        let (server, client) = new_server_and_client().await;

        let mut room_directory_search = RoomDirectorySearch::new(client);
        Mock::given(RoomDirectorySearchMatcher { next_token: None, filter_term: None, limit: 1 })
            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::PUBLIC_ROOMS))
            .mount(&server)
            .await;

        room_directory_search.search(None, 1, None).await.unwrap();
        let (initial_results, mut stream) = room_directory_search.results();
        assert_eq!(initial_results, vec![get_first_page_description()].into());
        assert!(!room_directory_search.is_at_last_page());
        assert_eq!(room_directory_search.loaded_pages(), 1);

        Mock::given(RoomDirectorySearchMatcher {
            next_token: None,
            filter_term: Some("bleecker.street".into()),
            limit: 1,
        })
        .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::PUBLIC_ROOMS))
        .mount(&server)
        .await;

        room_directory_search.search(Some("bleecker.street".into()), 1, None).await.unwrap();

        let results_batch: Vec<VectorDiff<RoomDescription>> = stream.next().await.unwrap();
        assert_matches!(&results_batch[0], VectorDiff::Clear);
        assert_matches!(&results_batch[1], VectorDiff::Append { values } => {
            assert_eq!(values, &vec![get_first_page_description()].into());
        });
        assert!(!room_directory_search.is_at_last_page());
        assert_eq!(room_directory_search.loaded_pages(), 1);
        assert_pending!(stream);
    }
}