import { readAccounts } from "~/server/database";
import { canSeeCrew } from "./utils/schedule";
import type { ApiAccount, ApiEvent } from "~/shared/types/api";

/** Identity attached to a registered stream; both fields are optional. */
type StreamData = { sessionId?: number, accountId?: number };

/** How often a keepalive comment is written to every registered stream. */
const KEEPALIVE_INTERVAL_MS = 4000;

/**
 * Tail of the pending write chain for each stream.
 *
 * `WritableStream.getWriter()` throws synchronously while another writer
 * holds the lock, so two sends to the same stream that overlap (for example a
 * keepalive firing while a broadcast write is still pending) must be
 * serialised instead of each grabbing a writer immediately.
 */
const writeQueues = new WeakMap<WritableStream<string>, Promise<void>>();

/**
 * Runs `task` with an exclusive writer for `stream` once every previously
 * queued task for that stream has settled.
 *
 * Failures are logged with `console.error` and never propagate to the caller,
 * so one broken stream cannot interrupt a broadcast to the others. The writer
 * lock is always released, whether the task succeeds or fails.
 */
function enqueueWrite(
  stream: WritableStream<string>,
  task: (writer: WritableStreamDefaultWriter<string>) => Promise<void>,
): void {
  const previous = writeQueues.get(stream) ?? Promise.resolve();
  const next = previous
    .then(async () => {
      const writer = stream.getWriter();
      try {
        await writer.ready;
        await task(writer);
      } finally {
        writer.releaseLock();
      }
    })
    // The chain tail never rejects, so a failed write does not poison later ones.
    .catch(console.error);
  writeQueues.set(stream, next);
}

/** Queues `message` to be written to `stream`; write errors are logged. */
function sendMessage(
  stream: WritableStream<string>,
  message: string,
): void {
  enqueueWrite(stream, writer => writer.write(message));
}

/**
 * Queues `message` to be written to `stream` and then closes the stream.
 * Both the write and the close are awaited so their failures are logged
 * rather than surfacing as unhandled rejections.
 */
function sendMessageAndClose(
  stream: WritableStream<string>,
  message: string,
): void {
  enqueueWrite(stream, async writer => {
    await writer.write(message);
    await writer.close();
  });
}

/** All currently registered streams and the identity each belongs to. */
const streams = new Map<WritableStream<string>, StreamData>();

/** Handle of the running keepalive timer, or `null` when none is running. */
let keepaliveInterval: ReturnType<typeof setInterval> | null = null;

/**
 * Registers a stream so it receives broadcasts and keepalives, and
 * immediately sends it a `connected` message.
 *
 * The keepalive timer is started when the first stream is registered.
 *
 * @param stream - Writable side of the connection.
 * @param sessionId - Session the stream belongs to, if any.
 * @param accountId - Account the stream belongs to, if any.
 */
export function addStream(stream: WritableStream<string>, sessionId?: number, accountId?: number): void {
  if (streams.size === 0) {
    console.log("Starting keepalive");
    keepaliveInterval = setInterval(sendKeepalive, KEEPALIVE_INTERVAL_MS);
  }
  streams.set(stream, { sessionId, accountId });
  // Produce a response immediately to avoid the reply waiting for content.
  const message = `data: connected sid:${sessionId ?? '-'} aid:${accountId ?? '-'}\n\n`;
  sendMessage(stream, message);
}

/**
 * Unregisters a stream. When the last stream is removed the keepalive timer
 * is stopped and its handle cleared.
 */
export function deleteStream(stream: WritableStream<string>): void {
  streams.delete(stream);
  if (streams.size === 0 && keepaliveInterval !== null) {
    console.log("Ending keepalive");
    clearInterval(keepaliveInterval);
    keepaliveInterval = null;
  }
}

/**
 * Sends a `cancelled` message to every stream registered for `accountId` and
 * closes it.
 *
 * NOTE(review): the streams are not removed from the registry here;
 * presumably the connection handler calls `deleteStream` once the stream
 * closes — confirm against the caller.
 */
