Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/api/api_utils/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,7 @@ mod tests {
report_count: 0,
unresolved_report_count: 0,
federation_pending: false,
is_counted: todo!(),
};
assert!(check_comment_depth(&comment).is_ok());
comment.path = Ltree("0.123.456".to_string());
Expand Down
1 change: 1 addition & 0 deletions crates/db_schema/src/impls/comment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ mod tests {
report_count: 0,
unresolved_report_count: 0,
federation_pending: false,
is_counted: todo!(),
};

let child_comment_form = CommentInsertForm::new(
Expand Down
1 change: 1 addition & 0 deletions crates/db_schema/src/impls/post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ mod tests {
scaled_rank: RANK_DEFAULT,
unresolved_report_count: 0,
federation_pending: false,
is_counted: todo!(),
};

// Post Like
Expand Down
2 changes: 2 additions & 0 deletions crates/db_schema/src/source/comment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ pub struct Comment {
/// If a local user comments in a remote community, the comment is hidden until it is confirmed
/// accepted by the community (by receiving it back via federation).
pub federation_pending: bool,
#[serde(skip)]
pub is_counted: bool,
}

#[derive(Debug, Clone, derive_new::new)]
Expand Down
2 changes: 2 additions & 0 deletions crates/db_schema/src/source/comment_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub struct CommentReport {
pub published_at: DateTime<Utc>,
pub updated_at: Option<DateTime<Utc>>,
pub violates_instance_rules: bool,
#[serde(skip)]
pub is_counted: bool,
}

#[derive(Clone)]
Expand Down
2 changes: 2 additions & 0 deletions crates/db_schema/src/source/community_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct CommunityReport {
pub resolver_id: Option<PersonId>,
pub published_at: DateTime<Utc>,
pub updated_at: Option<DateTime<Utc>>,
#[serde(skip)]
pub is_counted: bool,
}

#[derive(Clone)]
Expand Down
4 changes: 4 additions & 0 deletions crates/db_schema/src/source/post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ pub struct Post {
/// If a local user posts in a remote community, the comment is hidden until it is confirmed
/// accepted by the community (by receiving it back via federation).
pub federation_pending: bool,
#[serde(skip)]
pub is_counted: bool,
}

// TODO: FromBytes, ToBytes are only needed to develop wasm plugin, could be behind feature flag
Expand Down Expand Up @@ -201,6 +203,8 @@ pub struct PostActions {
pub like_score: Option<i16>,
/// When the post was hidden.
pub hidden_at: Option<DateTime<Utc>>,
#[serde(skip)]
pub like_is_counted: Option<bool>,
}

#[derive(Clone, derive_new::new)]
Expand Down
2 changes: 2 additions & 0 deletions crates/db_schema/src/source/post_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct PostReport {
pub published_at: DateTime<Utc>,
pub updated_at: Option<DateTime<Utc>>,
pub violates_instance_rules: bool,
#[serde(skip)]
pub is_counted: bool,
}

#[derive(Clone, Default)]
Expand Down
1 change: 1 addition & 0 deletions crates/db_schema/src/utils/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ pub fn comment_select_remove_deletes() -> _ {
comment::report_count,
comment::unresolved_report_count,
comment::federation_pending,
comment::is_counted,
)
}

Expand Down
8 changes: 8 additions & 0 deletions crates/db_schema_file/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ diesel::table! {
report_count -> Int2,
unresolved_report_count -> Int2,
federation_pending -> Bool,
is_counted -> Bool,
}
}

Expand All @@ -153,6 +154,7 @@ diesel::table! {
like_score -> Nullable<Int2>,
liked_at -> Nullable<Timestamptz>,
saved_at -> Nullable<Timestamptz>,
like_is_counted -> Nullable<Bool>,
}
}

Expand All @@ -178,6 +180,7 @@ diesel::table! {
published_at -> Timestamptz,
updated_at -> Nullable<Timestamptz>,
violates_instance_rules -> Bool,
is_counted -> Bool,
}
}

Expand Down Expand Up @@ -249,6 +252,7 @@ diesel::table! {
became_moderator_at -> Nullable<Timestamptz>,
received_ban_at -> Nullable<Timestamptz>,
ban_expires_at -> Nullable<Timestamptz>,
follow_is_counted -> Nullable<Bool>,
}
}

Expand All @@ -275,6 +279,7 @@ diesel::table! {
resolver_id -> Nullable<Int4>,
published_at -> Timestamptz,
updated_at -> Nullable<Timestamptz>,
is_counted -> Bool,
}
}

Expand Down Expand Up @@ -930,6 +935,7 @@ diesel::table! {
report_count -> Int2,
unresolved_report_count -> Int2,
federation_pending -> Bool,
is_counted -> Bool,
}
}

