import { Schedule } from "~/shared/types/schedule";
import { readAccounts } from "~/server/database";
import { canSeeCrew } from "./utils/schedule";

/** Identifiers a stream was registered with; either may be absent. */
type StreamInfo = { sessionId?: number, accountId?: number };

/** Milliseconds between keepalive comments sent to every registered stream. */
const KEEPALIVE_INTERVAL_MS = 4000;

/**
 * Takes the stream's writer, hands it to `use`, and releases the lock again
 * before returning.
 *
 * The lock is deliberately NOT held until the write settles: writes queued on
 * a WritableStream keep going after the writer is released, and holding the
 * lock across an await makes the next `getWriter()` call on the same stream
 * throw (for instance a keepalive tick arriving while a slow client still has
 * an update pending).
 *
 * Failures are logged rather than thrown so that one bad stream cannot abort
 * a broadcast loop or escape from the keepalive timer callback.
 *
 * @param stream - Stream to write to.
 * @param use - Queues work on the writer and returns a promise for its outcome.
 */
function withWriter(
  stream: WritableStream<string>,
  use: (writer: WritableStreamDefaultWriter<string>) => Promise<unknown>,
): void {
  let writer: WritableStreamDefaultWriter<string>;
  try {
    // Throws if something else currently holds the stream's lock.
    writer = stream.getWriter();
  } catch (error) {
    console.error(error);
    return;
  }

  try {
    use(writer).catch(console.error);
  } finally {
    writer.releaseLock();
  }
}

/**
 * Queues `message` on `stream`. Write failures are logged, never thrown.
 */
function sendMessage(
  stream: WritableStream<string>,
  message: string,
): void {
  withWriter(stream, (writer) => writer.write(message));
}

/**
 * Queues `message` on `stream` and then closes the stream.
 *
 * Both the write and the close promise are observed, so a failure of either
 * is logged instead of surfacing as an unhandled rejection.
 */
function sendMessageAndClose(
  stream: WritableStream<string>,
  message: string,
): void {
  withWriter(stream, (writer) =>
    Promise.all([writer.write(message), writer.close()]),
  );
}

/** Every stream currently registered through `addStream`. */
const streams = new Map<WritableStream<string>, StreamInfo>();

/** Handle of the running keepalive timer, or `null` while no timer is running. */
let keepaliveInterval: ReturnType<typeof setInterval> | null = null;

/**
 * Registers a stream, starts the keepalive timer if it is not running, and
 * immediately sends a `connected` frame.
 *
 * @param stream - Stream to register.
 * @param sessionId - Session to associate with the stream, if any.
 * @param accountId - Account to associate with the stream, if any.
 */
export function addStream(stream: WritableStream<string>, sessionId?: number, accountId?: number) {
  if (keepaliveInterval === null) {
    console.log("Starting keepalive");
    keepaliveInterval = setInterval(sendKeepalive, KEEPALIVE_INTERVAL_MS);
  }
  streams.set(stream, { sessionId, accountId });

  // Produce a response immediately to avoid the reply waiting for content.
  const message = `data: connected sid:${sessionId ?? '-'} aid:${accountId ?? '-'}\n\n`;
  sendMessage(stream, message);
}

/**
 * Unregisters a stream and stops the keepalive timer once no streams remain.
 * Calling it for a stream that is not registered is harmless.
 *
 * @param stream - Stream to remove from the registry.
 */
export function deleteStream(stream: WritableStream<string>) {
  streams.delete(stream);
  if (streams.size === 0 && keepaliveInterval !== null) {
    console.log("Ending keepalive");
    clearInterval(keepaliveInterval);
    keepaliveInterval = null;
  }
}

/**
 * Sends a `cancelled` frame to, and closes, every registered stream whose
 * info satisfies `matches`.
 *
 * NOTE(review): closed streams stay in the registry until `deleteStream` is
 * called for them; presumably the connection-close handler does that —
 * confirm against the caller.
 */
function cancelStreamsWhere(matches: (info: StreamInfo) => boolean): void {
  for (const [stream, info] of streams) {
    if (matches(info)) {
      sendMessageAndClose(stream, `data: cancelled\n\n`);
    }
  }
}

/**
 * Cancels every stream registered with the given account id.
 *
 * @param accountId - Account whose streams should be closed.
 */
export function cancelAccountStreams(accountId: number) {
  cancelStreamsWhere((info) => info.accountId === accountId);
}

/**
 * Cancels every stream registered with the given session id.
 *
 * @param sessionId - Session whose streams should be closed.
 */
export function cancelSessionStreams(sessionId: number) {
  cancelStreamsWhere((info) => info.sessionId === sessionId);
}

/**
 * Sends an `update` event carrying the schedule to every registered stream.
 *
 * Streams whose account type passes `canSeeCrew` receive the schedule as
 * given; all others (including streams without an account) receive the result
 * of `filterSchedule`. Each of the two frames is serialised at most once,
 * however many clients receive it.
 *
 * @param schedule - Schedule to broadcast.
 */
export async function broadcastUpdate(schedule: Schedule) {
  const id = Date.now();
  console.log(`broadcasting update to ${streams.size} clients`);
  if (!streams.size) {
    return;
  }

  const accounts = await readAccounts();
  // NOTE(review): filterSchedule is not imported in this file; presumably it is
  // provided by the framework's auto-imports — confirm.
  const filteredSchedule = filterSchedule(schedule);

  const frame = (payload: unknown) =>
    `id: ${id}\nevent: update\ndata: ${JSON.stringify(payload)}\n\n`;
  let fullMessage: string | undefined;
  let filteredMessage: string | undefined;

  for (const [stream, streamData] of streams) {
    let accountType: string | undefined;
    if (streamData.accountId !== undefined) {
      accountType = accounts.find(a => a.id === streamData.accountId)?.type;
    }

    const message = canSeeCrew(accountType)
      ? (fullMessage ??= frame(schedule))
      : (filteredMessage ??= frame(filteredSchedule));
    sendMessage(stream, message);
  }
}

/** Timer callback: sends a keepalive comment line to every registered stream. */
function sendKeepalive() {
  for (const stream of streams.keys()) {
    sendMessage(stream, ": keepalive\n");
  }
}