aports/testing/iamb/reduce-cpu-usage.patch
Celeste 39175b1f15 testing/iamb: improve
Backported 2 patches from upstream:
  - example-config.patch fixes syntax error and directory path
  - reduce-cpu-usage.patch fixes high CPU load

Also, remove `cargo update` used to fix open64 from prepare(),
and move the changes it applies into getrandom-0.2.10.patch.
2023-11-03 03:46:39 +00:00

433 lines
16 KiB
Diff

Patch-Source: https://github.com/ulyssa/iamb/commit/b2b47ed7a0a04577a4c93d066ee1341c7aa05df9.patch
--
From b2b47ed7a0a04577a4c93d066ee1341c7aa05df9 Mon Sep 17 00:00:00 2001
From: Benjamin Lee <benjamin@computer.surgery>
Date: Sun, 15 Oct 2023 18:12:39 -0700
Subject: [PATCH] Reduce CPU usage by instead fetching read receipts after
related sync events (#168)
---
src/base.rs | 53 ++++++++++---------
src/message/mod.rs | 22 ++++----
src/tests.rs | 3 +-
src/worker.rs | 128 +++++++++++++++++++++++++++++----------------
4 files changed, 124 insertions(+), 82 deletions(-)
diff --git a/src/base.rs b/src/base.rs
index 7089bb4..07d825a 100644
--- a/src/base.rs
+++ b/src/base.rs
@@ -413,8 +413,6 @@ pub type IambResult<T> = UIResult<T, IambInfo>;
/// it's reacting to.
pub type MessageReactions = HashMap<OwnedEventId, (String, OwnedUserId)>;
-pub type Receipts = HashMap<OwnedEventId, Vec<OwnedUserId>>;
-
#[derive(thiserror::Error, Debug)]
pub enum IambError {
#[error("Invalid user identifier: {0}")]
@@ -549,7 +546,14 @@ pub struct RoomInfo {
pub messages: Messages,
/// A map of read markers to display on different events.
- pub receipts: HashMap<OwnedEventId, Vec<OwnedUserId>>,
+ pub event_receipts: HashMap<OwnedEventId, HashSet<OwnedUserId>>,
+
+ /// A map of the most recent read marker for each user.
+ ///
+ /// Every receipt in this map should also have an entry in [`event_receipts`],
+ /// however not every user has an entry. If a user's most recent receipt is
+ /// older than the oldest loaded event, that user will not be included.
+ pub user_receipts: HashMap<OwnedUserId, OwnedEventId>,
/// An event ID for where we should indicate we've read up to.
pub read_till: Option<OwnedEventId>,
@@ -698,6 +702,27 @@ impl RoomInfo {
self.fetch_last.map_or(false, |i| i.elapsed() < ROOM_FETCH_DEBOUNCE)
}
+ fn clear_receipt(&mut self, user_id: &OwnedUserId) -> Option<()> {
+ let old_event_id = self.user_receipts.get(user_id)?;
+ let old_receipts = self.event_receipts.get_mut(old_event_id)?;
+ old_receipts.remove(user_id);
+
+ if old_receipts.is_empty() {
+ self.event_receipts.remove(old_event_id);
+ }
+
+ None
+ }
+
+ pub fn set_receipt(&mut self, user_id: OwnedUserId, event_id: OwnedEventId) {
+ self.clear_receipt(&user_id);
+ self.event_receipts
+ .entry(event_id.clone())
+ .or_default()
+ .insert(user_id.clone());
+ self.user_receipts.insert(user_id, event_id);
+ }
+
fn get_typers(&self) -> &[OwnedUserId] {
if let Some((t, users)) = &self.users_typing {
if t.elapsed() < Duration::from_secs(4) {
@@ -860,25 +885,6 @@ impl ChatStore {
.unwrap_or_else(|| "Untitled Matrix Room".to_string())
}
- pub async fn set_receipts(
- &mut self,
- receipts: Vec<(OwnedRoomId, Receipts)>,
- ) -> Vec<(OwnedRoomId, OwnedEventId)> {
- let mut updates = vec![];
-
- for (room_id, receipts) in receipts.into_iter() {
- if let Some(info) = self.rooms.get_mut(&room_id) {
- info.receipts = receipts;
-
- if let Some(read_till) = info.read_till.take() {
- updates.push((room_id, read_till));
- }
- }
- }
-
- return updates;
- }
-
pub fn mark_for_load(&mut self, room_id: OwnedRoomId) {
self.need_load.insert(room_id);
}
diff --git a/src/message/mod.rs b/src/message/mod.rs
index de76bfa..118c01e 100644
--- a/src/message/mod.rs
+++ b/src/message/mod.rs
@@ -2,10 +2,10 @@
use std::borrow::Cow;
use std::cmp::{Ord, Ordering, PartialOrd};
use std::collections::hash_map::DefaultHasher;
+use std::collections::hash_set;
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::hash::{Hash, Hasher};
-use std::slice::Iter;
use chrono::{DateTime, Local as LocalTz, NaiveDateTime, TimeZone};
use comrak::{markdown_to_html, ComrakOptions};
@@ -501,7 +501,7 @@ struct MessageFormatter<'a> {
date: Option<Span<'a>>,
/// Iterator over the users who have read up to this message.
- read: Iter<'a, OwnedUserId>,
+ read: Option<hash_set::Iter<'a, OwnedUserId>>,
}
impl<'a> MessageFormatter<'a> {
@@ -533,10 +533,11 @@ impl<'a> MessageFormatter<'a> {
// Show read receipts.
let user_char =
|user: &'a OwnedUserId| -> Span<'a> { settings.get_user_char_span(user) };
+ let mut read = self.read.iter_mut().flatten();
- let a = self.read.next().map(user_char).unwrap_or_else(|| Span::raw(" "));
- let b = self.read.next().map(user_char).unwrap_or_else(|| Span::raw(" "));
- let c = self.read.next().map(user_char).unwrap_or_else(|| Span::raw(" "));
+ let a = read.next().map(user_char).unwrap_or_else(|| Span::raw(" "));
+ let b = read.next().map(user_char).unwrap_or_else(|| Span::raw(" "));
+ let c = read.next().map(user_char).unwrap_or_else(|| Span::raw(" "));
line.push(Span::raw(" "));
line.push(c);
@@ -650,10 +651,7 @@ impl Message {
let fill = width - USER_GUTTER - TIME_GUTTER - READ_GUTTER;
let user = self.show_sender(prev, true, info, settings);
let time = self.timestamp.show_time();
- let read = match info.receipts.get(self.event.event_id()) {
- Some(read) => read.iter(),
- None => [].iter(),
- };
+ let read = info.event_receipts.get(self.event.event_id()).map(|read| read.iter());
MessageFormatter { settings, cols, orig, fill, user, date, time, read }
} else if USER_GUTTER + TIME_GUTTER + MIN_MSG_LEN <= width {
@@ -661,7 +659,7 @@ impl Message {
let fill = width - USER_GUTTER - TIME_GUTTER;
let user = self.show_sender(prev, true, info, settings);
let time = self.timestamp.show_time();
- let read = [].iter();
+ let read = None;
MessageFormatter { settings, cols, orig, fill, user, date, time, read }
} else if USER_GUTTER + MIN_MSG_LEN <= width {
@@ -669,7 +667,7 @@ impl Message {
let fill = width - USER_GUTTER;
let user = self.show_sender(prev, true, info, settings);
let time = None;
- let read = [].iter();
+ let read = None;
MessageFormatter { settings, cols, orig, fill, user, date, time, read }
} else {
@@ -677,7 +675,7 @@ impl Message {
let fill = width.saturating_sub(2);
let user = self.show_sender(prev, false, info, settings);
let time = None;
- let read = [].iter();
+ let read = None;
MessageFormatter { settings, cols, orig, fill, user, date, time, read }
}
diff --git a/src/tests.rs b/src/tests.rs
index 94d120f..aa687e3 100644
--- a/src/tests.rs
+++ b/src/tests.rs
@@ -153,7 +153,8 @@ pub fn mock_room() -> RoomInfo {
keys: mock_keys(),
messages: mock_messages(),
- receipts: HashMap::new(),
+ event_receipts: HashMap::new(),
+ user_receipts: HashMap::new(),
read_till: None,
reactions: HashMap::new(),
diff --git a/src/worker.rs b/src/worker.rs
index 646f010..255d41c 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -41,6 +41,7 @@ use matrix_sdk::{
},
presence::PresenceEvent,
reaction::ReactionEventContent,
+ receipt::{ReceiptEventContent, ReceiptType},
room::{
encryption::RoomEncryptionEventContent,
member::OriginalSyncRoomMemberEvent,
@@ -55,12 +56,14 @@ use matrix_sdk::{
AnyTimelineEvent,
EmptyStateKey,
InitialStateEvent,
+ SyncEphemeralRoomEvent,
SyncMessageLikeEvent,
SyncStateEvent,
},
room::RoomType,
serde::Raw,
EventEncryptionAlgorithm,
+ EventId,
OwnedEventId,
OwnedRoomId,
OwnedRoomOrAliasId,
@@ -84,8 +87,8 @@ use crate::{
EventLocation,
IambError,
IambResult,
- Receipts,
RoomFetchStatus,
+ RoomInfo,
VerifyAction,
},
message::MessageFetchResult,
@@ -171,6 +174,19 @@ pub async fn create_room(
return Ok(resp.room_id);
}
+async fn update_event_receipts(info: &mut RoomInfo, room: &MatrixRoom, event_id: &EventId) {
+ let receipts = match room.event_read_receipts(event_id).await {
+ Ok(receipts) => receipts,
+ Err(e) => {
+ tracing::warn!(?event_id, "failed to get event receipts: {e}");
+ return;
+ },
+ };
+ for (user_id, _) in receipts {
+ info.set_receipt(user_id, event_id.to_owned());
+ }
+}
+
async fn load_plan(store: &AsyncProgramStore) -> HashMap<OwnedRoomId, Option<String>> {
let mut locked = store.lock().await;
let ChatStore { need_load, rooms, .. } = &mut locked.application;
@@ -200,7 +216,7 @@ async fn load_plan(store: &AsyncProgramStore) -> HashMap<OwnedRoomId, Option<Str
}
async fn load_older_one(
- client: Client,
+ client: &Client,
room_id: &RoomId,
fetch_id: Option<String>,
limit: u32,
@@ -228,7 +244,12 @@ async fn load_older_one(
}
}
-async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: AsyncProgramStore) {
+async fn load_insert(
+ client: &Client,
+ room_id: OwnedRoomId,
+ res: MessageFetchResult,
+ store: AsyncProgramStore,
+) {
let mut locked = store.lock().await;
let ChatStore { need_load, presences, rooms, .. } = &mut locked.application;
let info = rooms.get_or_default(room_id.clone());
@@ -240,6 +261,10 @@ async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: Async
let sender = msg.sender().to_owned();
let _ = presences.get_or_default(sender);
+ if let Some(room) = client.get_room(&room_id) {
+ update_event_receipts(info, &room, msg.event_id()).await;
+ }
+
match msg {
AnyMessageLikeEvent::RoomEncrypted(msg) => {
info.insert_encrypted(msg);
@@ -277,8 +302,8 @@ async fn load_older(client: &Client, store: &AsyncProgramStore) -> usize {
let store = store.clone();
async move {
- let res = load_older_one(client, room_id.as_ref(), fetch_id, limit).await;
- load_insert(room_id, res, store).await;
+ let res = load_older_one(&client, room_id.as_ref(), fetch_id, limit).await;
+ load_insert(&client, room_id, res, store).await;
}
})
.collect::<FuturesUnordered<_>>()
@@ -357,28 +382,40 @@ async fn refresh_rooms_forever(client: &Client, store: &AsyncProgramStore) {
}
}
-async fn refresh_receipts_forever(client: &Client, store: &AsyncProgramStore) {
- // Update the displayed read receipts every 5 seconds.
- let mut interval = tokio::time::interval(Duration::from_secs(5));
+async fn send_receipts_forever(client: &Client, store: &AsyncProgramStore) {
+ let mut interval = tokio::time::interval(Duration::from_secs(2));
let mut sent = HashMap::<OwnedRoomId, OwnedEventId>::default();
loop {
interval.tick().await;
- let receipts = update_receipts(client).await;
- let read = store.lock().await.application.set_receipts(receipts).await;
-
- for (room_id, read_till) in read.into_iter() {
- if let Some(read_sent) = sent.get(&room_id) {
- if read_sent == &read_till {
- // Skip unchanged receipts.
- continue;
- }
- }
- if let Some(room) = client.get_joined_room(&room_id) {
- if room.read_receipt(&read_till).await.is_ok() {
- sent.insert(room_id, read_till);
+ let locked = store.lock().await;
+ let updates = client
+ .joined_rooms()
+ .into_iter()
+ .filter_map(|room| {
+ let room_id = room.room_id().to_owned();
+ let info = locked.application.rooms.get(&room_id)?;
+ let new_receipt = info.read_till.as_ref()?;
+ let old_receipt = sent.get(&room_id);
+ if Some(new_receipt) != old_receipt {
+ Some((room_id, new_receipt.clone()))
+ } else {
+ None
}
+ })
+ .collect::<Vec<_>>();
+ drop(locked);
+
+ for (room_id, new_receipt) in updates {
+ let Some(room) = client.get_joined_room(&room_id) else {
+ continue;
+ };
+ match room.read_receipt(&new_receipt).await {
+ Ok(()) => {
+ sent.insert(room_id, new_receipt);
+ },
+ Err(e) => tracing::warn!(?room_id, "Failed to set read receipt: {e}"),
}
}
}
@@ -413,29 +450,6 @@ fn oneshot<T>() -> (ClientReply<T>, ClientResponse<T>) {
return (reply, response);
}
-async fn update_receipts(client: &Client) -> Vec<(OwnedRoomId, Receipts)> {
- let mut rooms = vec![];
-
- for room in client.joined_rooms() {
- if let Ok(users) = room.active_members_no_sync().await {
- let mut receipts = Receipts::new();
-
- for member in users {
- let res = room.user_read_receipt(member.user_id()).await;
-
- if let Ok(Some((event_id, _))) = res {
- let user_id = member.user_id().to_owned();
- receipts.entry(event_id).or_default().push(user_id);
- }
- }
-
- rooms.push((room.room_id().to_owned(), receipts));
- }
- }
-
- return rooms;
-}
-
pub type FetchedRoom = (MatrixRoom, DisplayName, Option<Tags>);
pub enum WorkerTask {
@@ -787,6 +801,7 @@ impl ClientWorker {
let _ = locked.application.presences.get_or_default(sender);
let info = locked.application.get_room_info(room_id.to_owned());
+ update_event_receipts(info, &room, ev.event_id()).await;
info.insert(ev.into_full_event(room_id.to_owned()));
}
},
@@ -805,11 +820,34 @@ impl ClientWorker {
let _ = locked.application.presences.get_or_default(sender);
let info = locked.application.get_room_info(room_id.to_owned());
+ update_event_receipts(info, &room, ev.event_id()).await;
info.insert_reaction(ev.into_full_event(room_id.to_owned()));
}
},
);
+ let _ = self.client.add_event_handler(
+ |ev: SyncEphemeralRoomEvent<ReceiptEventContent>,
+ room: MatrixRoom,
+ store: Ctx<AsyncProgramStore>| {
+ async move {
+ let room_id = room.room_id();
+
+ let mut locked = store.lock().await;
+
+ let info = locked.application.get_room_info(room_id.to_owned());
+ for (event_id, receipts) in ev.content.0.into_iter() {
+ let Some(receipts) = receipts.get(&ReceiptType::Read) else {
+ continue;
+ };
+ for user_id in receipts.keys() {
+ info.set_receipt(user_id.to_owned(), event_id.clone());
+ }
+ }
+ }
+ },
+ );
+
let _ = self.client.add_event_handler(
|ev: OriginalSyncRoomRedactionEvent,
room: MatrixRoom,
@@ -992,7 +1030,7 @@ impl ClientWorker {
async move {
let load = load_older_forever(&client, &store);
- let rcpt = refresh_receipts_forever(&client, &store);
+ let rcpt = send_receipts_forever(&client, &store);
let room = refresh_rooms_forever(&client, &store);
let ((), (), ()) = tokio::join!(load, rcpt, room);
}