export function cancelAccountStreams(accountId: number): void {
  for (const [stream, data] of streams) {
    if (data.accountId === accountId) {
      sendMessageAndClose(stream, `data: cancelled\n\n`);
    }
  }
}

/**
 * Sends a `cancelled` message to every stream registered for `sessionId` and
 * closes it.
 *
 * NOTE(review): the streams are not removed from the registry here;
 * presumably the connection handler calls `deleteStream` once the stream
 * closes — confirm against the caller.
 */
export function cancelSessionStreams(sessionId: number): void {
  for (const [stream, data] of streams) {
    if (data.sessionId === sessionId) {
      sendMessageAndClose(stream, `data: cancelled\n\n`);
    }
  }
}

/**
 * Serialised payloads per event object, keyed by the account type they were
 * encoded for, so a broadcast stringifies each variant only once.
 */
const encodeEventCache = new WeakMap<ApiEvent, Map<ApiAccount["type"] | undefined, string>>();

/**
 * Returns the JSON payload of `event` as it should be seen by `accountType`.
 *
 * For `schedule-update` events, account types for which `canSeeCrew` returns
 * false get a copy whose data has been passed through `filterSchedule`.
 * Results are cached per original event object and account type.
 *
 * NOTE(review): `filterSchedule` is not imported in this file; presumably it
 * is supplied by the framework's auto-imports — confirm.
 *
 * @throws Error if the event type is not `schedule-update`.
 */
function encodeEvent(event: ApiEvent, accountType: ApiAccount["type"] | undefined): string {
  const cache = encodeEventCache.get(event);
  const cached = cache?.get(accountType);
  if (cached !== undefined) {
    return cached;
  }

  if (event.type !== "schedule-update") {
    throw new Error(`encodeEvent cannot encode ${event.type} event`);
  }

  // Keep `event` itself untouched: it is the cache key, and keying the cache
  // on the filtered copy would make the stored entry unreachable.
  const payload = canSeeCrew(accountType)
    ? event
    : {
      type: event.type,
      updatedFrom: event.updatedFrom,
      data: filterSchedule(event.data),
    };
  const data = JSON.stringify(payload);

  if (cache) {
    cache.set(accountType, data);
  } else {
    encodeEventCache.set(event, new Map([[accountType, data]]));
  }
  return data;
}

/**
 * Sends `event` to the registered streams as an `update` message whose id is
 * the current timestamp.
 *
 * `account-update` events are delivered only to streams registered for the
 * account the event describes. Every other event goes to all streams, encoded
 * according to the account type of each stream's account (or `undefined` when
 * the stream has no account or the account is not found).
 */
export async function broadcastEvent(event: ApiEvent): Promise<void> {
  const id = Date.now();
  console.log(`broadcasting update to ${streams.size} clients`);
  if (!streams.size) {
    return;
  }

  // Account events are specially handled and only sent to the account they
  // belong to. They need no account lookup, so the database read is skipped.
  if (event.type === "account-update") {
    const message = `id: ${id}\nevent: update\ndata: ${JSON.stringify(event)}\n\n`;
    for (const [stream, streamData] of streams) {
      if (streamData.accountId === event.data.id) {
        sendMessage(stream, message);
      }
    }
    return;
  }

  // Index account types once instead of scanning the account list per stream.
  // The first occurrence of an id wins, matching a linear `find`.
  const accounts = await readAccounts();
  const accountTypes = new Map<number, ApiAccount["type"] | undefined>();
  for (const account of accounts) {
    if (!accountTypes.has(account.id)) {
      accountTypes.set(account.id, account.type);
    }
  }

  for (const [stream, streamData] of streams) {
    const accountType = streamData.accountId !== undefined
      ? accountTypes.get(streamData.accountId)
      : undefined;
    const data = encodeEvent(event, accountType);
    sendMessage(stream, `id: ${id}\nevent: update\ndata: ${data}\n\n`);
  }
}

/** Writes a keepalive comment line to every registered stream. */
function sendKeepalive(): void {
  for (const stream of streams.keys()) {
    sendMessage(stream, ": keepalive\n");
  }
}