Prettier u8

This commit is contained in:
Rory&
2025-12-17 09:23:55 +01:00
parent b843219907
commit 45ee84aca9
13 changed files with 71 additions and 287 deletions

View File

@@ -3,10 +3,7 @@ import { onLazyRequest } from "./LazyRequest";
import { GuildSubscriptionsBulkSchema } from "@spacebar/schemas";
import { check } from "./instanceOf";
export async function onGuildSubscriptionsBulk(
this: WebSocket,
payload: Payload,
) {
export async function onGuildSubscriptionsBulk(this: WebSocket, payload: Payload) {
const startTime = Date.now();
check.call(this, GuildSubscriptionsBulkSchema, payload.d);
const body = payload.d as GuildSubscriptionsBulkSchema;
@@ -22,7 +19,5 @@ export async function onGuildSubscriptionsBulk(
},
});
}
console.log(
`[Gateway] GuildSubscriptionsBulk processed ${Object.keys(body.subscriptions).length} subscriptions for user ${this.user_id} in ${Date.now() - startTime}ms`,
);
console.log(`[Gateway] GuildSubscriptionsBulk processed ${Object.keys(body.subscriptions).length} subscriptions for user ${this.user_id} in ${Date.now() - startTime}ms`);
}

View File

@@ -16,18 +16,11 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import {
getDatabase,
getPermission,
GuildMembersChunkEvent,
Member,
Presence,
Session,
} from "@spacebar/util";
import { getDatabase, getPermission, GuildMembersChunkEvent, Member, Presence, Session } from "@spacebar/util";
import { WebSocket, Payload, OPCODES, Send } from "@spacebar/gateway";
import { check } from "./instanceOf";
import { FindManyOptions, ILike, In } from "typeorm";
import { RequestGuildMembersSchema } from "@spacebar/schemas"
import { RequestGuildMembersSchema } from "@spacebar/schemas";
export async function onRequestGuildMembers(this: WebSocket, { d }: Payload) {
const startTime = Date.now();
@@ -39,11 +32,7 @@ export async function onRequestGuildMembers(this: WebSocket, { d }: Payload) {
check.call(this, RequestGuildMembersSchema, d);
const {
presences,
nonce,
query: requestQuery,
} = d as RequestGuildMembersSchema;
const { presences, nonce, query: requestQuery } = d as RequestGuildMembersSchema;
let { limit, user_ids, guild_id } = d as RequestGuildMembersSchema;
// some discord libraries send empty string as query when they meant to send undefined, which was leading to errors being thrown in this handler
@@ -63,8 +52,7 @@ export async function onRequestGuildMembers(this: WebSocket, { d }: Payload) {
}
// TODO: Configurable limit?
if ((query || (user_ids && user_ids.length > 0)) && (!limit || limit > 100))
limit = 100;
if ((query || (user_ids && user_ids.length > 0)) && (!limit || limit > 100)) limit = 100;
const permissions = await getPermission(this.user_id, guild_id);
permissions.hasThrow("VIEW_CHANNEL");
@@ -107,10 +95,7 @@ export async function onRequestGuildMembers(this: WebSocket, { d }: Payload) {
.leftJoinAndSelect("member.roles", "role")
.leftJoinAndSelect("member.user", "user")
.leftJoinAndSelect("user.sessions", "session")
.andWhere(
"',' || member.roles || ',' NOT LIKE :everyoneRoleIdList",
{ everyoneRoleIdList: "%," + guild_id + ",%" },
)
.andWhere("',' || member.roles || ',' NOT LIKE :everyoneRoleIdList", { everyoneRoleIdList: "%," + guild_id + ",%" })
.addOrderBy("user.username", "ASC")
.limit(memberFind.take);
@@ -145,10 +130,7 @@ export async function onRequestGuildMembers(this: WebSocket, { d }: Payload) {
const chunkCount = Math.ceil(members.length / 1000);
let notFound: string[] = [];
if (user_ids && user_ids.length > 0)
notFound = user_ids.filter(
(id) => !members.some((member) => member.id == id),
);
if (user_ids && user_ids.length > 0) notFound = user_ids.filter((id) => !members.some((member) => member.id == id));
const chunks: GuildMembersChunkEvent["data"][] = [];
while (members.length > 0) {
@@ -202,7 +184,5 @@ export async function onRequestGuildMembers(this: WebSocket, { d }: Payload) {
});
});
console.log(
`[Gateway] REQUEST_GUILD_MEMBERS took ${Date.now() - startTime}ms for guild ${guild_id} with ${members.length} members`,
);
console.log(`[Gateway] REQUEST_GUILD_MEMBERS took ${Date.now() - startTime}ms for guild ${guild_id} with ${members.length} members`);
}

View File

@@ -1,9 +1,4 @@
import {
genVoiceToken,
Payload,
WebSocket,
generateStreamKey,
} from "@spacebar/gateway";
import { genVoiceToken, Payload, WebSocket, generateStreamKey } from "@spacebar/gateway";
import {
Channel,
Config,
@@ -18,7 +13,7 @@ import {
VoiceStateUpdateEvent,
} from "@spacebar/util";
import { check } from "./instanceOf";
import { StreamCreateSchema } from "@spacebar/schemas"
import { StreamCreateSchema } from "@spacebar/schemas";
export async function onStreamCreate(this: WebSocket, data: Payload) {
const startTime = Date.now();
@@ -47,17 +42,11 @@ export async function onStreamCreate(this: WebSocket, data: Payload) {
where: { id: body.channel_id },
});
if (
!channel ||
(body.type === "guild" && channel.guild_id != body.guild_id)
)
return this.close(4000, "invalid channel");
if (!channel || (body.type === "guild" && channel.guild_id != body.guild_id)) return this.close(4000, "invalid channel");
// TODO: actually apply preferred_region from the event payload
const regions = Config.get().regions;
const guildRegion = regions.available.filter(
(r) => r.id === regions.default,
)[0];
const guildRegion = regions.available.filter((r) => r.id === regions.default)[0];
// first make sure theres no other streams for this user that somehow didnt get cleared
await Stream.delete({
@@ -85,12 +74,7 @@ export async function onStreamCreate(this: WebSocket, data: Payload) {
await streamSession.save();
const streamKey = generateStreamKey(
body.type,
body.guild_id,
body.channel_id,
this.user_id,
);
const streamKey = generateStreamKey(body.type, body.guild_id, body.channel_id, this.user_id);
await emitEvent({
event: "STREAM_CREATE",
@@ -125,9 +109,7 @@ export async function onStreamCreate(this: WebSocket, data: Payload) {
channel_id: voiceState.channel_id,
} as VoiceStateUpdateEvent);
console.log(
`[Gateway] STREAM_CREATE for user ${this.user_id} in channel ${body.channel_id} with stream key ${streamKey} in ${Date.now() - startTime}ms`,
);
console.log(`[Gateway] STREAM_CREATE for user ${this.user_id} in channel ${body.channel_id} with stream key ${streamKey} in ${Date.now() - startTime}ms`);
}
//stream key:

View File

@@ -1,13 +1,7 @@
import { parseStreamKey, Payload, WebSocket } from "@spacebar/gateway";
import {
emitEvent,
Stream,
StreamDeleteEvent,
VoiceState,
VoiceStateUpdateEvent,
} from "@spacebar/util";
import { emitEvent, Stream, StreamDeleteEvent, VoiceState, VoiceStateUpdateEvent } from "@spacebar/util";
import { check } from "./instanceOf";
import { StreamDeleteSchema } from "@spacebar/schemas"
import { StreamDeleteSchema } from "@spacebar/schemas";
export async function onStreamDelete(this: WebSocket, data: Payload) {
const startTime = Date.now();
@@ -75,7 +69,5 @@ export async function onStreamDelete(this: WebSocket, data: Payload) {
channel_id: channelId,
} as StreamDeleteEvent);
console.log(
`[Gateway] STREAM_DELETE for user ${this.user_id} in channel ${channelId} with stream key ${body.stream_key} in ${Date.now() - startTime}ms`,
);
console.log(`[Gateway] STREAM_DELETE for user ${this.user_id} in channel ${channelId} with stream key ${body.stream_key} in ${Date.now() - startTime}ms`);
}

View File

@@ -1,20 +1,8 @@
import {
genVoiceToken,
parseStreamKey,
Payload,
WebSocket,
} from "@spacebar/gateway";
import {
Config,
emitEvent,
Stream,
StreamCreateEvent,
StreamServerUpdateEvent,
StreamSession,
} from "@spacebar/util";
import { genVoiceToken, parseStreamKey, Payload, WebSocket } from "@spacebar/gateway";
import { Config, emitEvent, Stream, StreamCreateEvent, StreamServerUpdateEvent, StreamSession } from "@spacebar/util";
import { check } from "./instanceOf";
import { Not } from "typeorm";
import { StreamWatchSchema } from "@spacebar/schemas"
import { StreamWatchSchema } from "@spacebar/schemas";
export async function onStreamWatch(this: WebSocket, data: Payload) {
const startTime = Date.now();
@@ -45,13 +33,10 @@ export async function onStreamWatch(this: WebSocket, data: Payload) {
if (!stream) return this.close(4000, "Invalid stream key");
if (type === "guild" && stream.channel.guild_id != guildId)
return this.close(4000, "Invalid stream key");
if (type === "guild" && stream.channel.guild_id != guildId) return this.close(4000, "Invalid stream key");
const regions = Config.get().regions;
const guildRegion = regions.available.find(
(r) => r.endpoint === stream.endpoint,
);
const guildRegion = regions.available.find((r) => r.endpoint === stream.endpoint);
if (!guildRegion) return this.close(4000, "Unknown region");
@@ -97,7 +82,5 @@ export async function onStreamWatch(this: WebSocket, data: Payload) {
user_id: this.user_id,
} as StreamServerUpdateEvent);
console.log(
`[Gateway] STREAM_WATCH for user ${this.user_id} in channel ${channelId} with stream key ${body.stream_key} in ${Date.now() - startTime}ms`,
);
console.log(`[Gateway] STREAM_WATCH for user ${this.user_id} in channel ${channelId} with stream key ${body.stream_key} in ${Date.now() - startTime}ms`);
}

View File

@@ -17,19 +17,10 @@
*/
import { Payload, WebSocket } from "@spacebar/gateway";
import {
Config,
emitEvent,
Guild,
Member,
Region,
VoiceServerUpdateEvent,
VoiceState,
VoiceStateUpdateEvent,
} from "@spacebar/util";
import { Config, emitEvent, Guild, Member, Region, VoiceServerUpdateEvent, VoiceState, VoiceStateUpdateEvent } from "@spacebar/util";
import { genVoiceToken } from "../util/SessionUtils";
import { check } from "./instanceOf";
import { VoiceStateUpdateSchema } from "@spacebar/schemas"
import { VoiceStateUpdateSchema } from "@spacebar/schemas";
// TODO: check if a voice server is setup
// Notice: Bot users respect the voice channel's user limit, if set.
@@ -50,10 +41,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
voiceState = await VoiceState.findOneOrFail({
where: { user_id: this.user_id },
});
if (
voiceState.session_id !== this.session_id &&
body.channel_id === null
) {
if (voiceState.session_id !== this.session_id && body.channel_id === null) {
//Should we also check guild_id === null?
//changing deaf or mute on a client that's not the one with the same session of the voicestate in the database should be ignored
return;
@@ -62,11 +50,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
if (voiceState.channel_id !== body.channel_id) isChanged = true;
//If a user change voice channel between guild we should send a left event first
if (
voiceState.guild_id &&
voiceState.guild_id !== body.guild_id &&
voiceState.session_id === this.session_id
) {
if (voiceState.guild_id && voiceState.guild_id !== body.guild_id && voiceState.session_id === this.session_id) {
await emitEvent({
event: "VOICE_STATE_UPDATE",
data: { ...voiceState.toPublicVoiceState(), channel_id: null },
@@ -89,12 +73,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
}
// if user left voice channel, send an update to previous channel/guild to let other people know that the user left
if (
voiceState.session_id === this.session_id &&
body.guild_id == null &&
body.channel_id == null &&
(prevState?.guild_id || prevState?.channel_id)
) {
if (voiceState.session_id === this.session_id && body.guild_id == null && body.channel_id == null && (prevState?.guild_id || prevState?.channel_id)) {
await emitEvent({
event: "VOICE_STATE_UPDATE",
data: {
@@ -118,8 +97,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
}
//If the session changed we generate a new token
if (voiceState.session_id !== this.session_id)
voiceState.token = genVoiceToken();
if (voiceState.session_id !== this.session_id) voiceState.token = genVoiceToken();
voiceState.session_id = this.session_id;
const { member } = voiceState;
@@ -146,13 +124,9 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
const regions = Config.get().regions;
let guildRegion: Region;
if (guild && guild.region) {
guildRegion = regions.available.filter(
(r) => r.id === guild.region,
)[0];
guildRegion = regions.available.filter((r) => r.id === guild.region)[0];
} else {
guildRegion = regions.available.filter(
(r) => r.id === regions.default,
)[0];
guildRegion = regions.available.filter((r) => r.id === regions.default)[0];
}
await emitEvent({
@@ -161,15 +135,11 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
token: voiceState.token,
guild_id: voiceState.guild_id,
endpoint: guildRegion.endpoint,
channel_id: voiceState.guild_id
? undefined
: voiceState.channel_id, // only DM voice calls have this set, and DM channel is one where guild_id is null
channel_id: voiceState.guild_id ? undefined : voiceState.channel_id, // only DM voice calls have this set, and DM channel is one where guild_id is null
},
user_id: voiceState.user_id,
} as VoiceServerUpdateEvent);
}
console.log(
`[Gateway] VOICE_STATE_UPDATE for user ${this.user_id} in channel ${voiceState.channel_id} in guild ${voiceState.guild_id} in ${Date.now() - startTime}ms`,
);
console.log(`[Gateway] VOICE_STATE_UPDATE for user ${this.user_id} in channel ${voiceState.channel_id} in guild ${voiceState.guild_id} in ${Date.now() - startTime}ms`);
}

View File

@@ -21,13 +21,7 @@ import { closeDatabase, Config, initDatabase, initEvent } from "@spacebar/util";
import http from "http";
import ws from "ws";
import { Connection } from "./events/Connection";
import {
loadWebRtcLibrary,
mediaServer,
WRTC_PORT_MAX,
WRTC_PORT_MIN,
WRTC_PUBLIC_IP,
} from "./util/MediaServer";
import { loadWebRtcLibrary, mediaServer, WRTC_PORT_MAX, WRTC_PORT_MIN, WRTC_PUBLIC_IP } from "./util/MediaServer";
import { green, yellow } from "picocolors";
export class Server {
@@ -36,15 +30,7 @@ export class Server {
public server: http.Server;
public production: boolean;
constructor({
port,
server,
production,
}: {
port: number;
server?: http.Server;
production?: boolean;
}) {
constructor({ port, server, production }: { port: number; server?: http.Server; production?: boolean }) {
this.port = port;
this.production = production || false;

View File

@@ -28,11 +28,7 @@ import { onMessage } from "./Message";
// TODO: specify rate limit in config
// TODO: check msg max size
export async function Connection(
this: WS.Server,
socket: WebRtcWebSocket,
request: IncomingMessage,
) {
export async function Connection(this: WS.Server, socket: WebRtcWebSocket, request: IncomingMessage) {
try {
socket.on("close", onClose.bind(socket));
socket.on("message", onMessage.bind(socket));
@@ -57,8 +53,7 @@ export async function Connection(
socket.encoding = "json";
socket.version = Number(searchParams.get("v")) || 5;
if (socket.version < 3)
return socket.close(CLOSECODES.Unknown_error, "invalid version");
if (socket.version < 3) return socket.close(CLOSECODES.Unknown_error, "invalid version");
setHeartbeat(socket);

View File

@@ -31,8 +31,7 @@ const PayloadSchema = {
export async function onMessage(this: WebRtcWebSocket, buffer: Buffer) {
try {
const data: VoicePayload = JSON.parse(buffer.toString());
if (data.op !== VoiceOPCodes.IDENTIFY && !this.user_id)
return this.close(CLOSECODES.Not_authenticated);
if (data.op !== VoiceOPCodes.IDENTIFY && !this.user_id) return this.close(CLOSECODES.Not_authenticated);
const OPCodeHandler = OPCodeHandlers[data.op];
if (!OPCodeHandler) {
@@ -42,11 +41,7 @@ export async function onMessage(this: WebRtcWebSocket, buffer: Buffer) {
return;
}
if (
![VoiceOPCodes.HEARTBEAT, VoiceOPCodes.SPEAKING].includes(
data.op as VoiceOPCodes,
)
) {
if (![VoiceOPCodes.HEARTBEAT, VoiceOPCodes.SPEAKING].includes(data.op as VoiceOPCodes)) {
console.log("[WebRTC] Opcode " + VoiceOPCodes[data.op]);
}

View File

@@ -17,29 +17,15 @@
*/
import { CLOSECODES } from "@spacebar/gateway";
import {
StreamSession,
VoiceState,
} from "@spacebar/util";
import {
validateSchema,
VoiceIdentifySchema,
} from "@spacebar/schemas";
import {
generateSsrc,
mediaServer,
Send,
VoiceOPCodes,
VoicePayload,
WebRtcWebSocket,
} from "@spacebar/webrtc";
import { StreamSession, VoiceState } from "@spacebar/util";
import { validateSchema, VoiceIdentifySchema } from "@spacebar/schemas";
import { generateSsrc, mediaServer, Send, VoiceOPCodes, VoicePayload, WebRtcWebSocket } from "@spacebar/webrtc";
import { SSRCs } from "@spacebarchat/spacebar-webrtc-types";
import { subscribeToProducers } from "./Video";
export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) {
clearTimeout(this.readyTimeout);
const { server_id, user_id, session_id, token, streams, video } =
validateSchema("VoiceIdentifySchema", data.d) as VoiceIdentifySchema;
const { server_id, user_id, session_id, token, streams, video } = validateSchema("VoiceIdentifySchema", data.d) as VoiceIdentifySchema;
// server_id can be one of the following: a unique id for a GO Live stream, a channel id for a DM voice call, or a guild id for a guild voice channel
// not sure if there's a way to determine whether a snowflake is a channel id or a guild id without checking if it exists in db
@@ -92,12 +78,7 @@ export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) {
this.type = type;
const voiceRoomId = type === "stream" ? server_id : voiceState!.channel_id;
this.webRtcClient = await mediaServer.join(
voiceRoomId,
this.user_id,
this,
type!,
);
this.webRtcClient = await mediaServer.join(voiceRoomId, this.user_id, this, type!);
this.on("close", () => {
// ice-lite media server relies on this to know when the peer went away

View File

@@ -16,34 +16,17 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { SelectProtocolSchema, validateSchema } from "@spacebar/schemas";
import {
VoiceOPCodes,
VoicePayload,
WebRtcWebSocket,
mediaServer,
Send,
} from "@spacebar/webrtc";
import { VoiceOPCodes, VoicePayload, WebRtcWebSocket, mediaServer, Send } from "@spacebar/webrtc";
export async function onSelectProtocol(
this: WebRtcWebSocket,
payload: VoicePayload,
) {
export async function onSelectProtocol(this: WebRtcWebSocket, payload: VoicePayload) {
if (!this.webRtcClient) return;
const data = validateSchema(
"SelectProtocolSchema",
payload.d,
) as SelectProtocolSchema;
const data = validateSchema("SelectProtocolSchema", payload.d) as SelectProtocolSchema;
// UDP protocol not currently supported. Maybe in the future?
if (data.protocol !== "webrtc")
return this.close(4000, "only webrtc protocol supported currently");
if (data.protocol !== "webrtc") return this.close(4000, "only webrtc protocol supported currently");
const response = await mediaServer.onOffer(
this.webRtcClient,
data.sdp!,
data.codecs ?? [],
);
const response = await mediaServer.onOffer(this.webRtcClient, data.sdp!, data.codecs ?? []);
await Send(this, {
op: VoiceOPCodes.SESSION_DESCRIPTION,

View File

@@ -16,13 +16,7 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import {
mediaServer,
VoiceOPCodes,
VoicePayload,
WebRtcWebSocket,
Send,
} from "../util";
import { mediaServer, VoiceOPCodes, VoicePayload, WebRtcWebSocket, Send } from "../util";
// {"speaking":1,"delay":5,"ssrc":2805246727}
@@ -30,11 +24,7 @@ export async function onSpeaking(this: WebRtcWebSocket, data: VoicePayload) {
if (!this.webRtcClient) return;
await Promise.all(
Array.from(
mediaServer.getClientsForRtcServer<WebRtcWebSocket>(
this.webRtcClient.voiceRoomId,
),
).map((client) => {
Array.from(mediaServer.getClientsForRtcServer<WebRtcWebSocket>(this.webRtcClient.voiceRoomId)).map((client) => {
if (client.user_id === this.user_id) return Promise.resolve();
const ssrc = client.getOutgoingStreamSSRCsForUser(this.user_id);

View File

@@ -16,13 +16,7 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { Stream } from "@spacebar/util";
import {
mediaServer,
Send,
VoiceOPCodes,
VoicePayload,
WebRtcWebSocket,
} from "@spacebar/webrtc";
import { mediaServer, Send, VoiceOPCodes, VoicePayload, WebRtcWebSocket } from "@spacebar/webrtc";
import type { WebRtcClient } from "@spacebarchat/spacebar-webrtc-types";
import { validateSchema, VoiceVideoSchema } from "@spacebar/schemas";
@@ -60,9 +54,7 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) {
try {
await Promise.race([
new Promise<void>((resolve, reject) => {
this.webRtcClient?.emitter.once("connected", () =>
resolve(),
);
this.webRtcClient?.emitter.once("connected", () => resolve());
}),
new Promise<void>((resolve, reject) => {
// Reject after 3 seconds if still not connected
@@ -93,28 +85,19 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) {
if (wantsToProduceAudio) {
// check if we are already producing audio, if not, publish a new audio track for it
if (!this.webRtcClient!.isProducingAudio()) {
console.log(
`[${this.user_id}] publishing new audio track ssrc:${d.audio_ssrc}`,
);
console.log(`[${this.user_id}] publishing new audio track ssrc:${d.audio_ssrc}`);
await this.webRtcClient.publishTrack("audio", {
audio_ssrc: d.audio_ssrc,
});
}
// now check that all clients have subscribed to our audio
for (const client of mediaServer.getClientsForRtcServer<WebRtcWebSocket>(
voiceRoomId,
)) {
for (const client of mediaServer.getClientsForRtcServer<WebRtcWebSocket>(voiceRoomId)) {
if (client.user_id === this.user_id) continue;
if (!client.isSubscribedToTrack(this.user_id, "audio")) {
console.log(
`[${client.user_id}] subscribing to audio track ssrcs: ${d.audio_ssrc}`,
);
await client.subscribeToTrack(
this.webRtcClient.user_id,
"audio",
);
console.log(`[${client.user_id}] subscribing to audio track ssrcs: ${d.audio_ssrc}`);
await client.subscribeToTrack(this.webRtcClient.user_id, "audio");
clientsThatNeedUpdate.add(client);
}
@@ -125,9 +108,7 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) {
this.webRtcClient!.videoStream = { ...stream, type: "video" }; // client sends "screen" on go live but expects "video" on response
// check if we are already publishing video, if not, publish a new video track for it
if (!this.webRtcClient!.isProducingVideo()) {
console.log(
`[${this.user_id}] publishing new video track ssrc:${d.video_ssrc}`,
);
console.log(`[${this.user_id}] publishing new video track ssrc:${d.video_ssrc}`);
await this.webRtcClient.publishTrack("video", {
video_ssrc: d.video_ssrc,
rtx_ssrc: d.rtx_ssrc,
@@ -135,19 +116,12 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) {
}
// now check that all clients have subscribed to our video track
for (const client of mediaServer.getClientsForRtcServer<WebRtcWebSocket>(
voiceRoomId,
)) {
for (const client of mediaServer.getClientsForRtcServer<WebRtcWebSocket>(voiceRoomId)) {
if (client.user_id === this.user_id) continue;
if (!client.isSubscribedToTrack(this.user_id, "video")) {
console.log(
`[${client.user_id}] subscribing to video track ssrc: ${d.video_ssrc}`,
);
await client.subscribeToTrack(
this.webRtcClient.user_id,
"video",
);
console.log(`[${client.user_id}] subscribing to video track ssrc: ${d.video_ssrc}`);
await client.subscribeToTrack(this.webRtcClient.user_id, "video");
clientsThatNeedUpdate.add(client);
}
@@ -163,9 +137,7 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) {
d: {
user_id: this.user_id,
// can never send audio ssrc as 0, it will mess up client state for some reason. send server generated ssrc as backup
audio_ssrc:
ssrcs.audio_ssrc ??
this.webRtcClient!.getIncomingStreamSSRCs().audio_ssrc,
audio_ssrc: ssrcs.audio_ssrc ?? this.webRtcClient!.getIncomingStreamSSRCs().audio_ssrc,
video_ssrc: ssrcs.video_ssrc ?? 0,
rtx_ssrc: ssrcs.rtx_ssrc ?? 0,
streams: d.streams?.map((x) => ({
@@ -181,14 +153,10 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) {
}
// check if we are not subscribed to producers in this server, if not, subscribe
export async function subscribeToProducers(
this: WebRtcWebSocket,
): Promise<void> {
export async function subscribeToProducers(this: WebRtcWebSocket): Promise<void> {
if (!this.webRtcClient || !this.webRtcClient.webrtcConnected) return;
const clients = mediaServer.getClientsForRtcServer<WebRtcWebSocket>(
this.webRtcClient.voiceRoomId,
);
const clients = mediaServer.getClientsForRtcServer<WebRtcWebSocket>(this.webRtcClient.voiceRoomId);
await Promise.all(
Array.from(clients).map(async (client) => {
@@ -196,42 +164,26 @@ export async function subscribeToProducers(
if (client.user_id === this.user_id) return; // cannot subscribe to self
if (
client.isProducingAudio() &&
!this.webRtcClient!.isSubscribedToTrack(client.user_id, "audio")
) {
await this.webRtcClient!.subscribeToTrack(
client.user_id,
"audio",
);
if (client.isProducingAudio() && !this.webRtcClient!.isSubscribedToTrack(client.user_id, "audio")) {
await this.webRtcClient!.subscribeToTrack(client.user_id, "audio");
needsUpdate = true;
}
if (
client.isProducingVideo() &&
!this.webRtcClient!.isSubscribedToTrack(client.user_id, "video")
) {
await this.webRtcClient!.subscribeToTrack(
client.user_id,
"video",
);
if (client.isProducingVideo() && !this.webRtcClient!.isSubscribedToTrack(client.user_id, "video")) {
await this.webRtcClient!.subscribeToTrack(client.user_id, "video");
needsUpdate = true;
}
if (!needsUpdate) return;
const ssrcs = this.webRtcClient!.getOutgoingStreamSSRCsForUser(
client.user_id,
);
const ssrcs = this.webRtcClient!.getOutgoingStreamSSRCsForUser(client.user_id);
await Send(this, {
op: VoiceOPCodes.VIDEO,
d: {
user_id: client.user_id,
// can never send audio ssrc as 0, it will mess up client state for some reason. send server generated ssrc as backup
audio_ssrc:
ssrcs.audio_ssrc ??
client.getIncomingStreamSSRCs().audio_ssrc,
audio_ssrc: ssrcs.audio_ssrc ?? client.getIncomingStreamSSRCs().audio_ssrc,
video_ssrc: ssrcs.video_ssrc ?? 0,
rtx_ssrc: ssrcs.rtx_ssrc ?? 0,
streams: [