diff --git a/src/gateway/opcodes/GuildSubscriptionsBulk.ts b/src/gateway/opcodes/GuildSubscriptionsBulk.ts index 54fe78e24..3c1bdbd5b 100644 --- a/src/gateway/opcodes/GuildSubscriptionsBulk.ts +++ b/src/gateway/opcodes/GuildSubscriptionsBulk.ts @@ -3,10 +3,7 @@ import { onLazyRequest } from "./LazyRequest"; import { GuildSubscriptionsBulkSchema } from "@spacebar/schemas"; import { check } from "./instanceOf"; -export async function onGuildSubscriptionsBulk( - this: WebSocket, - payload: Payload, -) { +export async function onGuildSubscriptionsBulk(this: WebSocket, payload: Payload) { const startTime = Date.now(); check.call(this, GuildSubscriptionsBulkSchema, payload.d); const body = payload.d as GuildSubscriptionsBulkSchema; @@ -22,7 +19,5 @@ export async function onGuildSubscriptionsBulk( }, }); } - console.log( - `[Gateway] GuildSubscriptionsBulk processed ${Object.keys(body.subscriptions).length} subscriptions for user ${this.user_id} in ${Date.now() - startTime}ms`, - ); + console.log(`[Gateway] GuildSubscriptionsBulk processed ${Object.keys(body.subscriptions).length} subscriptions for user ${this.user_id} in ${Date.now() - startTime}ms`); } diff --git a/src/gateway/opcodes/RequestGuildMembers.ts b/src/gateway/opcodes/RequestGuildMembers.ts index 034dce6b2..9807e3de4 100644 --- a/src/gateway/opcodes/RequestGuildMembers.ts +++ b/src/gateway/opcodes/RequestGuildMembers.ts @@ -16,18 +16,11 @@ along with this program. If not, see . */ -import { - getDatabase, - getPermission, - GuildMembersChunkEvent, - Member, - Presence, - Session, -} from "@spacebar/util"; +import { getDatabase, getPermission, GuildMembersChunkEvent, Member, Presence, Session } from "@spacebar/util"; import { WebSocket, Payload, OPCODES, Send } from "@spacebar/gateway"; import { check } from "./instanceOf"; import { FindManyOptions, ILike, In } from "typeorm"; -import { RequestGuildMembersSchema } from "@spacebar/schemas" +import { RequestGuildMembersSchema } from "@spacebar/schemas"; export async function onRequestGuildMembers(this: WebSocket, { d }: Payload) { const startTime = Date.now(); @@ -39,11 +32,7 @@ export async function onRequestGuildMembers(this: WebSocket, { d }: Payload) { check.call(this, RequestGuildMembersSchema, d); - const { - presences, - nonce, - query: requestQuery, - } = d as RequestGuildMembersSchema; + const { presences, nonce, query: requestQuery } = d as RequestGuildMembersSchema; let { limit, user_ids, guild_id } = d as RequestGuildMembersSchema; // some discord libraries send empty string as query when they meant to send undefined, which was leading to errors being thrown in this handler @@ -63,8 +52,7 @@ export async function onRequestGuildMembers(this: WebSocket, { d }: Payload) { } // TODO: Configurable limit? - if ((query || (user_ids && user_ids.length > 0)) && (!limit || limit > 100)) - limit = 100; + if ((query || (user_ids && user_ids.length > 0)) && (!limit || limit > 100)) limit = 100; const permissions = await getPermission(this.user_id, guild_id); permissions.hasThrow("VIEW_CHANNEL"); @@ -107,10 +95,7 @@ export async function onRequestGuildMembers(this: WebSocket, { d }: Payload) { .leftJoinAndSelect("member.roles", "role") .leftJoinAndSelect("member.user", "user") .leftJoinAndSelect("user.sessions", "session") - .andWhere( - "',' || member.roles || ',' NOT LIKE :everyoneRoleIdList", - { everyoneRoleIdList: "%," + guild_id + ",%" }, - ) + .andWhere("',' || member.roles || ',' NOT LIKE :everyoneRoleIdList", { everyoneRoleIdList: "%," + guild_id + ",%" }) .addOrderBy("user.username", "ASC") .limit(memberFind.take); @@ -145,10 +130,7 @@ export async function onRequestGuildMembers(this: WebSocket, { d }: Payload) { const chunkCount = Math.ceil(members.length / 1000); let notFound: string[] = []; - if (user_ids && user_ids.length > 0) - notFound = user_ids.filter( - (id) => !members.some((member) => member.id == id), - ); + if (user_ids && user_ids.length > 0) notFound = user_ids.filter((id) => !members.some((member) => member.id == id)); const chunks: GuildMembersChunkEvent["data"][] = []; while (members.length > 0) { @@ -202,7 +184,5 @@ export async function onRequestGuildMembers(this: WebSocket, { d }: Payload) { }); }); - console.log( - `[Gateway] REQUEST_GUILD_MEMBERS took ${Date.now() - startTime}ms for guild ${guild_id} with ${members.length} members`, - ); + console.log(`[Gateway] REQUEST_GUILD_MEMBERS took ${Date.now() - startTime}ms for guild ${guild_id} with ${members.length} members`); } diff --git a/src/gateway/opcodes/StreamCreate.ts b/src/gateway/opcodes/StreamCreate.ts index 3defc1232..00c8c947c 100644 --- a/src/gateway/opcodes/StreamCreate.ts +++ b/src/gateway/opcodes/StreamCreate.ts @@ -1,9 +1,4 @@ -import { - genVoiceToken, - Payload, - WebSocket, - generateStreamKey, -} from "@spacebar/gateway"; +import { genVoiceToken, Payload, WebSocket, generateStreamKey } from "@spacebar/gateway"; import { Channel, Config, @@ -18,7 +13,7 @@ import { VoiceStateUpdateEvent, } from "@spacebar/util"; import { check } from "./instanceOf"; -import { StreamCreateSchema } from "@spacebar/schemas" +import { StreamCreateSchema } from "@spacebar/schemas"; export async function onStreamCreate(this: WebSocket, data: Payload) { const startTime = Date.now(); @@ -47,17 +42,11 @@ export async function onStreamCreate(this: WebSocket, data: Payload) { where: { id: body.channel_id }, }); - if ( - !channel || - (body.type === "guild" && channel.guild_id != body.guild_id) - ) - return this.close(4000, "invalid channel"); + if (!channel || (body.type === "guild" && channel.guild_id != body.guild_id)) return this.close(4000, "invalid channel"); // TODO: actually apply preferred_region from the event payload const regions = Config.get().regions; - const guildRegion = regions.available.filter( - (r) => r.id === regions.default, - )[0]; + const guildRegion = regions.available.filter((r) => r.id === regions.default)[0]; // first make sure theres no other streams for this user that somehow didnt get cleared await Stream.delete({ @@ -85,12 +74,7 @@ export async function onStreamCreate(this: WebSocket, data: Payload) { await streamSession.save(); - const streamKey = generateStreamKey( - body.type, - body.guild_id, - body.channel_id, - this.user_id, - ); + const streamKey = generateStreamKey(body.type, body.guild_id, body.channel_id, this.user_id); await emitEvent({ event: "STREAM_CREATE", @@ -125,9 +109,7 @@ export async function onStreamCreate(this: WebSocket, data: Payload) { channel_id: voiceState.channel_id, } as VoiceStateUpdateEvent); - console.log( - `[Gateway] STREAM_CREATE for user ${this.user_id} in channel ${body.channel_id} with stream key ${streamKey} in ${Date.now() - startTime}ms`, - ); + console.log(`[Gateway] STREAM_CREATE for user ${this.user_id} in channel ${body.channel_id} with stream key ${streamKey} in ${Date.now() - startTime}ms`); } //stream key: diff --git a/src/gateway/opcodes/StreamDelete.ts b/src/gateway/opcodes/StreamDelete.ts index 069d6b2a8..34a3b9df1 100644 --- a/src/gateway/opcodes/StreamDelete.ts +++ b/src/gateway/opcodes/StreamDelete.ts @@ -1,13 +1,7 @@ import { parseStreamKey, Payload, WebSocket } from "@spacebar/gateway"; -import { - emitEvent, - Stream, - StreamDeleteEvent, - VoiceState, - VoiceStateUpdateEvent, -} from "@spacebar/util"; +import { emitEvent, Stream, StreamDeleteEvent, VoiceState, VoiceStateUpdateEvent } from "@spacebar/util"; import { check } from "./instanceOf"; -import { StreamDeleteSchema } from "@spacebar/schemas" +import { StreamDeleteSchema } from "@spacebar/schemas"; export async function onStreamDelete(this: WebSocket, data: Payload) { const startTime = Date.now(); @@ -75,7 +69,5 @@ export async function onStreamDelete(this: WebSocket, data: Payload) { channel_id: channelId, } as StreamDeleteEvent); - console.log( - `[Gateway] STREAM_DELETE for user ${this.user_id} in channel ${channelId} with stream key ${body.stream_key} in ${Date.now() - startTime}ms`, - ); + console.log(`[Gateway] STREAM_DELETE for user ${this.user_id} in channel ${channelId} with stream key ${body.stream_key} in ${Date.now() - startTime}ms`); } diff --git a/src/gateway/opcodes/StreamWatch.ts b/src/gateway/opcodes/StreamWatch.ts index 88bc0463b..3bb1ce779 100644 --- a/src/gateway/opcodes/StreamWatch.ts +++ b/src/gateway/opcodes/StreamWatch.ts @@ -1,20 +1,8 @@ -import { - genVoiceToken, - parseStreamKey, - Payload, - WebSocket, -} from "@spacebar/gateway"; -import { - Config, - emitEvent, - Stream, - StreamCreateEvent, - StreamServerUpdateEvent, - StreamSession, -} from "@spacebar/util"; +import { genVoiceToken, parseStreamKey, Payload, WebSocket } from "@spacebar/gateway"; +import { Config, emitEvent, Stream, StreamCreateEvent, StreamServerUpdateEvent, StreamSession } from "@spacebar/util"; import { check } from "./instanceOf"; import { Not } from "typeorm"; -import { StreamWatchSchema } from "@spacebar/schemas" +import { StreamWatchSchema } from "@spacebar/schemas"; export async function onStreamWatch(this: WebSocket, data: Payload) { const startTime = Date.now(); @@ -45,13 +33,10 @@ export async function onStreamWatch(this: WebSocket, data: Payload) { if (!stream) return this.close(4000, "Invalid stream key"); - if (type === "guild" && stream.channel.guild_id != guildId) - return this.close(4000, "Invalid stream key"); + if (type === "guild" && stream.channel.guild_id != guildId) return this.close(4000, "Invalid stream key"); const regions = Config.get().regions; - const guildRegion = regions.available.find( - (r) => r.endpoint === stream.endpoint, - ); + const guildRegion = regions.available.find((r) => r.endpoint === stream.endpoint); if (!guildRegion) return this.close(4000, "Unknown region"); @@ -97,7 +82,5 @@ export async function onStreamWatch(this: WebSocket, data: Payload) { user_id: this.user_id, } as StreamServerUpdateEvent); - console.log( - `[Gateway] STREAM_WATCH for user ${this.user_id} in channel ${channelId} with stream key ${body.stream_key} in ${Date.now() - startTime}ms`, - ); + console.log(`[Gateway] STREAM_WATCH for user ${this.user_id} in channel ${channelId} with stream key ${body.stream_key} in ${Date.now() - startTime}ms`); } diff --git a/src/gateway/opcodes/VoiceStateUpdate.ts b/src/gateway/opcodes/VoiceStateUpdate.ts index 7d1a80e9e..a2922e0ce 100644 --- a/src/gateway/opcodes/VoiceStateUpdate.ts +++ b/src/gateway/opcodes/VoiceStateUpdate.ts @@ -17,19 +17,10 @@ */ import { Payload, WebSocket } from "@spacebar/gateway"; -import { - Config, - emitEvent, - Guild, - Member, - Region, - VoiceServerUpdateEvent, - VoiceState, - VoiceStateUpdateEvent, -} from "@spacebar/util"; +import { Config, emitEvent, Guild, Member, Region, VoiceServerUpdateEvent, VoiceState, VoiceStateUpdateEvent } from "@spacebar/util"; import { genVoiceToken } from "../util/SessionUtils"; import { check } from "./instanceOf"; -import { VoiceStateUpdateSchema } from "@spacebar/schemas" +import { VoiceStateUpdateSchema } from "@spacebar/schemas"; // TODO: check if a voice server is setup // Notice: Bot users respect the voice channel's user limit, if set. @@ -50,10 +41,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { voiceState = await VoiceState.findOneOrFail({ where: { user_id: this.user_id }, }); - if ( - voiceState.session_id !== this.session_id && - body.channel_id === null - ) { + if (voiceState.session_id !== this.session_id && body.channel_id === null) { //Should we also check guild_id === null? //changing deaf or mute on a client that's not the one with the same session of the voicestate in the database should be ignored return; @@ -62,11 +50,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { if (voiceState.channel_id !== body.channel_id) isChanged = true; //If a user change voice channel between guild we should send a left event first - if ( - voiceState.guild_id && - voiceState.guild_id !== body.guild_id && - voiceState.session_id === this.session_id - ) { + if (voiceState.guild_id && voiceState.guild_id !== body.guild_id && voiceState.session_id === this.session_id) { await emitEvent({ event: "VOICE_STATE_UPDATE", data: { ...voiceState.toPublicVoiceState(), channel_id: null }, @@ -89,12 +73,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { } // if user left voice channel, send an update to previous channel/guild to let other people know that the user left - if ( - voiceState.session_id === this.session_id && - body.guild_id == null && - body.channel_id == null && - (prevState?.guild_id || prevState?.channel_id) - ) { + if (voiceState.session_id === this.session_id && body.guild_id == null && body.channel_id == null && (prevState?.guild_id || prevState?.channel_id)) { await emitEvent({ event: "VOICE_STATE_UPDATE", data: { @@ -118,8 +97,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { } //If the session changed we generate a new token - if (voiceState.session_id !== this.session_id) - voiceState.token = genVoiceToken(); + if (voiceState.session_id !== this.session_id) voiceState.token = genVoiceToken(); voiceState.session_id = this.session_id; const { member } = voiceState; @@ -146,13 +124,9 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { const regions = Config.get().regions; let guildRegion: Region; if (guild && guild.region) { - guildRegion = regions.available.filter( - (r) => r.id === guild.region, - )[0]; + guildRegion = regions.available.filter((r) => r.id === guild.region)[0]; } else { - guildRegion = regions.available.filter( - (r) => r.id === regions.default, - )[0]; + guildRegion = regions.available.filter((r) => r.id === regions.default)[0]; } await emitEvent({ @@ -161,15 +135,11 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { token: voiceState.token, guild_id: voiceState.guild_id, endpoint: guildRegion.endpoint, - channel_id: voiceState.guild_id - ? undefined - : voiceState.channel_id, // only DM voice calls have this set, and DM channel is one where guild_id is null + channel_id: voiceState.guild_id ? undefined : voiceState.channel_id, // only DM voice calls have this set, and DM channel is one where guild_id is null }, user_id: voiceState.user_id, } as VoiceServerUpdateEvent); } - console.log( - `[Gateway] VOICE_STATE_UPDATE for user ${this.user_id} in channel ${voiceState.channel_id} in guild ${voiceState.guild_id} in ${Date.now() - startTime}ms`, - ); + console.log(`[Gateway] VOICE_STATE_UPDATE for user ${this.user_id} in channel ${voiceState.channel_id} in guild ${voiceState.guild_id} in ${Date.now() - startTime}ms`); } diff --git a/src/webrtc/Server.ts b/src/webrtc/Server.ts index 3a270e68d..a20531d9e 100644 --- a/src/webrtc/Server.ts +++ b/src/webrtc/Server.ts @@ -21,13 +21,7 @@ import { closeDatabase, Config, initDatabase, initEvent } from "@spacebar/util"; import http from "http"; import ws from "ws"; import { Connection } from "./events/Connection"; -import { - loadWebRtcLibrary, - mediaServer, - WRTC_PORT_MAX, - WRTC_PORT_MIN, - WRTC_PUBLIC_IP, -} from "./util/MediaServer"; +import { loadWebRtcLibrary, mediaServer, WRTC_PORT_MAX, WRTC_PORT_MIN, WRTC_PUBLIC_IP } from "./util/MediaServer"; import { green, yellow } from "picocolors"; export class Server { @@ -36,15 +30,7 @@ export class Server { public server: http.Server; public production: boolean; - constructor({ - port, - server, - production, - }: { - port: number; - server?: http.Server; - production?: boolean; - }) { + constructor({ port, server, production }: { port: number; server?: http.Server; production?: boolean }) { this.port = port; this.production = production || false; diff --git a/src/webrtc/events/Connection.ts b/src/webrtc/events/Connection.ts index a068a8fd8..6e028c647 100644 --- a/src/webrtc/events/Connection.ts +++ b/src/webrtc/events/Connection.ts @@ -28,11 +28,7 @@ import { onMessage } from "./Message"; // TODO: specify rate limit in config // TODO: check msg max size -export async function Connection( - this: WS.Server, - socket: WebRtcWebSocket, - request: IncomingMessage, -) { +export async function Connection(this: WS.Server, socket: WebRtcWebSocket, request: IncomingMessage) { try { socket.on("close", onClose.bind(socket)); socket.on("message", onMessage.bind(socket)); @@ -57,8 +53,7 @@ export async function Connection( socket.encoding = "json"; socket.version = Number(searchParams.get("v")) || 5; - if (socket.version < 3) - return socket.close(CLOSECODES.Unknown_error, "invalid version"); + if (socket.version < 3) return socket.close(CLOSECODES.Unknown_error, "invalid version"); setHeartbeat(socket); diff --git a/src/webrtc/events/Message.ts b/src/webrtc/events/Message.ts index ccb88b72c..932510798 100644 --- a/src/webrtc/events/Message.ts +++ b/src/webrtc/events/Message.ts @@ -31,8 +31,7 @@ const PayloadSchema = { export async function onMessage(this: WebRtcWebSocket, buffer: Buffer) { try { const data: VoicePayload = JSON.parse(buffer.toString()); - if (data.op !== VoiceOPCodes.IDENTIFY && !this.user_id) - return this.close(CLOSECODES.Not_authenticated); + if (data.op !== VoiceOPCodes.IDENTIFY && !this.user_id) return this.close(CLOSECODES.Not_authenticated); const OPCodeHandler = OPCodeHandlers[data.op]; if (!OPCodeHandler) { @@ -42,11 +41,7 @@ export async function onMessage(this: WebRtcWebSocket, buffer: Buffer) { return; } - if ( - ![VoiceOPCodes.HEARTBEAT, VoiceOPCodes.SPEAKING].includes( - data.op as VoiceOPCodes, - ) - ) { + if (![VoiceOPCodes.HEARTBEAT, VoiceOPCodes.SPEAKING].includes(data.op as VoiceOPCodes)) { console.log("[WebRTC] Opcode " + VoiceOPCodes[data.op]); } diff --git a/src/webrtc/opcodes/Identify.ts b/src/webrtc/opcodes/Identify.ts index 3abb26d62..8a2516c42 100644 --- a/src/webrtc/opcodes/Identify.ts +++ b/src/webrtc/opcodes/Identify.ts @@ -17,29 +17,15 @@ */ import { CLOSECODES } from "@spacebar/gateway"; -import { - StreamSession, - VoiceState, -} from "@spacebar/util"; -import { - validateSchema, - VoiceIdentifySchema, -} from "@spacebar/schemas"; -import { - generateSsrc, - mediaServer, - Send, - VoiceOPCodes, - VoicePayload, - WebRtcWebSocket, -} from "@spacebar/webrtc"; +import { StreamSession, VoiceState } from "@spacebar/util"; +import { validateSchema, VoiceIdentifySchema } from "@spacebar/schemas"; +import { generateSsrc, mediaServer, Send, VoiceOPCodes, VoicePayload, WebRtcWebSocket } from "@spacebar/webrtc"; import { SSRCs } from "@spacebarchat/spacebar-webrtc-types"; import { subscribeToProducers } from "./Video"; export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) { clearTimeout(this.readyTimeout); - const { server_id, user_id, session_id, token, streams, video } = - validateSchema("VoiceIdentifySchema", data.d) as VoiceIdentifySchema; + const { server_id, user_id, session_id, token, streams, video } = validateSchema("VoiceIdentifySchema", data.d) as VoiceIdentifySchema; // server_id can be one of the following: a unique id for a GO Live stream, a channel id for a DM voice call, or a guild id for a guild voice channel // not sure if there's a way to determine whether a snowflake is a channel id or a guild id without checking if it exists in db @@ -92,12 +78,7 @@ export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) { this.type = type; const voiceRoomId = type === "stream" ? server_id : voiceState!.channel_id; - this.webRtcClient = await mediaServer.join( - voiceRoomId, - this.user_id, - this, - type!, - ); + this.webRtcClient = await mediaServer.join(voiceRoomId, this.user_id, this, type!); this.on("close", () => { // ice-lite media server relies on this to know when the peer went away diff --git a/src/webrtc/opcodes/SelectProtocol.ts b/src/webrtc/opcodes/SelectProtocol.ts index 2aa841f20..a29bcaccd 100644 --- a/src/webrtc/opcodes/SelectProtocol.ts +++ b/src/webrtc/opcodes/SelectProtocol.ts @@ -16,34 +16,17 @@ along with this program. If not, see . */ import { SelectProtocolSchema, validateSchema } from "@spacebar/schemas"; -import { - VoiceOPCodes, - VoicePayload, - WebRtcWebSocket, - mediaServer, - Send, -} from "@spacebar/webrtc"; +import { VoiceOPCodes, VoicePayload, WebRtcWebSocket, mediaServer, Send } from "@spacebar/webrtc"; -export async function onSelectProtocol( - this: WebRtcWebSocket, - payload: VoicePayload, -) { +export async function onSelectProtocol(this: WebRtcWebSocket, payload: VoicePayload) { if (!this.webRtcClient) return; - const data = validateSchema( - "SelectProtocolSchema", - payload.d, - ) as SelectProtocolSchema; + const data = validateSchema("SelectProtocolSchema", payload.d) as SelectProtocolSchema; // UDP protocol not currently supported. Maybe in the future? - if (data.protocol !== "webrtc") - return this.close(4000, "only webrtc protocol supported currently"); + if (data.protocol !== "webrtc") return this.close(4000, "only webrtc protocol supported currently"); - const response = await mediaServer.onOffer( - this.webRtcClient, - data.sdp!, - data.codecs ?? [], - ); + const response = await mediaServer.onOffer(this.webRtcClient, data.sdp!, data.codecs ?? []); await Send(this, { op: VoiceOPCodes.SESSION_DESCRIPTION, diff --git a/src/webrtc/opcodes/Speaking.ts b/src/webrtc/opcodes/Speaking.ts index bff0db979..fde63f342 100644 --- a/src/webrtc/opcodes/Speaking.ts +++ b/src/webrtc/opcodes/Speaking.ts @@ -16,13 +16,7 @@ along with this program. If not, see . */ -import { - mediaServer, - VoiceOPCodes, - VoicePayload, - WebRtcWebSocket, - Send, -} from "../util"; +import { mediaServer, VoiceOPCodes, VoicePayload, WebRtcWebSocket, Send } from "../util"; // {"speaking":1,"delay":5,"ssrc":2805246727} @@ -30,11 +24,7 @@ export async function onSpeaking(this: WebRtcWebSocket, data: VoicePayload) { if (!this.webRtcClient) return; await Promise.all( - Array.from( - mediaServer.getClientsForRtcServer( - this.webRtcClient.voiceRoomId, - ), - ).map((client) => { + Array.from(mediaServer.getClientsForRtcServer(this.webRtcClient.voiceRoomId)).map((client) => { if (client.user_id === this.user_id) return Promise.resolve(); const ssrc = client.getOutgoingStreamSSRCsForUser(this.user_id); diff --git a/src/webrtc/opcodes/Video.ts b/src/webrtc/opcodes/Video.ts index 1309fc4fe..ed8870e44 100644 --- a/src/webrtc/opcodes/Video.ts +++ b/src/webrtc/opcodes/Video.ts @@ -16,13 +16,7 @@ along with this program. If not, see . */ import { Stream } from "@spacebar/util"; -import { - mediaServer, - Send, - VoiceOPCodes, - VoicePayload, - WebRtcWebSocket, -} from "@spacebar/webrtc"; +import { mediaServer, Send, VoiceOPCodes, VoicePayload, WebRtcWebSocket } from "@spacebar/webrtc"; import type { WebRtcClient } from "@spacebarchat/spacebar-webrtc-types"; import { validateSchema, VoiceVideoSchema } from "@spacebar/schemas"; @@ -60,9 +54,7 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { try { await Promise.race([ new Promise((resolve, reject) => { - this.webRtcClient?.emitter.once("connected", () => - resolve(), - ); + this.webRtcClient?.emitter.once("connected", () => resolve()); }), new Promise((resolve, reject) => { // Reject after 3 seconds if still not connected @@ -93,28 +85,19 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { if (wantsToProduceAudio) { // check if we are already producing audio, if not, publish a new audio track for it if (!this.webRtcClient!.isProducingAudio()) { - console.log( - `[${this.user_id}] publishing new audio track ssrc:${d.audio_ssrc}`, - ); + console.log(`[${this.user_id}] publishing new audio track ssrc:${d.audio_ssrc}`); await this.webRtcClient.publishTrack("audio", { audio_ssrc: d.audio_ssrc, }); } // now check that all clients have subscribed to our audio - for (const client of mediaServer.getClientsForRtcServer( - voiceRoomId, - )) { + for (const client of mediaServer.getClientsForRtcServer(voiceRoomId)) { if (client.user_id === this.user_id) continue; if (!client.isSubscribedToTrack(this.user_id, "audio")) { - console.log( - `[${client.user_id}] subscribing to audio track ssrcs: ${d.audio_ssrc}`, - ); - await client.subscribeToTrack( - this.webRtcClient.user_id, - "audio", - ); + console.log(`[${client.user_id}] subscribing to audio track ssrcs: ${d.audio_ssrc}`); + await client.subscribeToTrack(this.webRtcClient.user_id, "audio"); clientsThatNeedUpdate.add(client); } @@ -125,9 +108,7 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { this.webRtcClient!.videoStream = { ...stream, type: "video" }; // client sends "screen" on go live but expects "video" on response // check if we are already publishing video, if not, publish a new video track for it if (!this.webRtcClient!.isProducingVideo()) { - console.log( - `[${this.user_id}] publishing new video track ssrc:${d.video_ssrc}`, - ); + console.log(`[${this.user_id}] publishing new video track ssrc:${d.video_ssrc}`); await this.webRtcClient.publishTrack("video", { video_ssrc: d.video_ssrc, rtx_ssrc: d.rtx_ssrc, @@ -135,19 +116,12 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { } // now check that all clients have subscribed to our video track - for (const client of mediaServer.getClientsForRtcServer( - voiceRoomId, - )) { + for (const client of mediaServer.getClientsForRtcServer(voiceRoomId)) { if (client.user_id === this.user_id) continue; if (!client.isSubscribedToTrack(this.user_id, "video")) { - console.log( - `[${client.user_id}] subscribing to video track ssrc: ${d.video_ssrc}`, - ); - await client.subscribeToTrack( - this.webRtcClient.user_id, - "video", - ); + console.log(`[${client.user_id}] subscribing to video track ssrc: ${d.video_ssrc}`); + await client.subscribeToTrack(this.webRtcClient.user_id, "video"); clientsThatNeedUpdate.add(client); } @@ -163,9 +137,7 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { d: { user_id: this.user_id, // can never send audio ssrc as 0, it will mess up client state for some reason. send server generated ssrc as backup - audio_ssrc: - ssrcs.audio_ssrc ?? - this.webRtcClient!.getIncomingStreamSSRCs().audio_ssrc, + audio_ssrc: ssrcs.audio_ssrc ?? this.webRtcClient!.getIncomingStreamSSRCs().audio_ssrc, video_ssrc: ssrcs.video_ssrc ?? 0, rtx_ssrc: ssrcs.rtx_ssrc ?? 0, streams: d.streams?.map((x) => ({ @@ -181,14 +153,10 @@ export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { } // check if we are not subscribed to producers in this server, if not, subscribe -export async function subscribeToProducers( - this: WebRtcWebSocket, -): Promise { +export async function subscribeToProducers(this: WebRtcWebSocket): Promise { if (!this.webRtcClient || !this.webRtcClient.webrtcConnected) return; - const clients = mediaServer.getClientsForRtcServer( - this.webRtcClient.voiceRoomId, - ); + const clients = mediaServer.getClientsForRtcServer(this.webRtcClient.voiceRoomId); await Promise.all( Array.from(clients).map(async (client) => { @@ -196,42 +164,26 @@ export async function subscribeToProducers( if (client.user_id === this.user_id) return; // cannot subscribe to self - if ( - client.isProducingAudio() && - !this.webRtcClient!.isSubscribedToTrack(client.user_id, "audio") - ) { - await this.webRtcClient!.subscribeToTrack( - client.user_id, - "audio", - ); + if (client.isProducingAudio() && !this.webRtcClient!.isSubscribedToTrack(client.user_id, "audio")) { + await this.webRtcClient!.subscribeToTrack(client.user_id, "audio"); needsUpdate = true; } - if ( - client.isProducingVideo() && - !this.webRtcClient!.isSubscribedToTrack(client.user_id, "video") - ) { - await this.webRtcClient!.subscribeToTrack( - client.user_id, - "video", - ); + if (client.isProducingVideo() && !this.webRtcClient!.isSubscribedToTrack(client.user_id, "video")) { + await this.webRtcClient!.subscribeToTrack(client.user_id, "video"); needsUpdate = true; } if (!needsUpdate) return; - const ssrcs = this.webRtcClient!.getOutgoingStreamSSRCsForUser( - client.user_id, - ); + const ssrcs = this.webRtcClient!.getOutgoingStreamSSRCsForUser(client.user_id); await Send(this, { op: VoiceOPCodes.VIDEO, d: { user_id: client.user_id, // can never send audio ssrc as 0, it will mess up client state for some reason. send server generated ssrc as backup - audio_ssrc: - ssrcs.audio_ssrc ?? - client.getIncomingStreamSSRCs().audio_ssrc, + audio_ssrc: ssrcs.audio_ssrc ?? client.getIncomingStreamSSRCs().audio_ssrc, video_ssrc: ssrcs.video_ssrc ?? 0, rtx_ssrc: ssrcs.rtx_ssrc ?? 0, streams: [