/* SPDX-FileCopyrightText: © 2025 Hornwitser SPDX-License-Identifier: AGPL-3.0-or-later */ import { readUsers } from "~/server/database"; import type { ApiAccount, ApiEvent } from "~/shared/types/api"; function sendMessage( stream: WritableStream, message: string, ) { const writer = stream.getWriter(); writer.ready .then(() => writer.write(message)) .catch(console.error) .finally(() => writer.releaseLock()) ; } function sendMessageAndClose( stream: WritableStream, message: string, ) { const writer = stream.getWriter(); writer.ready .then(() => { writer.write(message); writer.close(); }).catch(console.error) .finally(() => writer.releaseLock()) ; } const streams = new Map, { sessionId?: number, accountId?: number }>(); let keepaliveInterval: ReturnType | null = null export function addStream(stream: WritableStream, sessionId?: number, accountId?: number) { if (streams.size === 0) { console.log("Starting keepalive") keepaliveInterval = setInterval(sendKeepalive, 4000) } streams.set(stream, { sessionId, accountId }); // Produce a response immediately to avoid the reply waiting for content. const message = `data: connected sid:${sessionId ?? '-'} aid:${accountId ?? '-'}\n\n`; sendMessage(stream, message); } export function deleteStream(stream: WritableStream) { streams.delete(stream); if (streams.size === 0) { console.log("Ending keepalive") clearInterval(keepaliveInterval!); } } export function cancelAccountStreams(accountId: number) { for (const [stream, data] of streams) { if (data.accountId === accountId) { sendMessageAndClose(stream, `data: cancelled\n\n`); } } } export function cancelSessionStreams(sessionId: number) { for (const [stream, data] of streams) { if (data.sessionId === sessionId) { sendMessageAndClose(stream, `data: cancelled\n\n`); } } } const encodeEventCache = new WeakMap>(); function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined) { const cache = encodeEventCache.get(event); const cacheEntry = cache?.get(userType); if (cacheEntry) { return cacheEntry; } let data: string; if (event.type === "schedule-update") { if (!canSeeCrew(userType)) { event = { type: event.type, updatedFrom: event.updatedFrom, data: filterSchedule(event.data), }; } data = JSON.stringify(event); } else if (event.type === "user-update") { if ( !canSeeCrew(userType) || !event.data.deleted && event.data.type === "anonymous" && !canSeeAnonymous(userType) ) { event = { type: event.type, data: { id: event.data.id, updatedAt: event.data.updatedAt, deleted: true, } } } data = JSON.stringify(event); } else { throw Error(`encodeEvent cannot encode ${event.type} event`); } if (cache) { cache.set(userType, data); } else { encodeEventCache.set(event, new Map([[userType, data]])); } return data; } export async function broadcastEvent(event: ApiEvent) { const id = Date.now(); console.log(`broadcasting update to ${streams.size} clients`); if (!streams.size) { return; } const users = await readUsers(); for (const [stream, streamData] of streams) { // Account events are specially handled and only sent to the user they belong to. if (event.type === "account-update") { if (streamData.accountId === event.data.id) { sendMessage(stream, `id: ${id}\nevent: update\ndata: ${JSON.stringify(event)}\n\n`); } } else { let userType: ApiAccount["type"] | undefined; if (streamData.accountId !== undefined) { userType = users.find(a => a.id === streamData.accountId)?.type } const data = encodeEvent(event, userType) sendMessage(stream, `id: ${id}\nevent: update\ndata: ${data}\n\n`); } } } function sendKeepalive() { for (const stream of streams.keys()) { sendMessage(stream, ": keepalive\n"); } }