mirror of
https://github.com/spacebarchat/server.git
synced 2026-05-11 14:44:50 +00:00
47 lines
2.0 KiB
C#
47 lines
2.0 KiB
C#
using ArcaneLibs.Extensions;
|
|
using RabbitMQ.Client;
|
|
using Spacebar.Interop.Replication.Abstractions;
|
|
|
|
namespace Spacebar.Interop.Replication.RabbitMq;
|
|
|
|
public class RabbitMqSpacebarReplication : ISpacebarReplication {
|
|
private IConnection _mqConnection = null!;
|
|
private IChannel _mqChannel = null!;
|
|
private bool _isInitialised;
|
|
|
|
public async Task InitializeAsync() {
|
|
lock (this) {
|
|
if (_isInitialised) return;
|
|
_isInitialised = true;
|
|
}
|
|
|
|
var factory = new ConnectionFactory {
|
|
Uri = new Uri("amqp://guest:guest@127.0.0.1/")
|
|
};
|
|
_mqConnection = await factory.CreateConnectionAsync();
|
|
_mqChannel = await _mqConnection.CreateChannelAsync();
|
|
}
|
|
|
|
// HACK: body is required in rabbitmq...
|
|
private async Task SendAsyncInternal(ContentlessReplicationMessage message, object? body = null) {
|
|
var exchangeId = (message.GuildId ?? message.ChannelId ?? message.UserId)?.ToString() ?? "global";
|
|
await _mqChannel.ExchangeDeclareAsync(exchange: exchangeId, type: ExchangeType.Fanout, durable: false);
|
|
var props = new BasicProperties() { Type = message.Event };
|
|
var publishSuccess = false;
|
|
var encodedBody = body.ToJson().AsBytes().ToArray(); // TODO: byte array payloads etc someday?
|
|
|
|
do {
|
|
try {
|
|
await _mqChannel.BasicPublishAsync(exchange: exchangeId, routingKey: "", mandatory: true, basicProperties: props, body: encodedBody);
|
|
publishSuccess = true;
|
|
}
|
|
catch (Exception e) {
|
|
Console.WriteLine($"[RabbitMQ] Error publishing {message.Event}: {e.Message}");
|
|
await Task.Delay(10);
|
|
}
|
|
} while (!publishSuccess);
|
|
}
|
|
|
|
public async Task SendAsync(ContentlessReplicationMessage message) => await SendAsyncInternal(message);
|
|
public async Task SendAsync<TPayload>(ReplicationMessage<TPayload> message) => await SendAsyncInternal(message, message.Payload);
|
|
} |