mirror of
https://github.com/element-hq/matrix-authentication-service.git
synced 2026-05-19 03:05:18 +00:00
Add revoke_bulk for personal sessions storage
This commit is contained in:
@@ -181,6 +181,104 @@ mod tests {
|
||||
assert!(session_lookup.is_revoked());
|
||||
}
|
||||
|
||||
#[sqlx::test(migrator = "crate::MIGRATOR")]
|
||||
async fn test_session_revoke_bulk(pool: PgPool) {
|
||||
let mut rng = ChaChaRng::seed_from_u64(42);
|
||||
let clock = MockClock::default();
|
||||
let mut repo = PgRepository::from_pool(&pool).await.unwrap();
|
||||
|
||||
let alice_user = repo
|
||||
.user()
|
||||
.add(&mut rng, &clock, "alice".to_owned())
|
||||
.await
|
||||
.unwrap();
|
||||
let bob_user = repo
|
||||
.user()
|
||||
.add(&mut rng, &clock, "bob".to_owned())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let session1 = repo
|
||||
.personal_session()
|
||||
.add(
|
||||
&mut rng,
|
||||
&clock,
|
||||
(&alice_user).into(),
|
||||
&bob_user,
|
||||
"Test Personal Session".to_owned(),
|
||||
"openid".parse().unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
repo.personal_access_token()
|
||||
.add(
|
||||
&mut rng,
|
||||
&clock,
|
||||
&session1,
|
||||
"mpt_hiss",
|
||||
Some(Duration::days(42)),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let session2 = repo
|
||||
.personal_session()
|
||||
.add(
|
||||
&mut rng,
|
||||
&clock,
|
||||
(&bob_user).into(),
|
||||
&bob_user,
|
||||
"Test Personal Session".to_owned(),
|
||||
"openid".parse().unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
repo.personal_access_token()
|
||||
.add(
|
||||
&mut rng, &clock, &session2, "mpt_meow", // No expiry
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Just one session without a token expiry time
|
||||
assert_eq!(
|
||||
repo.personal_session()
|
||||
.revoke_bulk(
|
||||
&clock,
|
||||
PersonalSessionFilter::new()
|
||||
.active_only()
|
||||
.with_expires(false)
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
1
|
||||
);
|
||||
|
||||
// Just one session with a token expiry time
|
||||
assert_eq!(
|
||||
repo.personal_session()
|
||||
.revoke_bulk(
|
||||
&clock,
|
||||
PersonalSessionFilter::new()
|
||||
.active_only()
|
||||
.with_expires(true)
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
1
|
||||
);
|
||||
|
||||
// No active sessions left
|
||||
assert_eq!(
|
||||
repo.personal_session()
|
||||
.revoke_bulk(&clock, PersonalSessionFilter::new().active_only())
|
||||
.await
|
||||
.unwrap(),
|
||||
0
|
||||
);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrator = "crate::MIGRATOR")]
|
||||
async fn test_access_token_repository(pool: PgPool) {
|
||||
const FIRST_TOKEN: &str = "first_access_token";
|
||||
|
||||
@@ -357,6 +357,68 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> {
|
||||
.map_err(DatabaseError::to_invalid_operation)
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "db.personal_session.revoke_bulk",
|
||||
skip_all,
|
||||
fields(
|
||||
db.query.text,
|
||||
),
|
||||
err,
|
||||
)]
|
||||
async fn revoke_bulk(
|
||||
&mut self,
|
||||
clock: &dyn Clock,
|
||||
filter: PersonalSessionFilter<'_>,
|
||||
) -> Result<usize, Self::Error> {
|
||||
let revoked_at = clock.now();
|
||||
|
||||
let (sql, arguments) = Query::update()
|
||||
.table(PersonalSessions::Table)
|
||||
.value(PersonalSessions::RevokedAt, revoked_at)
|
||||
.and_where(
|
||||
Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId))
|
||||
.in_subquery(
|
||||
Query::select()
|
||||
.expr(Expr::col((
|
||||
PersonalSessions::Table,
|
||||
PersonalSessions::PersonalSessionId,
|
||||
)))
|
||||
.from(PersonalSessions::Table)
|
||||
.left_join(
|
||||
PersonalAccessTokens::Table,
|
||||
Cond::all()
|
||||
.add(
|
||||
Expr::col((
|
||||
PersonalSessions::Table,
|
||||
PersonalSessions::PersonalSessionId,
|
||||
))
|
||||
.eq(Expr::col((
|
||||
PersonalAccessTokens::Table,
|
||||
PersonalAccessTokens::PersonalSessionId,
|
||||
))),
|
||||
)
|
||||
.add(
|
||||
Expr::col((
|
||||
PersonalAccessTokens::Table,
|
||||
PersonalAccessTokens::RevokedAt,
|
||||
))
|
||||
.is_null(),
|
||||
),
|
||||
)
|
||||
.apply_filter(filter)
|
||||
.take(),
|
||||
),
|
||||
)
|
||||
.build_sqlx(PostgresQueryBuilder);
|
||||
|
||||
let res = sqlx::query_with(&sql, arguments)
|
||||
.traced()
|
||||
.execute(&mut *self.conn)
|
||||
.await?;
|
||||
|
||||
Ok(res.rows_affected().try_into().unwrap_or(usize::MAX))
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "db.personal_session.list",
|
||||
skip_all,
|
||||
|
||||
@@ -87,6 +87,26 @@ pub trait PersonalSessionRepository: Send + Sync {
|
||||
personal_session: PersonalSession,
|
||||
) -> Result<PersonalSession, Self::Error>;
|
||||
|
||||
/// Revoke all the [`PersonalSession`]s matching the given filter.
|
||||
///
|
||||
/// This will also revoke the relevant personal access tokens.
|
||||
///
|
||||
/// Returns the number of sessions affected
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// * `clock`: The clock used to generate timestamps
|
||||
/// * `filter`: The filter to apply
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`Self::Error`] if the underlying repository fails
|
||||
async fn revoke_bulk(
|
||||
&mut self,
|
||||
clock: &dyn Clock,
|
||||
filter: PersonalSessionFilter<'_>,
|
||||
) -> Result<usize, Self::Error>;
|
||||
|
||||
/// List [`PersonalSession`]s matching the given filter and pagination
|
||||
/// parameters
|
||||
///
|
||||
@@ -150,6 +170,12 @@ repository_impl!(PersonalSessionRepository:
|
||||
personal_session: PersonalSession,
|
||||
) -> Result<PersonalSession, Self::Error>;
|
||||
|
||||
async fn revoke_bulk(
|
||||
&mut self,
|
||||
clock: &dyn Clock,
|
||||
filter: PersonalSessionFilter<'_>,
|
||||
) -> Result<usize, Self::Error>;
|
||||
|
||||
async fn list(
|
||||
&mut self,
|
||||
filter: PersonalSessionFilter<'_>,
|
||||
|
||||
Reference in New Issue
Block a user