diff --git a/crates/storage-pg/src/personal/mod.rs b/crates/storage-pg/src/personal/mod.rs index e6e20b698..aa4597fec 100644 --- a/crates/storage-pg/src/personal/mod.rs +++ b/crates/storage-pg/src/personal/mod.rs @@ -181,6 +181,104 @@ mod tests { assert!(session_lookup.is_revoked()); } + #[sqlx::test(migrator = "crate::MIGRATOR")] + async fn test_session_revoke_bulk(pool: PgPool) { + let mut rng = ChaChaRng::seed_from_u64(42); + let clock = MockClock::default(); + let mut repo = PgRepository::from_pool(&pool).await.unwrap(); + + let alice_user = repo + .user() + .add(&mut rng, &clock, "alice".to_owned()) + .await + .unwrap(); + let bob_user = repo + .user() + .add(&mut rng, &clock, "bob".to_owned()) + .await + .unwrap(); + + let session1 = repo + .personal_session() + .add( + &mut rng, + &clock, + (&alice_user).into(), + &bob_user, + "Test Personal Session".to_owned(), + "openid".parse().unwrap(), + ) + .await + .unwrap(); + repo.personal_access_token() + .add( + &mut rng, + &clock, + &session1, + "mpt_hiss", + Some(Duration::days(42)), + ) + .await + .unwrap(); + + let session2 = repo + .personal_session() + .add( + &mut rng, + &clock, + (&bob_user).into(), + &bob_user, + "Test Personal Session".to_owned(), + "openid".parse().unwrap(), + ) + .await + .unwrap(); + repo.personal_access_token() + .add( + &mut rng, &clock, &session2, "mpt_meow", // No expiry + None, + ) + .await + .unwrap(); + + // Just one session without a token expiry time + assert_eq!( + repo.personal_session() + .revoke_bulk( + &clock, + PersonalSessionFilter::new() + .active_only() + .with_expires(false) + ) + .await + .unwrap(), + 1 + ); + + // Just one session with a token expiry time + assert_eq!( + repo.personal_session() + .revoke_bulk( + &clock, + PersonalSessionFilter::new() + .active_only() + .with_expires(true) + ) + .await + .unwrap(), + 1 + ); + + // No active sessions left + assert_eq!( + repo.personal_session() + .revoke_bulk(&clock, PersonalSessionFilter::new().active_only()) + .await + .unwrap(), + 0 + ); + } + #[sqlx::test(migrator = "crate::MIGRATOR")] async fn test_access_token_repository(pool: PgPool) { const FIRST_TOKEN: &str = "first_access_token"; diff --git a/crates/storage-pg/src/personal/session.rs b/crates/storage-pg/src/personal/session.rs index 8b1723767..db8e46ff3 100644 --- a/crates/storage-pg/src/personal/session.rs +++ b/crates/storage-pg/src/personal/session.rs @@ -357,6 +357,68 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { .map_err(DatabaseError::to_invalid_operation) } + #[tracing::instrument( + name = "db.personal_session.revoke_bulk", + skip_all, + fields( + db.query.text, + ), + err, + )] + async fn revoke_bulk( + &mut self, + clock: &dyn Clock, + filter: PersonalSessionFilter<'_>, + ) -> Result { + let revoked_at = clock.now(); + + let (sql, arguments) = Query::update() + .table(PersonalSessions::Table) + .value(PersonalSessions::RevokedAt, revoked_at) + .and_where( + Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId)) + .in_subquery( + Query::select() + .expr(Expr::col(( + PersonalSessions::Table, + PersonalSessions::PersonalSessionId, + ))) + .from(PersonalSessions::Table) + .left_join( + PersonalAccessTokens::Table, + Cond::all() + .add( + Expr::col(( + PersonalSessions::Table, + PersonalSessions::PersonalSessionId, + )) + .eq(Expr::col(( + PersonalAccessTokens::Table, + PersonalAccessTokens::PersonalSessionId, + ))), + ) + .add( + Expr::col(( + PersonalAccessTokens::Table, + PersonalAccessTokens::RevokedAt, + )) + .is_null(), + ), + ) + .apply_filter(filter) + .take(), + ), + ) + .build_sqlx(PostgresQueryBuilder); + + let res = sqlx::query_with(&sql, arguments) + .traced() + .execute(&mut *self.conn) + .await?; + + Ok(res.rows_affected().try_into().unwrap_or(usize::MAX)) + } + #[tracing::instrument( name = "db.personal_session.list", skip_all, diff --git a/crates/storage/src/personal/session.rs b/crates/storage/src/personal/session.rs index 3515b1161..8e208a610 100644 --- a/crates/storage/src/personal/session.rs +++ b/crates/storage/src/personal/session.rs @@ -87,6 +87,26 @@ pub trait PersonalSessionRepository: Send + Sync { personal_session: PersonalSession, ) -> Result; + /// Revoke all the [`PersonalSession`]s matching the given filter. + /// + /// This will also revoke the relevant personal access tokens. + /// + /// Returns the number of sessions affected + /// + /// # Parameters + /// + /// * `clock`: The clock used to generate timestamps + /// * `filter`: The filter to apply + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn revoke_bulk( + &mut self, + clock: &dyn Clock, + filter: PersonalSessionFilter<'_>, + ) -> Result; + /// List [`PersonalSession`]s matching the given filter and pagination /// parameters /// @@ -150,6 +170,12 @@ repository_impl!(PersonalSessionRepository: personal_session: PersonalSession, ) -> Result; + async fn revoke_bulk( + &mut self, + clock: &dyn Clock, + filter: PersonalSessionFilter<'_>, + ) -> Result; + async fn list( &mut self, filter: PersonalSessionFilter<'_>,