Expand All @@ -944,6 +950,7 @@ diesel::table! {
liked_at -> Nullable<Timestamptz>,
like_score -> Nullable<Int2>,
hidden_at -> Nullable<Timestamptz>,
like_is_counted -> Nullable<Bool>,
}
}

Expand All @@ -962,6 +969,7 @@ diesel::table! {
published_at -> Timestamptz,
updated_at -> Nullable<Timestamptz>,
violates_instance_rules -> Bool,
is_counted -> Bool,
}
}

Expand Down
59 changes: 57 additions & 2 deletions crates/routes/src/utils/scheduled_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use diesel::{
QueryDsl,
QueryableByName,
};
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use diesel_async::{AsyncPgConnection, RunQueryDsl, SimpleAsyncConnection};
use diesel_uplete::uplete;
use lemmy_api_utils::{
context::LemmyContext,
Expand Down Expand Up @@ -49,7 +49,8 @@ use lemmy_db_schema_file::schema::{
use lemmy_db_views_site::SiteView;
use lemmy_utils::error::{LemmyErrorType, LemmyResult};
use reqwest_middleware::ClientWithMiddleware;
use std::time::Duration;
use std::{convert::Infallible, time::Duration};
use tokio::sync::Notify;
use tracing::{info, warn};

/// Schedules various cleanup tasks for lemmy in a background thread
Expand Down Expand Up @@ -128,6 +129,29 @@ pub async fn setup(context: Data<LemmyContext>) -> LemmyResult<()> {
}
});

let context_1 = context.clone();
// Update counters.
//
// This handles high-frequency changes more efficiently than triggers would. For example, if a
// counter update begins, and two posts are created during that update, then the next counter
// update will increment counters by 2. With triggers, the first post's insert transaction would
// include incrementing the counter by 1, and the second post's insert transaction would include
// waiting for the first post's insert transaction to finish before incrementing the counter by 1.
//
// To prevent users from sensing imperfectness or being misled because of delayed counter updates,
// queries use the `count` aggregate function to count items that have not yet been handled by a
// counter update. To minimize the chance that this involves scanning anything other than an empty
// index, the delay of a counter update is minimized by responding to notifications (sent by
// triggers) instead of schedule intervals.
tokio::spawn(async move {
loop {
match counter_update_loop(&mut context_1.pool()).await {
Ok(never) => match never {},
Err(e) => warn!("Counter update loop unexpectedly stopped and will restart: {e}"),
}
}
});

// Manually run the scheduler in an event loop
loop {
scheduler.run_pending().await;
Expand Down Expand Up @@ -561,6 +585,37 @@ async fn build_update_instance_form(
Some(instance_form)
}

async fn counter_update_loop(pool: &mut DbPool<'_>) -> LemmyResult<Infallible> {
let mut conn = get_conn(pool).await?;
conn.batch_execute("LISTEN need_counter_update").await?;
let mut stream = conn.notification_stream();

let wakeup = Notify::new();

// To handle crashes and prevent race conditions, unconditionally update counters after the
// `LISTEN` statement.
wakeup.notify_one();

let listen_to_db = async {
loop {
let _: PgNotification = stream
.next
.await
.context("unexpected end of notification stream")??;
wakeup.notify_one();
}
};

let update_when_woke = async {
loop {
wakeup.notified().await;
// TODO: update counters here (there must be an additional filter to prevent integer overflow errors from being thrown)
}
};

tokio::select! {result = listen_to_db => result, result = update_when_woke => result}
}

#[cfg(test)]
mod tests {

Expand Down
1 change: 1 addition & 0 deletions migrations/2025-07-14-215605_scalable_count/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- This file should undo anything in `up.sql`
27 changes: 27 additions & 0 deletions migrations/2025-07-14-215605_scalable_count/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
ALTER TABLE comment
ADD COLUMN is_counted boolean NOT NULL DEFAULT FALSE;

ALTER TABLE comment_actions
ADD COLUMN like_is_counted boolean,
ADD CONSTRAINT comment_actions_check_like_is_counted CHECK ((liked_at IS NULL) = (like_is_counted IS NULL));

ALTER TABLE comment_report
ADD COLUMN is_counted boolean NOT NULL DEFAULT FALSE;

ALTER TABLE community_actions
ADD COLUMN follow_is_counted boolean,
ADD CONSTRAINT community_actions_check_follow_is_counted CHECK ((followed_at IS NULL) = (follow_is_counted IS NULL));

ALTER TABLE community_report
ADD COLUMN is_counted boolean NOT NULL DEFAULT FALSE;

ALTER TABLE post
ADD COLUMN is_counted boolean NOT NULL DEFAULT FALSE;

ALTER TABLE post_actions
ADD COLUMN like_is_counted boolean,
ADD CONSTRAINT post_actions_check_like_is_counted CHECK ((liked_at IS NULL) = (like_is_counted IS NULL));

ALTER TABLE post_report
ADD COLUMN is_counted boolean NOT NULL DEFAULT FALSE;