CS: make replication messages typed, some offload work

This commit is contained in:
Rory&
2026-03-13 06:10:31 +01:00
parent 1e82f2f1f8
commit ad4b342200
16 changed files with 291 additions and 35 deletions
@@ -2,5 +2,6 @@
public interface ISpacebarReplication {
public Task InitializeAsync();
public Task SendAsync(ReplicationMessage message);
public Task SendAsync(ContentlessReplicationMessage message);
public Task SendAsync<TPayload>(ReplicationMessage<TPayload> message);
}
@@ -2,7 +2,7 @@ using System.Text.Json.Serialization;
namespace Spacebar.Interop.Replication.Abstractions;
public class ReplicationMessage {
public class ContentlessReplicationMessage {
[JsonPropertyName("channel_id")]
public string? ChannelId { get; set; }
@@ -11,7 +11,7 @@ public class ReplicationMessage {
[JsonPropertyName("user_id")]
public string? UserId { get; set; }
[JsonPropertyName("session_id")]
public string? SessionId { get; set; }
@@ -24,9 +24,11 @@ public class ReplicationMessage {
[JsonPropertyName("origin")]
public string? Origin { get; set; }
[JsonPropertyName("data")]
public object Payload { get; set; } = null!;
[JsonPropertyName("reconnect_delay")]
public int? ReconnectDelay { get; set; }
}
public class ReplicationMessage<TPayload> : ContentlessReplicationMessage {
[JsonPropertyName("data")]
public TPayload Payload { get; set; } = default!;
}
@@ -22,16 +22,17 @@ public class RabbitMqSpacebarReplication : ISpacebarReplication {
_mqChannel = await _mqConnection.CreateChannelAsync();
}
public async Task SendAsync(ReplicationMessage message) {
// HACK: body is required in rabbitmq...
private async Task SendAsyncInternal(ContentlessReplicationMessage message, object? body = null) {
var exchangeId = message.GuildId ?? message.ChannelId ?? message.UserId ?? "global";
await _mqChannel.ExchangeDeclareAsync(exchange: exchangeId, type: ExchangeType.Fanout, durable: false);
var props = new BasicProperties() { Type = message.Event };
var publishSuccess = false;
var body = message.Payload.ToJson().AsBytes().ToArray(); // TODO: byte array payloads etc someday?
var encodedBody = body.ToJson().AsBytes().ToArray(); // TODO: byte array payloads etc someday?
do {
try {
await _mqChannel.BasicPublishAsync(exchange: exchangeId, routingKey: "", mandatory: true, basicProperties: props, body: body);
await _mqChannel.BasicPublishAsync(exchange: exchangeId, routingKey: "", mandatory: true, basicProperties: props, body: encodedBody);
publishSuccess = true;
}
catch (Exception e) {
@@ -40,4 +41,7 @@ public class RabbitMqSpacebarReplication : ISpacebarReplication {
}
} while (!publishSuccess);
}
public async Task SendAsync(ContentlessReplicationMessage message) => await SendAsyncInternal(message);
public async Task SendAsync<TPayload>(ReplicationMessage<TPayload> message) => await SendAsyncInternal(message, message.Payload);
}
@@ -20,7 +20,7 @@ public class UnixSocketSpacebarReplication(UnixSocketConfiguration conf) : ISpac
};
}
public async Task SendAsync(ReplicationMessage message) {
public async Task SendAsync(ContentlessReplicationMessage message) {
// message format: [uint32be length][payload]
var payload = JsonSerializer.SerializeToUtf8Bytes(message);
byte[] formattedPayload = [..BitConverter.GetBytes(System.Net.IPAddress.HostToNetworkOrder(payload.Length)), ..payload];
@@ -30,6 +30,8 @@ public class UnixSocketSpacebarReplication(UnixSocketConfiguration conf) : ISpac
skv.Value.SendAsync(formattedPayload);
});
}
async Task ISpacebarReplication.SendAsync<TPayload>(ReplicationMessage<TPayload> message) => await SendAsync(message);
}
public class UnixSocketConfiguration {
@@ -0,0 +1,14 @@
using System.Text.Json.Serialization;
namespace Spacebar.Models.Gateway;
public class BulkMessageDeleteResponse {
[JsonPropertyName("guild_id")]
public string? GuildId { get; set; }
[JsonPropertyName("channel_id")]
public required string ChannelId { get; set; }
[JsonPropertyName("ids")]
public required List<string> MessageIds { get; set; }
}
@@ -0,0 +1,64 @@
using System.Diagnostics.CodeAnalysis;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Text.Json.Serialization;
namespace Spacebar.Models.Gateway;
public class ChannelStatusesRequest {
[JsonPropertyName("guild_id")]
[JsonRequired]
public JsonValue GuildIdRawValue { get; set; } = null!;
[JsonIgnore]
public string? GuildId {
get => GuildIdRawValue.GetValueKind() == JsonValueKind.String ? GuildIdRawValue.GetValue<string>() : null;
[MemberNotNull] set => GuildIdRawValue = JsonValue.Create(value!);
}
[JsonIgnore]
public List<string>? GuildIds {
get => GuildIdRawValue.GetValueKind() == JsonValueKind.Array ? GuildIdRawValue.AsArray().Deserialize<List<string>>() : null;
[MemberNotNull] set => GuildIdRawValue = JsonValue.Create(value!)!;
}
}
public class ChannelInfoRequest : ChannelStatusesRequest {
[JsonPropertyName("fields")]
public required List<string> Fields { get; set; }
}
public class ChannelStatus {
[JsonPropertyName("id")]
public string ChannelId { get; set; }
[JsonPropertyName("status")]
public string Status { get; set; }
}
public class ChannelStatusesResponse {
[JsonPropertyName("guild_id")]
public string GuildId { get; set; }
[JsonPropertyName("channels")]
public List<ChannelStatus> Channels { get; set; }
}
public class ChannelInfo {
[JsonPropertyName("id")]
public required string ChannelId { get; set; }
[JsonPropertyName("status")]
public string? Status { get; set; }
[JsonPropertyName("voice_start_time")]
public DateTimeOffset? VoiceStartTime { get; set; }
}
public class ChannelInfoResponse {
[JsonPropertyName("guild_id")]
public string GuildId { get; set; }
[JsonPropertyName("channels")]
public List<ChannelInfo> Channels { get; set; }
}
@@ -6,6 +6,7 @@ using Spacebar.Models.AdminApi;
using Spacebar.Interop.Authentication.AspNetCore;
using Spacebar.Models.Db.Contexts;
using Spacebar.Models.Db.Models;
using Spacebar.Models.Gateway;
namespace Spacebar.AdminApi.Controllers;
@@ -18,15 +19,15 @@ public class ChannelController(
SpacebarAspNetAuthenticationService auth,
ISpacebarReplication replication
) : ControllerBase {
[HttpDelete("{id}")]
public async Task DeleteById(string id) {
(await auth.GetCurrentUserAsync(Request)).GetRights().AssertHasAllRights(SpacebarRights.Rights.OPERATOR);
replication.SendAsync(new() {
// TODO: proper type
await replication.SendAsync<Channel>(new() {
Origin = "AdminApi/DeleteChannelById",
ChannelId = id,
Event = "CHANNEL_DELETE",
Payload = await db.Channels.SingleAsync (x=>x.Id == id)
Payload = await db.Channels.SingleAsync(x => x.Id == id)
});
await db.Channels.Where(x => x.Id == id).ExecuteDeleteAsync();
@@ -58,13 +59,13 @@ public class ChannelController(
break;
}
await replication.SendAsync(new() {
await replication.SendAsync<BulkMessageDeleteResponse>(new() {
ChannelId = channelId,
Event = "MESSAGE_BULK_DELETE",
Payload = new {
ids = messageIds,
channel_id = channelId,
guild_id = guildId,
Payload = new BulkMessageDeleteResponse() {
GuildId = guildId,
ChannelId = channelId,
MessageIds = messageIds,
},
Origin = "Admin API (GuildController.DeleteUser)",
});
@@ -6,6 +6,7 @@ using Spacebar.Models.AdminApi;
using Spacebar.Interop.Authentication.AspNetCore;
using Spacebar.Models.Db.Contexts;
using Spacebar.Models.Db.Models;
using Spacebar.Models.Gateway;
namespace Spacebar.AdminApi.Controllers;
@@ -218,13 +219,13 @@ public class GuildController(
break;
}
await replication.SendAsync(new() {
await replication.SendAsync<BulkMessageDeleteResponse>(new() {
ChannelId = channelId,
Event = "MESSAGE_BULK_DELETE",
Payload = new {
ids = messageIds,
channel_id = channelId,
guild_id = guildId,
Payload = new() {
GuildId = guildId,
ChannelId = channelId,
MessageIds = messageIds,
},
Origin = "Admin API (GuildController.DeleteUser)",
});
@@ -40,7 +40,8 @@ public class IpcTestController(
while (true) {
var clr = re.Next();
color = clr.r << 16 | clr.g << 8 | clr.b;
await replication.SendAsync(new() {
// TODO: create type
await replication.SendAsync<object>(new() {
Event = "GUILD_ROLE_UPDATE",
GuildId = guildId,
Origin = "Admin API (GET /users/test)",
@@ -6,6 +6,7 @@ using Spacebar.Interop.Authentication.AspNetCore;
using Spacebar.Interop.Replication.Abstractions;
using Spacebar.Models.AdminApi;
using Spacebar.Models.Db.Contexts;
using Spacebar.Models.Gateway;
namespace Spacebar.AdminApi.Controllers;
@@ -218,13 +219,13 @@ public class UserController(
break;
}
await replication.SendAsync(new() {
await replication.SendAsync<BulkMessageDeleteResponse>(new() {
Event = "MESSAGE_BULK_DELETE",
ChannelId = channelId,
Payload = new {
channel_id = channelId,
guild_id = guildId,
ids = messageIds,
Payload = new() {
GuildId = guildId,
ChannelId = channelId,
MessageIds = messageIds,
},
Origin = "AdminApi/DeleteMessagesForChannel"
});
@@ -24,6 +24,8 @@
<PackageReference Include="Spacebar.Models.Config" Version="*-preview*" Condition="'$(ContinuousIntegrationBuild)'=='true'"/>
<ProjectReference Include="..\Models\Spacebar.Models.Db\Spacebar.Models.Db.csproj" Condition="'$(ContinuousIntegrationBuild)'!='true'"/>
<PackageReference Include="Spacebar.Models.Db" Version="*-preview*" Condition="'$(ContinuousIntegrationBuild)'=='true'"/>
<ProjectReference Include="..\Models\Spacebar.Models.Gateway\Spacebar.Models.Gateway.csproj" Condition="'$(ContinuousIntegrationBuild)'!='true'"/>
<PackageReference Include="Spacebar.Models.Gateway" Version="*-preview*" Condition="'$(ContinuousIntegrationBuild)'=='true'"/>
</ItemGroup>
<ItemGroup>
@@ -0,0 +1,68 @@
using System.Diagnostics.CodeAnalysis;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Text.Json.Serialization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Spacebar.Interop.Authentication.AspNetCore;
using Spacebar.Interop.Replication.Abstractions;
using Spacebar.Models.Db.Contexts;
using Spacebar.Models.Gateway;
namespace Spacebar.GatewayOffload.Controllers;
[ApiController]
[Route("/_spacebar/offload/gateway")]
public class ChannelStatusController(ILogger<ChannelStatusController> logger, SpacebarAspNetAuthenticationService authService, SpacebarDbContext db, IServiceProvider sp)
: ControllerBase {
[HttpPost("channelStatuses")]
public async IAsyncEnumerable<ReplicationMessage<ChannelStatusesResponse>> GetChannelStatuses([FromBody] ChannelStatusesRequest req) {
await foreach (var res in GetChannelInfos(new() {
Fields = ["status"],
GuildIdRawValue = req.GuildIdRawValue,
})) {
yield return new() {
Payload = new() {
GuildId = res.Payload.GuildId,
Channels = res.Payload.Channels.Select(c => new ChannelStatus {
ChannelId = c.ChannelId,
Status = c.Status!,
}).ToList(),
}
};
}
}
[HttpPost("channelInfo")]
public async IAsyncEnumerable<ReplicationMessage<ChannelInfoResponse>> GetChannelInfos([FromBody] ChannelInfoRequest req) {
var user = await authService.GetCurrentUserAsync(Request);
string[] statusOptions = [
"Vibing ✨",
"Hanging out: 12%...",
"Communicating...",
// idk, i cant come up with more stuff, maybe suggestions welcome, or actually storing some data?
];
foreach (var guildId in req.GuildIds ?? [req.GuildId!]) {
var channels = (await db.Channels.Include(x => x.VoiceStates).Where(x => x.Type == 2 && x.GuildId == guildId && x.VoiceStates.Count > 0)
.Select(x => x.Id)
.ToListAsync())
.Select(x => new {
id = x,
status = statusOptions[new Random().Next(statusOptions.Length)], // TODO: We don't currently store channel statuses, so make some stuff up
voiceStartTime = DateTime.Now.Subtract(TimeSpan.FromMinutes(new Random().Next(1, 120))), // TODO: We also don't store voice start times, so make some stuff up
}).ToList();
yield return new() {
Payload = new() {
GuildId = guildId,
Channels = channels.Select(c => new ChannelInfo {
ChannelId = c.id,
Status = req.Fields.Contains("status") ? c.status : null,
VoiceStartTime = req.Fields.Contains("voice_start_time") ? c.voiceStartTime : null,
}).ToList(),
},
};
}
}
}
@@ -11,7 +11,7 @@ namespace Spacebar.GatewayOffload.Controllers;
[Route("/_spacebar/offload/gateway/Identify")]
public class IdentifyController(ILogger<IdentifyController> logger, SpacebarAuthenticationService authService, SpacebarDbContext db, IServiceProvider sp) : ControllerBase {
[HttpPost("")]
public async IAsyncEnumerable<ReplicationMessage> DoIdentify(IdentifyRequest payload) {
public async IAsyncEnumerable<ContentlessReplicationMessage> DoIdentify(IdentifyRequest payload) {
var user = await TraceResult.TraceAsync("getAuthUser", () => authService.GetCurrentUserAsync(payload.Token));
var session = await TraceResult.TraceAsync("getAuthSession", () => authService.GetCurrentSessionAsync(payload.Token));
@@ -37,12 +37,13 @@ public class IdentifyController(ILogger<IdentifyController> logger, SpacebarAuth
}
}
yield return new() {
Payload = new ReadyResponse { },
yield return new ReplicationMessage<ReadyResponse>() {
Payload = new() { },
};
}
private ReplicationMessage Close(CloseCode closeCode) => new() {
// TODO: type? also, implement this in gateway lol
private ReplicationMessage<object?> Close(CloseCode closeCode) => new() {
Origin = "IdentifyController",
Event = "SB_GW_CLOSE",
Payload = new {
@@ -20,7 +20,7 @@ namespace Spacebar.GatewayOffload.Controllers;
public class Op12Controller(ILogger<Op12Controller> logger, SpacebarAspNetAuthenticationService authService, SpacebarDbContext db, IServiceProvider sp) : ControllerBase
{
[HttpPost("")]
public async IAsyncEnumerable<ReplicationMessage> DoGuildSync(List<string> guildIds)
public async IAsyncEnumerable<ReplicationMessage<GuildSyncResponse>> DoGuildSync(List<string> guildIds)
{
var user = await authService.GetCurrentUserAsync(Request);
guildIds = (await db.Members.AsNoTracking().Where(x => x.Id == user.Id).Select(x => x.GuildId).ToListAsync())
@@ -14,7 +14,8 @@ namespace Spacebar.GatewayOffload.Controllers;
[Route("/_spacebar/offload/gateway/LazyRequest")]
public class Op14Controller(ILogger<Op12Controller> logger, SpacebarAspNetAuthenticationService authService, SpacebarDbContext db, IServiceProvider sp) : ControllerBase {
[HttpPost]
public async IAsyncEnumerable<ReplicationMessage> DoLazyRequest([FromBody] LazyRequest payload) {
// TODO: actually return something?
public async IAsyncEnumerable<ContentlessReplicationMessage> DoLazyRequest([FromBody] LazyRequest payload) {
var user = await TraceResult.TraceAsync("getAuthUser", () => authService.GetCurrentUserAsync(Request));
var session = await TraceResult.TraceAsync("getAuthSession", () => authService.GetCurrentSessionAsync(Request));
@@ -0,0 +1,93 @@
using System.Collections.Frozen;
using System.Linq.Expressions;
using System.Text.Json;
using System.Text.Json.Nodes;
using ArcaneLibs.Extensions;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Spacebar.DataMappings.Generic;
using Spacebar.Interop.Authentication.AspNetCore;
using Spacebar.Interop.Replication.Abstractions;
using Spacebar.Models.Db.Contexts;
using Spacebar.Models.Db.Models;
using Spacebar.Models.Gateway;
using Spacebar.Models.Generic;
namespace Spacebar.GatewayOffload.Controllers;
[ApiController]
[Route("/_spacebar/offload/gateway/GuildSync")]
public class Op8Controller(ILogger<Op8Controller> logger, SpacebarAspNetAuthenticationService authService, SpacebarDbContext db, IServiceProvider sp) : ControllerBase
{
[HttpPost("")]
public async IAsyncEnumerable<ReplicationMessage<GuildSyncResponse>> DoGuildSync(List<string> guildIds)
{
var user = await authService.GetCurrentUserAsync(Request);
guildIds = (await db.Members.AsNoTracking().Where(x => x.Id == user.Id).Select(x => x.GuildId).ToListAsync())
.Intersect(guildIds)
.OrderByDescending(gi => db.Members.Count(m => m.GuildId == gi))
.ToList();
var syncs = guildIds.Select(GetGuildSyncAsync).ToList().ToAsyncResultEnumerable();
await foreach (var res in syncs)
{
yield return new()
{
Origin = "OFFLOAD_GUILD_SYNC",
UserId = user.Id,
Event = "GUILD_SYNC",
CreatedAt = DateTime.Now,
Payload = res
};
}
}
// TODO: figure out how to abstract this to a function without EFCore complaining about not being translatable...
private static Expression<Func<Session, bool>> IsOnline = (Session session) => session.Status != "offline" && session.Status != "invisible" && session.Status != "unknown";
private async Task<GuildSyncResponse> GetGuildSyncAsync(string guildId)
{
await using var sc = sp.CreateAsyncScope();
var _db = sc.ServiceProvider.GetRequiredService<SpacebarDbContext>();
var memberCount = await _db.Members.AsNoTracking().Where(x => x.GuildId == guildId).CountAsync();
var offlineTreshold = DateTime.Now.Subtract(TimeSpan.FromDays(14));
var isLargeGuild = memberCount > 10000;
var members = await _db.Members.AsNoTracking().Where(x => x.GuildId == guildId)
.Include(x => x.IdNavigation)
.ThenInclude(x => x.Sessions.Where(s =>
!s.IsAdminSession && (
// see TODO on IsOnline - somehow need to replicate `IsOnline(s)`
s.Status != "offline" && s.Status != "invisible" && s.Status != "unknown"
) && (!isLargeGuild || s.LastSeen >= offlineTreshold)))
.Where(x => x.IdNavigation.Sessions.Count > 0) // ignore members without sessions
.ToListAsync();
var mappedPartialUsers = members.Select(x => x.IdNavigation).ToFrozenDictionary(x => x.Id, x => x.ToPartialUser());
var mappedMembers = members.ToFrozenDictionary(m => m.Id, m => m.ToPublicMember(mappedPartialUsers[m.Id]));
var presences = members.Select(x => x.IdNavigation).Where(x => x.Sessions.Count > 0).ToFrozenDictionary(x => x.Id, x =>
{
var sortedSessions = x.Sessions.OrderByDescending(s => s.LastSeen).ToList();
return new Presence()
{
GuildId = guildId,
User = mappedPartialUsers[x.Id],
Activities = x.Sessions.Where(s => s.Status is not ("offline" or "invisible" or "unknown"))
.SelectMany(s => JsonSerializer.Deserialize<JsonObject[]>(s.Activities) ?? []).ToList(),
Status = sortedSessions.FirstOrDefault(s => !string.IsNullOrWhiteSpace(s.Status))?.Status ?? "offline",
ClientStatus = JsonSerializer.Deserialize<Presence.ClientStatuses>(sortedSessions.First(s => !string.IsNullOrWhiteSpace(s.ClientStatus)).ClientStatus) ??
new()
};
}).Where(x => x.Value.Activities.Count > 0).ToFrozenDictionary();
var r = new GuildSyncResponse()
{
GuildId = guildId,
Members = mappedMembers.Values.ToList(),
Presences = presences.Values.ToList()
};
return r;
}
}