1#![allow(clippy::large_enum_variant)]
2
3use std::{
4 collections::{HashMap, HashSet},
5 io::{self, Write, stdout},
6 path::{Path, PathBuf},
7 sync::Arc,
8 time::{Duration, Instant},
9};
10
11use clap::Parser;
12use color_eyre::Result;
13use crossterm::{
14 event::{
15 self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode, KeyEvent, KeyModifiers,
16 },
17 execute,
18};
19use futures_util::{StreamExt as _, pin_mut};
20use imbl::Vector;
21use layout::Flex;
22use matrix_sdk::{
23 AuthSession, Client, Room, SqliteCryptoStore, SqliteEventCacheStore, SqliteStateStore,
24 ThreadingSupport,
25 authentication::matrix::MatrixSession,
26 config::StoreConfig,
27 deserialized_responses::TimelineEvent,
28 encryption::{BackupDownloadStrategy, EncryptionSettings},
29 reqwest::Url,
30 ruma::{
31 OwnedEventId, OwnedRoomId, api::client::room::create_room::v3::Request as CreateRoomRequest,
32 },
33 search_index::{SearchIndexGuard, SearchIndexStoreKind},
34};
35use matrix_sdk_base::{RoomStateFilter, event_cache::store::EventCacheStoreLockGuard};
36use matrix_sdk_common::locks::Mutex;
37use matrix_sdk_ui::{
38 Timeline as SdkTimeline,
39 room_list_service::{self, State, filters::new_filter_non_left},
40 sync_service::SyncService,
41 timeline::{RoomExt as _, TimelineFocus, TimelineItem},
42};
43use ratatui::{prelude::*, style::palette::tailwind, widgets::*};
44use throbber_widgets_tui::{Throbber, ThrobberState};
45use tokio::{
46 spawn,
47 sync::mpsc::{Receiver, Sender, channel, error::TryRecvError},
48 task::JoinHandle,
49 time::timeout,
50};
51use tracing::{debug, error, warn};
52use tracing_subscriber::EnvFilter;
53use widgets::{
54 recovery::create_centered_throbber_area, room_view::RoomView, settings::SettingsView,
55};
56
57use crate::widgets::{
58 create_room::CreateRoomView,
59 help::HelpView,
60 room_list::{ExtraRoomInfo, RoomInfos, RoomList, Rooms},
61 search::{
62 indexing::{IndexingMessage, IndexingView},
63 searching::SearchingView,
64 },
65 status::Status,
66};
67
68mod widgets;
69
70const HEADER_BG: Color = tailwind::BLUE.c950;
71const NORMAL_ROW_COLOR: Color = tailwind::SLATE.c950;
72const ALT_ROW_COLOR: Color = tailwind::SLATE.c900;
73const SELECTED_STYLE_FG: Color = tailwind::BLUE.c300;
74const TEXT_COLOR: Color = tailwind::SLATE.c200;
75
76type Timelines = Arc<Mutex<HashMap<OwnedRoomId, Timeline>>>;
77
78#[derive(Debug, Parser)]
79struct Cli {
80 server_name: String,
82
83 #[clap(default_value = "/tmp/")]
85 session_path: PathBuf,
86
87 #[clap(short, long, env = "PROXY")]
89 proxy: Option<Url>,
90
91 #[clap(short, long, default_value_t = false)]
96 dont_share_pos: bool,
97}
98
99#[derive(Default)]
100pub enum GlobalMode {
101 #[default]
103 Default,
104 Help,
106 Settings { view: SettingsView },
108 Exiting { shutdown_task: JoinHandle<()> },
110 CreateRoom { view: CreateRoomView },
112 Searching { view: SearchingView },
114 Indexing { view: IndexingView },
116}
117
118fn popup_area(area: Rect, percent_x: u16, percent_y: u16) -> Rect {
121 let vertical = Layout::vertical([Constraint::Percentage(percent_y)]).flex(Flex::Center);
122 let horizontal = Layout::horizontal([Constraint::Percentage(percent_x)]).flex(Flex::Center);
123 let [area] = vertical.areas(area);
124 let [area] = horizontal.areas(area);
125 area
126}
127
128#[tokio::main]
129async fn main() -> Result<()> {
130 let cli = Cli::parse();
131 let file_writer = tracing_appender::rolling::hourly(&cli.session_path, "logs-");
132
133 tracing_subscriber::fmt()
134 .with_env_filter(EnvFilter::from_default_env())
135 .with_ansi(false)
136 .with_writer(file_writer)
137 .init();
138
139 color_eyre::install()?;
140
141 let share_pos = !cli.dont_share_pos;
142 let client = configure_client(cli).await?;
143
144 let event_cache = client.event_cache();
145 event_cache.subscribe()?;
146
147 let terminal = ratatui::init();
148 execute!(stdout(), EnableMouseCapture)?;
149 let mut app = App::new(client, share_pos).await?;
150
151 app.run(terminal).await
152}
153
154pub struct Timeline {
155 timeline: Arc<SdkTimeline>,
156 items: Arc<Mutex<Vector<Arc<TimelineItem>>>>,
157 task: JoinHandle<()>,
158}
159
160#[derive(Default)]
161pub struct AppState {
162 global_mode: GlobalMode,
165
166 throbber_state: ThrobberState,
168}
169
170struct App {
171 client: Client,
173
174 sync_service: Arc<SyncService>,
176
177 timelines: Timelines,
179
180 room_list: RoomList,
182
183 room_view: RoomView,
186
187 listen_task: JoinHandle<()>,
189
190 indexing_task: JoinHandle<()>,
192
193 indexing_receiver: Receiver<(bool, IndexingMessage)>,
195
196 status: Status,
198
199 state: AppState,
200
201 last_tick: Instant,
202}
203
204impl App {
205 const TICK_RATE: Duration = Duration::from_millis(250);
206
207 async fn new(client: Client, share_pos: bool) -> Result<Self> {
208 let sync_service =
209 Arc::new(SyncService::builder(client.clone()).with_share_pos(share_pos).build().await?);
210
211 let rooms = Rooms::default();
212 let room_infos = RoomInfos::default();
213 let timelines = Timelines::default();
214
215 let room_list_service = sync_service.room_list_service();
216 let all_rooms = room_list_service.all_rooms().await?;
217
218 let listen_task = spawn(Self::listen_task(
219 rooms.clone(),
220 room_infos.clone(),
221 timelines.clone(),
222 all_rooms,
223 ));
224
225 sync_service.start().await;
228
229 let status = Status::new();
230 let room_list =
231 RoomList::new(client.clone(), rooms, room_infos, sync_service.clone(), status.handle());
232
233 let room_view = RoomView::new(client.clone(), timelines.clone(), status.handle());
234
235 let (indexing_sender, indexing_receiver) = channel::<(bool, IndexingMessage)>(1024);
236 let indexing_task =
237 spawn(App::indexing_task(client.clone(), indexing_sender, sync_service.clone()));
238
239 let indexing_view = IndexingView::new();
240
241 Ok(Self {
242 sync_service,
243 timelines,
244 room_list,
245 room_view,
246 client,
247 listen_task,
248 indexing_task,
249 indexing_receiver,
250 status,
251 state: AppState {
252 global_mode: GlobalMode::Indexing { view: indexing_view },
253 ..Default::default()
254 },
255 last_tick: Instant::now(),
256 })
257 }
258
259 async fn listen_task(
260 rooms: Rooms,
261 room_infos: RoomInfos,
262 timelines: Timelines,
263 all_rooms: room_list_service::RoomList,
264 ) {
265 let (stream, entries_controller) = all_rooms.entries_with_dynamic_adapters(50_000);
266 entries_controller.set_filter(Box::new(new_filter_non_left()));
267
268 pin_mut!(stream);
269
270 let mut previous_rooms = HashSet::new();
271
272 while let Some(diffs) = stream.next().await {
273 let all_rooms = {
274 let mut rooms = rooms.lock();
276
277 for diff in diffs {
278 diff.apply(&mut rooms);
279 }
280
281 (*rooms).clone()
283 };
284
285 let mut new_rooms = HashMap::new();
286 let mut new_timelines = Vec::new();
287
288 for room in all_rooms.iter() {
290 let raw_name = room.name();
291 let display_name =
292 room.cached_display_name().map(|display_name| display_name.to_string());
293 let is_dm = room
294 .is_direct()
295 .await
296 .map_err(|err| {
297 warn!("couldn't figure whether a room is a DM or not: {err}");
298 })
299 .ok();
300 room_infos.lock().insert(
301 room.room_id().to_owned(),
302 ExtraRoomInfo { raw_name, display_name, is_dm },
303 );
304 }
305
306 for room in
308 all_rooms.into_iter().filter(|room| !previous_rooms.contains(room.room_id()))
309 {
310 let Ok(timeline) = room
312 .timeline_builder()
313 .with_focus(TimelineFocus::Live { hide_threaded_events: true })
314 .build()
315 .await
316 else {
317 error!("error when creating default timeline");
318 continue;
319 };
320
321 let (items, stream) = timeline.subscribe().await;
323 let items = Arc::new(Mutex::new(items));
324
325 let i = items.clone();
327 let timeline_task = spawn(async move {
328 pin_mut!(stream);
329 let items = i;
330 while let Some(diffs) = stream.next().await {
331 let mut items = items.lock();
332
333 for diff in diffs {
334 diff.apply(&mut items);
335 }
336 }
337 });
338
339 new_timelines.push((
340 room.room_id().to_owned(),
341 Timeline { timeline: Arc::new(timeline), items, task: timeline_task },
342 ));
343
344 new_rooms.insert(room.room_id().to_owned(), room);
346 }
347
348 previous_rooms.extend(new_rooms.into_keys());
349
350 timelines.lock().extend(new_timelines);
351 }
352 }
353
354 async fn wait_for_room_sync(
355 update_sender: &Sender<(bool, IndexingMessage)>,
356 sync_service: Arc<SyncService>,
357 ) {
358 let mut sync_subscriber = sync_service.room_list_service().state();
359
360 while let Some(state) = sync_subscriber.next().await {
362 match state {
363 State::Running => return,
364 State::Terminated { from: _prev } => {
365 while let Err(e) =
366 update_sender.send((true, IndexingMessage::Progress(0))).await
367 {
368 debug!("Failed to send final message, trying again: {e:?}");
369 }
370 return;
371 }
372 _ => {
373 debug!("Sync service not running. Waiting to start indexing. {state:?}");
374 }
375 }
376 }
377 }
378
379 async fn index_event_cache(
380 client: &Client,
381 update_sender: &Sender<(bool, IndexingMessage)>,
382 store: &EventCacheStoreLockGuard,
383 search_index_guard: &mut SearchIndexGuard<'_>,
384 mut count: usize,
385 ) -> Result<usize, ()> {
386 for room in client.rooms_filtered(RoomStateFilter::JOINED.union(RoomStateFilter::LEFT)) {
387 let room_id = room.room_id();
388
389 let maybe_room_cache = room.event_cache().await;
390 let Ok((room_cache, _drop_handles)) = maybe_room_cache else {
391 warn!("Failed to get RoomEventCache: {maybe_room_cache:?}");
392 continue;
393 };
394
395 let redaction_rules = room.clone_info().room_version_rules_or_default().redaction;
396
397 let maybe_timeline_events = store.get_room_events(room_id, None, None).await;
398 let Ok(timeline_events) = maybe_timeline_events else {
399 warn!("Failed to get room's events: {maybe_timeline_events:?}");
400 continue;
401 };
402
403 let no_of_events = timeline_events.len();
404
405 if let Err(err) = search_index_guard
406 .bulk_handle_timeline_event(
407 timeline_events.clone().into_iter(),
408 &room_cache,
409 room_id,
410 &redaction_rules,
411 )
412 .await
413 {
414 error!("Failed to handle event for indexing: {err}");
415 let mut error = Some(err);
416 while let Some(err) = error.take() {
417 if let Err(e) =
418 update_sender.send((true, IndexingMessage::Error(err.to_string()))).await
419 {
420 debug!("Failed to send final error message, trying again: {e:?}");
421 }
422 }
423 return Err(());
424 }
425
426 count += no_of_events;
427 let _ = update_sender.send((false, IndexingMessage::Progress(count))).await;
428 }
429 Ok(count)
430 }
431
432 async fn index_from_server(
433 client: &Client,
434 update_sender: &Sender<(bool, IndexingMessage)>,
435 search_index_guard: &mut SearchIndexGuard<'_>,
436 mut count: usize,
437 ) -> Result<usize, ()> {
438 let batch_size = 25;
439
440 let mut rooms = client.rooms_filtered(RoomStateFilter::JOINED);
441 let mut idx = 0;
442
443 while !rooms.is_empty() {
444 let room = &rooms[idx];
445
446 let room_id = room.room_id();
447
448 let maybe_room_cache = room.event_cache().await;
449 let Ok((room_cache, _drop_handles)) = maybe_room_cache else {
450 warn!("Failed to get RoomEventCache: {maybe_room_cache:?}");
451 idx = (idx + 1) % rooms.len();
452 continue;
453 };
454
455 let redaction_rules = room.clone_info().room_version_rules_or_default().redaction;
456
457 let Ok(pagination) = room_cache.pagination().run_backwards_until(batch_size).await
458 else {
459 error!("Failed to backpaginate {room_id}");
460 idx = (idx + 1) % rooms.len();
461 continue;
462 };
463
464 let no_of_events = pagination.events.len();
465
466 if let Err(err) = search_index_guard
467 .bulk_handle_timeline_event(
468 pagination.events.clone().into_iter(),
469 &room_cache,
470 room_id,
471 &redaction_rules,
472 )
473 .await
474 {
475 warn!("Failed to handle event for indexing: {err}");
476 let mut error = Some(err);
477 while let Some(err) = error.take() {
478 if let Err(e) =
479 update_sender.send((true, IndexingMessage::Error(err.to_string()))).await
480 {
481 debug!("Failed to send final error message, trying again: {e:?}");
482 }
483 }
484 return Err(());
485 }
486
487 count += no_of_events;
488 let _ = update_sender.send((false, IndexingMessage::Progress(count))).await;
489
490 if pagination.reached_start {
491 rooms.remove(idx);
492 let len = rooms.len();
493 if len > 0 {
494 idx %= len;
495 }
496 } else {
497 idx = (idx + 1) % rooms.len();
498 }
499 }
500 Ok(count)
501 }
502
503 async fn indexing_task(
505 client: Client,
506 update_sender: Sender<(bool, IndexingMessage)>,
507 sync_service: Arc<SyncService>,
508 ) {
509 if timeout(Duration::from_secs(30), App::wait_for_room_sync(&update_sender, sync_service))
510 .await
511 .is_err()
512 {
513 debug!("Waiting for sync to run timed out. Quitting indexing task.");
514 return;
515 }
516
517 let Ok(store) = client.event_cache_store().lock().await else {
518 error!("Failed to get EventCacheStore");
519 return;
520 };
521
522 let mut search_index_guard = client.search_index().lock().await;
523 let count = 0;
524
525 debug!("Start indexing from the event cache.");
526
527 let Ok(count) = App::index_event_cache(
529 &client,
530 &update_sender,
531 store.as_clean().expect("Only one process should access the event cache store"),
532 &mut search_index_guard,
533 count,
534 )
535 .await
536 else {
537 debug!("Quitting index task.");
538 return;
539 };
540
541 debug!("Start indexing from the server.");
543
544 let Ok(count) =
545 App::index_from_server(&client, &update_sender, &mut search_index_guard, count).await
546 else {
547 debug!("Quitting index task.");
548 return;
549 };
550
551 while let Err(err) = update_sender.send((true, IndexingMessage::Progress(count))).await {
552 debug!("couldn't send final update {err}, trying again.");
553 }
554 }
555
556 fn set_global_mode(&mut self, mode: GlobalMode) {
557 self.state.global_mode = mode;
558 }
559
560 async fn handle_global_event(&mut self, event: Event) -> Result<bool> {
561 use KeyCode::*;
562
563 match event {
564 Event::Key(KeyEvent { code: F(1), modifiers: KeyModifiers::NONE, .. }) => {
565 self.set_global_mode(GlobalMode::Help)
566 }
567
568 Event::Key(KeyEvent { code: F(10), modifiers: KeyModifiers::NONE, .. }) => self
569 .set_global_mode(GlobalMode::Settings {
570 view: SettingsView::new(self.client.clone(), self.sync_service.clone()),
571 }),
572
573 Event::Key(KeyEvent {
574 code: Char('j') | Down,
575 modifiers: KeyModifiers::CONTROL,
576 ..
577 }) => {
578 self.room_list.next_room().await;
579 let room_id = self.room_list.get_selected_room_id();
580 self.room_view.set_selected_room(room_id);
581 }
582
583 Event::Key(KeyEvent {
584 code: Char('k') | Up, modifiers: KeyModifiers::CONTROL, ..
585 }) => {
586 self.room_list.previous_room().await;
587 let room_id = self.room_list.get_selected_room_id();
588 self.room_view.set_selected_room(room_id);
589 }
590
591 Event::Key(KeyEvent { code: Char('m'), modifiers: KeyModifiers::ALT, .. }) => {
592 self.room_view.mark_as_read().await
593 }
594
595 Event::Key(KeyEvent { code: Char('q'), modifiers: KeyModifiers::CONTROL, .. }) => {
596 if !matches!(self.state.global_mode, GlobalMode::Default) {
597 self.set_global_mode(GlobalMode::Default);
598 } else {
599 return Ok(true);
600 }
601 }
602
603 Event::Key(KeyEvent { modifiers: KeyModifiers::CONTROL, code: Char('r'), .. }) => {
604 self.set_global_mode(GlobalMode::CreateRoom { view: CreateRoomView::new() })
605 }
606
607 Event::Key(KeyEvent { modifiers: KeyModifiers::CONTROL, code: Char('s'), .. }) => {
608 self.set_global_mode(GlobalMode::Searching { view: SearchingView::new() })
609 }
610
611 _ => self.room_view.handle_event(event).await,
612 }
613
614 Ok(false)
615 }
616
617 fn on_tick(&mut self) {
618 self.state.throbber_state.calc_next();
619
620 match &mut self.state.global_mode {
621 GlobalMode::Help
622 | GlobalMode::Default
623 | GlobalMode::CreateRoom { .. }
624 | GlobalMode::Searching { .. }
625 | GlobalMode::Exiting { .. } => {}
626 GlobalMode::Settings { view } => {
627 view.on_tick();
628 }
629 GlobalMode::Indexing { view } => {
630 view.on_tick();
631 }
632 }
633 }
634
635 async fn render_loop(&mut self, mut terminal: Terminal<impl Backend>) -> Result<()> {
636 use KeyCode::*;
637
638 let mut check_channel = true;
639
640 loop {
641 if check_channel {
642 match self.indexing_receiver.try_recv() {
643 Ok((done, message)) => {
644 if !matches!(message, IndexingMessage::Error(_)) && done {
645 self.set_global_mode(GlobalMode::Default);
646 } else if let GlobalMode::Indexing { view } = &mut self.state.global_mode {
647 view.set_message(message);
648 }
649 }
650 Err(TryRecvError::Disconnected) => check_channel = false,
651 Err(TryRecvError::Empty) => {}
652 }
653 }
654
655 terminal.draw(|f| f.render_widget(&mut *self, f.area()))?;
656
657 if event::poll(Duration::from_millis(100))? {
658 let event = event::read()?;
659
660 match &mut self.state.global_mode {
661 GlobalMode::Default => {
662 if self.handle_global_event(event).await? {
663 let sync_service = self.sync_service.clone();
664 let timelines = self.timelines.clone();
665 let listen_task = self.listen_task.abort_handle();
666 let indexing_task = self.indexing_task.abort_handle();
667
668 let shutdown_task = spawn(async move {
669 sync_service.stop().await;
670
671 listen_task.abort();
672 indexing_task.abort();
673
674 for timeline in timelines.lock().values() {
675 timeline.task.abort();
676 }
677 });
678
679 self.set_global_mode(GlobalMode::Exiting { shutdown_task });
680 }
681 }
682 GlobalMode::Help => {
683 if let Event::Key(key) = event
684 && let KeyModifiers::NONE = key.modifiers
685 && let Char('q') | Esc = key.code
686 {
687 self.set_global_mode(GlobalMode::Default)
688 }
689 }
690 GlobalMode::Settings { view } => {
691 if let Event::Key(key) = event
692 && view.handle_key_press(key).await
693 {
694 self.set_global_mode(GlobalMode::Default);
695 }
696 }
697 GlobalMode::CreateRoom { view } => {
698 if let Event::Key(key) = event
699 && let KeyModifiers::NONE = key.modifiers
700 {
701 match key.code {
702 Enter => {
703 if let Some(room_name) = view.get_text() {
704 let mut request = CreateRoomRequest::new();
705 request.name = Some(room_name);
706 if let Err(err) = self
707 .sync_service
708 .room_list_service()
709 .client()
710 .create_room(request)
711 .await
712 {
713 error!("error while creating room: {err:?}");
714 }
715 }
716 self.set_global_mode(GlobalMode::Default);
717 }
718 Esc => self.set_global_mode(GlobalMode::Default),
719 _ => view.handle_key_press(key),
720 }
721 }
722 }
723 GlobalMode::Searching { view } => {
724 if let Event::Key(key) = event {
725 match key.code {
726 Enter => {
727 if let Some(query) = view.get_text() {
728 if let Some(room) = self.room_view.room() {
729 if let Ok(results) =
730 room.search(&query, 100, None).await.inspect_err(|err| {
731 error!("error occurred while searching index: {err:?}");
732 })
733 {
734 let results = get_events_from_event_ids(
735 &self.client,
736 &room,
737 results,
738 )
739 .await;
740
741 view.results(results);
742 }
743 } else {
744 warn!("No room in view.")
745 }
746 }
747 }
748 Esc => self.set_global_mode(GlobalMode::Default),
749 Up => view.list_state.previous(),
750 Down => view.list_state.next(),
751 _ => view.handle_key_press(key),
752 }
753 }
754 }
755 GlobalMode::Indexing { .. } => {
756 if let Event::Key(key) = event
757 && let KeyModifiers::NONE = key.modifiers
758 && let Esc = key.code
759 {
760 self.indexing_task.abort();
761 self.set_global_mode(GlobalMode::Default);
762 }
763 }
764 GlobalMode::Exiting { .. } => {}
765 }
766 }
767
768 match &self.state.global_mode {
769 GlobalMode::Default
770 | GlobalMode::Help
771 | GlobalMode::CreateRoom { .. }
772 | GlobalMode::Searching { .. }
773 | GlobalMode::Indexing { .. }
774 | GlobalMode::Settings { .. } => {}
775 GlobalMode::Exiting { shutdown_task } => {
776 if shutdown_task.is_finished() {
777 break;
778 }
779 }
780 }
781
782 if self.last_tick.elapsed() >= Self::TICK_RATE {
783 self.on_tick();
784 self.last_tick = Instant::now();
785 }
786 }
787
788 Ok(())
789 }
790
791 async fn run(&mut self, terminal: Terminal<impl Backend>) -> Result<()> {
792 self.render_loop(terminal).await?;
793
794 ratatui::restore();
796 execute!(stdout(), DisableMouseCapture)?;
797
798 Ok(())
799 }
800}
801
802impl Widget for &mut App {
803 fn render(self, area: Rect, buf: &mut Buffer) {
805 let vertical =
807 Layout::vertical([Constraint::Length(2), Constraint::Min(0), Constraint::Length(1)]);
808 let [header_area, rest_area, status_area] = vertical.areas(area);
809
810 let horizontal =
813 Layout::horizontal([Constraint::Percentage(25), Constraint::Percentage(75)]);
814 let [room_list_area, room_view_area] = horizontal.areas(rest_area);
815
816 self.render_title(header_area, buf);
817 self.room_list.render(room_list_area, buf);
818 self.room_view.render(room_view_area, buf);
819 self.status.render(status_area, buf, &mut self.state);
820
821 match &mut self.state.global_mode {
822 GlobalMode::Default => {}
823 GlobalMode::Exiting { .. } => {
824 Clear.render(rest_area, buf);
825 let centered = create_centered_throbber_area(area);
826 let throbber = Throbber::default()
827 .label("Exiting")
828 .throbber_set(throbber_widgets_tui::BRAILLE_EIGHT_DOUBLE);
829 StatefulWidget::render(throbber, centered, buf, &mut self.state.throbber_state);
830 }
831 GlobalMode::Settings { view } => {
832 view.render(area, buf);
833 }
834 GlobalMode::Help => {
835 let mut help_view = HelpView::new();
836 help_view.render(area, buf);
837 }
838 GlobalMode::CreateRoom { view } => {
839 view.render(area, buf);
840 }
841 GlobalMode::Searching { view } => {
842 view.render(room_view_area, buf);
843 }
844 GlobalMode::Indexing { view } => {
845 view.render(area, buf);
846 }
847 }
848 }
849}
850
851impl App {
852 fn render_title(&self, area: Rect, buf: &mut Buffer) {
854 Paragraph::new("Multiverse").bold().centered().render(area, buf);
855 }
856}
857
858async fn configure_client(cli: Cli) -> Result<Client> {
862 let Cli { server_name, session_path, proxy, dont_share_pos: _ } = cli;
863
864 let mut client_builder = Client::builder()
865 .store_config(
866 StoreConfig::new("multiverse".to_owned())
867 .crypto_store(SqliteCryptoStore::open(session_path.join("crypto"), None).await?)
868 .state_store(SqliteStateStore::open(session_path.join("state"), None).await?)
869 .event_cache_store(
870 SqliteEventCacheStore::open(session_path.join("cache"), None).await?,
871 ),
872 )
873 .server_name_or_homeserver_url(&server_name)
874 .with_encryption_settings(EncryptionSettings {
875 auto_enable_cross_signing: true,
876 backup_download_strategy: BackupDownloadStrategy::AfterDecryptionFailure,
877 auto_enable_backups: true,
878 })
879 .with_enable_share_history_on_invite(true)
880 .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true })
881 .search_index_store(SearchIndexStoreKind::UnencryptedDirectory(
882 session_path.join("indexData"),
883 ));
884
885 if let Some(proxy_url) = proxy {
886 client_builder = client_builder.proxy(proxy_url).disable_ssl_verification();
887 }
888
889 let client = client_builder.build().await?;
890
891 log_in_or_restore_session(&client, &session_path).await?;
893
894 Ok(client)
895}
896
897async fn log_in_or_restore_session(client: &Client, session_path: &Path) -> Result<()> {
898 let session_path = session_path.join("session.json");
899
900 if let Ok(serialized) = std::fs::read_to_string(&session_path) {
901 let session: MatrixSession = serde_json::from_str(&serialized)?;
902 client.restore_session(session).await?;
903 } else {
904 login_with_password(client).await?;
905
906 if let Some(session) = client.session() {
908 let AuthSession::Matrix(session) = session else {
909 panic!("unexpected OAuth 2.0 session")
910 };
911 let serialized = serde_json::to_string(&session)?;
912 std::fs::write(session_path, serialized)?;
913
914 println!("saved session");
915 }
916 }
917
918 Ok(())
919}
920
921async fn login_with_password(client: &Client) -> Result<()> {
924 println!("Logging in with username and password…");
925
926 loop {
927 print!("\nUsername: ");
928 stdout().flush().expect("Unable to write to stdout");
929 let mut username = String::new();
930 io::stdin().read_line(&mut username).expect("Unable to read user input");
931 username = username.trim().to_owned();
932
933 let password = rpassword::prompt_password("Password.")?;
934
935 match client.matrix_auth().login_username(&username, password.trim()).await {
936 Ok(_) => {
937 println!("Logged in as {username}");
938 break;
939 }
940 Err(error) => {
941 println!("Error logging in: {error}");
942 println!("Please try again\n");
943 }
944 }
945 }
946
947 Ok(())
948}
949
950async fn get_events_from_event_ids(
951 client: &Client,
952 room: &Room,
953 event_ids: Vec<OwnedEventId>,
954) -> Vec<TimelineEvent> {
955 if let Ok(cache_lock) = client.event_cache_store().lock().await {
956 let cache_lock =
957 cache_lock.as_clean().expect("Only one process must access the event cache store");
958
959 futures_util::future::join_all(event_ids.iter().map(|event_id| async {
960 let event_id = event_id.clone();
961 match cache_lock.find_event(room.room_id(), &event_id).await {
962 Ok(ev) => ev,
963 Err(_) => room
964 .event(&event_id, None)
965 .await
966 .inspect_err(|err| {
967 debug!("Failed to find event {event_id} in event cache and server: {err}");
968 })
969 .ok(),
970 }
971 }))
972 .await
973 .into_iter()
974 .flatten()
975 .collect::<Vec<TimelineEvent>>()
976 } else {
977 debug!("Couldnt get event cache store lock.");
978 Vec::new()
979 }
980}